/*
 * Decompiled with CFR 0.152.
 */
package com.tigerbrokers.stock.openapi.client.socket.executor;

import com.tigerbrokers.stock.openapi.client.socket.data.pb.SocketCommon;
import com.tigerbrokers.stock.openapi.client.socket.executor.MessageCallbackExecutor;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * {@link MessageCallbackExecutor} that runs callbacks on one dedicated,
 * lazily created single-threaded executor per {@link SocketCommon.DataType}.
 *
 * <p>Each per-type executor has a bounded queue. When the queue is full, data
 * types listed in {@link #ORDER_DATA_TYPES} run the callback on the submitting
 * thread ({@link ThreadPoolExecutor.CallerRunsPolicy}); every other data type
 * discards the oldest queued callback to make room
 * ({@link ThreadPoolExecutor.DiscardOldestPolicy}).
 *
 * <p>Once {@link #shutdown()} or {@link #shutdownNow()} has been called, the
 * instance cannot be restarted and further callbacks are silently dropped.
 */
public class PerDataTypeSingleThreadExecutor implements MessageCallbackExecutor {
    /** Queue capacity used by the no-argument constructor. */
    private static final int DEFAULT_QUEUE_SIZE = 50000;

    /** Data types whose callbacks fall back to the caller's thread instead of being discarded. */
    private static final Set<SocketCommon.DataType> ORDER_DATA_TYPES =
            EnumSet.of(SocketCommon.DataType.OrderStatus, SocketCommon.DataType.OrderTransaction);

    /** Executors created so far, keyed by the data type they serve. */
    private final Map<SocketCommon.DataType, ExecutorService> threadPools = new ConcurrentHashMap<>();

    /** {@code true} until either shutdown method has been invoked. */
    private final AtomicBoolean running = new AtomicBoolean(true);

    /** Maximum number of queued callbacks per data type. */
    private final int capacity;

    /** Creates an executor whose per-type queues hold up to {@value #DEFAULT_QUEUE_SIZE} callbacks. */
    public PerDataTypeSingleThreadExecutor() {
        this(DEFAULT_QUEUE_SIZE);
    }

    /**
     * Creates an executor with the given per-type queue capacity.
     *
     * @param capacity maximum number of queued callbacks per data type; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public PerDataTypeSingleThreadExecutor(int capacity) {
        // Fail fast here: LinkedBlockingQueue would otherwise reject the value
        // only when the first callback is dispatched.
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Dispatches {@code callback} to the executor dedicated to {@code dataType}.
     *
     * <p>A {@code null} callback is ignored. When {@code dataType} is
     * {@code null} the callback runs synchronously on the calling thread.
     * Nothing is run once this instance has been shut down.
     *
     * <p>The callback is handed over via {@link ExecutorService#submit(Runnable)}
     * and the returned future is discarded, so an exception thrown by the
     * callback is captured in that future rather than propagated.
     *
     * @param callback the work to run; may be {@code null}
     * @param dataType the data type selecting the worker; may be {@code null}
     * @param symbol   not used by this implementation
     */
    @Override
    public void execute(Runnable callback, SocketCommon.DataType dataType, String symbol) {
        if (!this.running.get() || callback == null) {
            return;
        }
        if (dataType == null) {
            callback.run();
            return;
        }
        ExecutorService executorService = this.threadPools.computeIfAbsent(dataType, this::createSingleThreadExecutor);
        if (!this.running.get()) {
            // A concurrent shutdown may already have iterated or cleared the map,
            // leaving this (possibly just created) executor behind. Stop it here
            // so its worker thread is not leaked; shutdown() is a no-op if the
            // executor was already stopped.
            this.threadPools.remove(dataType, executorService);
            executorService.shutdown();
            return;
        }
        executorService.submit(callback);
    }

    /**
     * Builds the single-threaded, bounded-queue executor for {@code dataType}.
     *
     * <p>The worker is a daemon thread named {@code tiger-<datatype>-worker},
     * where {@code <datatype>} is the lower-cased enum constant name.
     *
     * @param dataType the data type the executor will serve
     * @return a new executor with one core thread and a queue of {@code capacity} slots
     */
    ExecutorService createSingleThreadExecutor(SocketCommon.DataType dataType) {
        RejectedExecutionHandler handler = ORDER_DATA_TYPES.contains(dataType)
                ? new ThreadPoolExecutor.CallerRunsPolicy()
                : new ThreadPoolExecutor.DiscardOldestPolicy();
        // Locale.ROOT keeps the thread name stable regardless of the JVM's default locale.
        String threadName = "tiger-" + dataType.name().toLowerCase(Locale.ROOT) + "-worker";
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(this.capacity), r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        }, handler);
    }

    /**
     * Stops accepting callbacks and initiates an orderly shutdown of every
     * per-type executor. Calls after the first shutdown (of either kind) are no-ops.
     */
    @Override
    public void shutdown() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        this.threadPools.values().forEach(ExecutorService::shutdown);
        this.threadPools.clear();
    }

    /**
     * Stops accepting callbacks and calls {@link ExecutorService#shutdownNow()}
     * on every per-type executor. Calls after the first shutdown (of either
     * kind) are no-ops.
     */
    @Override
    public void shutdownNow() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        this.threadPools.values().forEach(ExecutorService::shutdownNow);
        this.threadPools.clear();
    }
}

