/*
 * Decompiled with CFR 0.152.
 */
package com.tigerbrokers.stock.openapi.client.socket.executor;

import com.tigerbrokers.stock.openapi.client.socket.data.pb.SocketCommon;
import com.tigerbrokers.stock.openapi.client.socket.executor.MessageCallbackExecutor;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PerDataTypeSymbolHashExecutor
implements MessageCallbackExecutor {
    private static final int DEFAULT_QUEUE_SIZE = 50000;
    private static final int DEFAULT_SYMBOL_THREADS = 4;
    private static final Set<SocketCommon.DataType> QUOTE_DATA_TYPES = EnumSet.of(SocketCommon.DataType.Quote, new SocketCommon.DataType[]{SocketCommon.DataType.Option, SocketCommon.DataType.Future, SocketCommon.DataType.QuoteDepth, SocketCommon.DataType.TradeTick, SocketCommon.DataType.Kline});
    private static final Set<SocketCommon.DataType> ORDER_DATA_TYPES = EnumSet.of(SocketCommon.DataType.OrderStatus, SocketCommon.DataType.OrderTransaction);
    private final ConcurrentMap<SocketCommon.DataType, ExecutorGroup> executorGroups = new ConcurrentHashMap<SocketCommon.DataType, ExecutorGroup>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final int queueCapacity;
    private final int symbolThreadCount;

    public PerDataTypeSymbolHashExecutor() {
        this(50000, 4);
    }

    public PerDataTypeSymbolHashExecutor(int queueCapacity, int symbolThreadCount) {
        this.queueCapacity = queueCapacity <= 0 ? 50000 : queueCapacity;
        this.symbolThreadCount = symbolThreadCount <= 0 ? 4 : symbolThreadCount;
    }

    @Override
    public void execute(Runnable callback, SocketCommon.DataType dataType, String symbol) {
        if (!this.running.get()) {
            return;
        }
        if (dataType == null || callback == null) {
            if (callback != null) {
                callback.run();
            }
            return;
        }
        ExecutorGroup group = this.executorGroups.computeIfAbsent(dataType, this::createGroup);
        group.execute(callback, symbol);
    }

    private ExecutorGroup createGroup(SocketCommon.DataType dataType) {
        if (QUOTE_DATA_TYPES.contains((Object)dataType)) {
            return new SymbolHashGroup("tiger-" + dataType.name().toLowerCase(), this.symbolThreadCount, this.queueCapacity);
        }
        return new SingleThreadGroup(dataType, "tiger-" + dataType.name().toLowerCase(), this.queueCapacity);
    }

    @Override
    public void shutdown() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        this.executorGroups.values().forEach(ExecutorGroup::shutdown);
        this.executorGroups.clear();
    }

    @Override
    public void shutdownNow() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        this.executorGroups.values().forEach(ExecutorGroup::shutdownNow);
        this.executorGroups.clear();
    }

    private static ThreadFactory buildThreadFactory(String threadName) {
        return r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        };
    }

    private static class SymbolHashGroup
    implements ExecutorGroup {
        private final ExecutorService[] executors;

        SymbolHashGroup(String baseName, int threadCount, int queueCapacity) {
            int safeCount = threadCount <= 0 ? 4 : threadCount;
            this.executors = new ExecutorService[safeCount];
            for (int i = 0; i < safeCount; ++i) {
                String threadName = baseName + "-worker-" + i;
                this.executors[i] = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), PerDataTypeSymbolHashExecutor.buildThreadFactory(threadName), new ThreadPoolExecutor.DiscardOldestPolicy());
            }
        }

        @Override
        public void execute(Runnable task, String symbol) {
            this.pick(symbol).submit(task);
        }

        private ExecutorService pick(String symbol) {
            if (symbol == null || symbol.isEmpty()) {
                return this.executors[0];
            }
            int index = (symbol.hashCode() & Integer.MAX_VALUE) % this.executors.length;
            return this.executors[index];
        }

        @Override
        public void shutdown() {
            for (ExecutorService executor : this.executors) {
                executor.shutdown();
            }
        }

        @Override
        public void shutdownNow() {
            for (ExecutorService executor : this.executors) {
                executor.shutdownNow();
            }
        }
    }

    private static class SingleThreadGroup
    implements ExecutorGroup {
        private final ExecutorService executor;

        SingleThreadGroup(SocketCommon.DataType dataType, String threadName, int queueCapacity) {
            RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
            if (ORDER_DATA_TYPES.contains((Object)dataType)) {
                handler = new ThreadPoolExecutor.CallerRunsPolicy();
            }
            this.executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), PerDataTypeSymbolHashExecutor.buildThreadFactory(threadName + "-worker"), handler);
        }

        @Override
        public void execute(Runnable task, String symbol) {
            this.executor.submit(task);
        }

        @Override
        public void shutdown() {
            this.executor.shutdown();
        }

        @Override
        public void shutdownNow() {
            this.executor.shutdownNow();
        }
    }

    private static interface ExecutorGroup {
        public void execute(Runnable var1, String var2);

        public void shutdown();

        public void shutdownNow();
    }
}

