/*
 * Decompiled with CFR 0.152.
 */
package io.github.mainstringargs.polygon.nats;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.github.mainstringargs.polygon.enums.ChannelType;
import io.github.mainstringargs.polygon.nats.PolygonStreamListener;
import io.github.mainstringargs.polygon.nats.message.AggregatePerMinuteMessage;
import io.github.mainstringargs.polygon.nats.message.AggregatePerSecondMessage;
import io.github.mainstringargs.polygon.nats.message.ChannelMessage;
import io.github.mainstringargs.polygon.nats.message.QuotesMessage;
import io.github.mainstringargs.polygon.nats.message.TradesMessage;
import io.github.mainstringargs.util.concurrency.ExecutorTracer;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Options;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Client that streams Polygon market data over NATS.
 *
 * <p>One NATS connection and dispatcher are shared by all registered
 * {@link PolygonStreamListener}s. The connection is opened when the first listener is added and
 * closed when the last one is removed. The set of subscribed subjects is kept in sync with what
 * the registered listeners ask for via {@link PolygonStreamListener#getStockChannelTypes()}.
 *
 * <p>Listener registration, subscription bookkeeping and message fan-out are all guarded by this
 * instance's monitor ({@code synchronized} methods). Incoming messages are handed to a single
 * shared worker thread named {@code PolygonNatsThread}.
 */
public class PolygonNatsClient {
    private static final Logger LOGGER = LogManager.getLogger(PolygonNatsClient.class);

    /** Ticker key a listener uses to request a channel type for every ticker. */
    private static final String ALL_TICKERS = "*";

    /** Single worker thread on which every received message is decoded and dispatched. */
    private static final ExecutorService executor = ExecutorTracer.newSingleThreadExecutor(new ThreadFactory() {

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "PolygonNatsThread");
        }
    });

    private final List<PolygonStreamListener> listeners = new ArrayList<>();

    /** Ticker -> channel types for which a subject is currently subscribed on the dispatcher. */
    private final Map<String, Set<ChannelType>> currentSubscriptions = new HashMap<>();

    private final Options polygonOptions;
    private Connection polygonConnection;
    private Dispatcher polygonDispatcher;

    /**
     * Creates a client that passes {@code -1} as the maximum number of reconnect attempts.
     *
     * @param keyId              key embedded as the user-info part of every server URL
     * @param polygonNatsServers server addresses (without scheme), e.g. {@code host:port}
     */
    public PolygonNatsClient(String keyId, String... polygonNatsServers) {
        this(keyId, -1, polygonNatsServers);
    }

    /**
     * Creates a client. No connection is opened until the first listener is added.
     *
     * @param keyId              key embedded as the user-info part of every server URL
     * @param maxReconnects      value forwarded to {@code Options.Builder.maxReconnects}
     * @param polygonNatsServers server addresses (without scheme), e.g. {@code host:port}
     */
    public PolygonNatsClient(String keyId, int maxReconnects, String... polygonNatsServers) {
        Options.Builder optionsBuilder = new Options.Builder();
        for (String serverUrl : polygonNatsServers) {
            optionsBuilder.server("nats://" + keyId + "@" + serverUrl);
        }
        optionsBuilder.maxReconnects(maxReconnects);
        this.polygonOptions = optionsBuilder.verbose().build();
        LOGGER.info("Polygon Server set to " + Arrays.toString(polygonNatsServers) + " " + this.polygonOptions);
    }

    /**
     * Registers a listener and subscribes to every channel it requests that is not subscribed yet.
     * Opens the NATS connection first if this is the first listener.
     *
     * @param listener listener to register; {@code null} is ignored
     * @throws IllegalStateException if the connection cannot be established; the listener is not
     *                               registered in that case, so a later call retries the connect
     */
    public synchronized void addListener(PolygonStreamListener listener) {
        if (listener == null) {
            // A null entry would break every later iteration over the listener list.
            LOGGER.warn("Ignoring null listener");
            return;
        }
        if (this.listeners.isEmpty()) {
            this.connect();
        }
        this.listeners.add(listener);
        this.updateSubscriptions(listener, true);
    }

    /**
     * Unregisters a listener, unsubscribes from the channels no remaining listener needs, and
     * closes the connection when no listener is left.
     *
     * @param listener listener to remove; unknown or {@code null} listeners are ignored
     */
    public synchronized void removeListener(PolygonStreamListener listener) {
        if (!this.listeners.remove(listener)) {
            // Never registered: nothing was subscribed on its behalf, so there is nothing to undo.
            return;
        }
        this.updateSubscriptions(listener, false);
        if (this.listeners.isEmpty()) {
            this.disconnect();
        }
    }

    /**
     * Brings the dispatcher subscriptions in line with a listener that was just added or removed.
     *
     * @param listener the listener whose requested channels changed; {@code null} only logs
     * @param isAdd    {@code true} to subscribe for the listener, {@code false} to release
     */
    private synchronized void updateSubscriptions(PolygonStreamListener listener, boolean isAdd) {
        if (listener != null) {
            Map<String, Set<ChannelType>> requested = copyChannelTypes(listener.getStockChannelTypes());
            if (isAdd) {
                this.subscribeMissing(requested);
            } else {
                this.unsubscribeUnused(requested);
            }
        }
        LOGGER.info("Subscriptions updated to " + this.currentSubscriptions);
    }

    /** Subscribes to every requested ticker/channel pair that is not already subscribed. */
    private void subscribeMissing(Map<String, Set<ChannelType>> requested) {
        for (Map.Entry<String, Set<ChannelType>> entry : requested.entrySet()) {
            String ticker = entry.getKey();
            Set<ChannelType> subscribed = this.currentSubscriptions.computeIfAbsent(ticker, key -> new HashSet<>());
            for (ChannelType channel : entry.getValue()) {
                String subscriptionName = channel.getAPIName() + "." + ticker;
                if (subscribed.contains(channel)) {
                    LOGGER.info("Already subscribed to " + subscriptionName);
                    continue;
                }
                LOGGER.info("Subscribing to " + subscriptionName);
                this.polygonDispatcher.subscribe(subscriptionName);
                // Record only after the subscribe call returned, so a failure is not remembered.
                subscribed.add(channel);
            }
        }
    }

    /**
     * Unsubscribes from the given ticker/channel pairs unless a still-registered listener needs
     * them. The removed listener must already be gone from {@link #listeners}.
     */
    private void unsubscribeUnused(Map<String, Set<ChannelType>> toRemove) {
        for (PolygonStreamListener remaining : this.listeners) {
            Map<String, Set<ChannelType>> stillNeeded = remaining.getStockChannelTypes();
            if (stillNeeded == null) {
                continue;
            }
            for (Map.Entry<String, Set<ChannelType>> entry : stillNeeded.entrySet()) {
                Set<ChannelType> candidates = toRemove.get(entry.getKey());
                if (candidates != null && entry.getValue() != null) {
                    candidates.removeAll(entry.getValue());
                }
            }
        }
        for (Map.Entry<String, Set<ChannelType>> entry : toRemove.entrySet()) {
            String ticker = entry.getKey();
            Set<ChannelType> subscribed = this.currentSubscriptions.get(ticker);
            if (subscribed == null) {
                continue;
            }
            for (ChannelType channel : entry.getValue()) {
                if (!subscribed.remove(channel)) {
                    // Not tracked as subscribed, so there is no subject to release.
                    continue;
                }
                String subscriptionName = channel.getAPIName() + "." + ticker;
                LOGGER.info("Unsubscribing from " + subscriptionName);
                this.polygonDispatcher.unsubscribe(subscriptionName);
            }
            if (subscribed.isEmpty()) {
                this.currentSubscriptions.remove(ticker);
            }
        }
    }

    /** Returns an independent, mutable copy of a listener's channel map; {@code null}-tolerant. */
    private static Map<String, Set<ChannelType>> copyChannelTypes(Map<String, Set<ChannelType>> source) {
        Map<String, Set<ChannelType>> copy = new HashMap<>();
        if (source == null) {
            return copy;
        }
        for (Map.Entry<String, Set<ChannelType>> entry : source.entrySet()) {
            if (entry.getValue() != null) {
                copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
            }
        }
        return copy;
    }

    /**
     * Opens the NATS connection and creates the dispatcher whose handler forwards each message to
     * the shared worker thread.
     *
     * @throws IllegalStateException if connecting fails or the calling thread is interrupted
     */
    private void connect() {
        LOGGER.info("Connecting...");
        try {
            this.polygonConnection = Nats.connect(this.polygonOptions);
            LOGGER.info("Connected.");
            this.polygonDispatcher = this.polygonConnection.createDispatcher(msg -> executor.execute(() -> {
                String response = new String(msg.getData(), StandardCharsets.UTF_8);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("onMessage " + response);
                }
                this.handleMessage(msg.getSubject(), response);
            }));
        } catch (IOException e) {
            // Fail loudly: continuing without a dispatcher would only defer the failure to an NPE.
            throw new IllegalStateException("Unable to connect to Polygon NATS servers", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while connecting to Polygon NATS servers", e);
        }
    }

    /** Closes the connection, if any, and resets all connection-bound state. */
    private void disconnect() {
        LOGGER.info("Disconnecting...");
        if (this.polygonConnection != null) {
            try {
                this.polygonConnection.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted while closing the Polygon connection", e);
            } finally {
                // Subscriptions belong to the closed connection; a later connect starts clean.
                this.polygonConnection = null;
                this.polygonDispatcher = null;
                this.currentSubscriptions.clear();
            }
            LOGGER.info("Disconnected.");
        }
    }

    /**
     * Parses a JSON document whose root is an object.
     *
     * @param message JSON text
     * @return the parsed root object
     */
    public JsonObject getAsJsonObject(String message) {
        return new JsonParser().parse(message).getAsJsonObject();
    }

    /**
     * Decodes one received message and delivers it to the interested listeners.
     *
     * <p>The subject is expected to be {@code <channel API name>.<ticker>}. Only the first dot
     * separates the two parts, so a ticker that itself contains a dot is kept whole. Messages
     * with a malformed subject or an unresolved channel type are logged and dropped.
     *
     * @param subject  NATS subject the message arrived on
     * @param response message payload as JSON text
     */
    public void handleMessage(String subject, String response) {
        int separator = subject.indexOf('.');
        String apiNameString = separator < 0 ? "" : subject.substring(0, separator).trim();
        String tickerString = separator < 0 ? "" : subject.substring(separator + 1).trim();
        if (apiNameString.isEmpty() || tickerString.isEmpty()) {
            LOGGER.warn("Ignoring message with malformed subject " + subject);
            return;
        }
        ChannelType cType = ChannelType.fromAPIName(apiNameString);
        if (cType == null) {
            // NOTE(review): assumes fromAPIName signals an unknown name with null - confirm.
            LOGGER.warn("Ignoring message with unknown channel type in subject " + subject);
            return;
        }
        ChannelMessage cMessage = this.getMessageToObject(cType, tickerString, this.getAsJsonObject(response));
        this.sendStreamMessageToObservers(tickerString, cType, cMessage);
    }

    /**
     * Delivers a message to every listener that requested the channel type either for this
     * ticker or for all tickers ({@value #ALL_TICKERS}).
     */
    private synchronized void sendStreamMessageToObservers(String ticker, ChannelType channelType, ChannelMessage cMessage) {
        // Iterate over a snapshot so a listener may add/remove listeners from inside its callback.
        for (PolygonStreamListener observer : new ArrayList<>(this.listeners)) {
            Map<String, Set<ChannelType>> wanted = observer.getStockChannelTypes();
            if (wanted == null) {
                continue;
            }
            if (requests(wanted, ticker, channelType) || requests(wanted, ALL_TICKERS, channelType)) {
                observer.streamUpdate(ticker, channelType, cMessage);
            }
        }
    }

    /** Tells whether the channel map lists {@code channelType} under {@code tickerKey}. */
    private static boolean requests(Map<String, Set<ChannelType>> wanted, String tickerKey, ChannelType channelType) {
        Set<ChannelType> channels = wanted.get(tickerKey);
        return channels != null && channels.contains(channelType);
    }

    /**
     * Wraps the JSON payload in the message class matching the channel type.
     *
     * @return the typed message, or {@code null} for a channel type without a message class here
     */
    private ChannelMessage getMessageToObject(ChannelType cType, String tickerString, JsonObject asJsonObject) {
        switch (cType) {
            case QUOTES:
                return new QuotesMessage(cType, tickerString, asJsonObject);
            case TRADES:
                return new TradesMessage(cType, tickerString, asJsonObject);
            case AGGREGATE_PER_MINUTE:
                return new AggregatePerMinuteMessage(cType, tickerString, asJsonObject);
            case AGGREGATE_PER_SECOND:
                return new AggregatePerSecondMessage(cType, tickerString, asJsonObject);
            default:
                return null;
        }
    }

    /**
     * Builds the union of the channel types requested by all registered listeners.
     *
     * @return a new map of ticker to channel types; its sets are fresh copies, so changing the
     *         result never alters a listener's own sets
     */
    public synchronized Map<String, Set<ChannelType>> getRegisteredStockChannelTypes() {
        Map<String, Set<ChannelType>> stockChannelTypes = new HashMap<>();
        for (PolygonStreamListener observer : this.listeners) {
            Map<String, Set<ChannelType>> listenerStockChannelType = observer.getStockChannelTypes();
            if (listenerStockChannelType == null) {
                continue;
            }
            for (Map.Entry<String, Set<ChannelType>> entry : listenerStockChannelType.entrySet()) {
                Set<ChannelType> merged = stockChannelTypes.computeIfAbsent(entry.getKey(), key -> new HashSet<>());
                if (entry.getValue() != null) {
                    merged.addAll(entry.getValue());
                }
            }
        }
        return stockChannelTypes;
    }
}

