/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client;

import com.xxdb.DBConnection;
import com.xxdb.comm.SqlStdEnum;
import com.xxdb.data.BasicAnyVector;
import com.xxdb.data.BasicBoolean;
import com.xxdb.data.BasicDictionary;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicLong;
import com.xxdb.data.BasicString;
import com.xxdb.data.BasicStringVector;
import com.xxdb.data.Entity;
import com.xxdb.data.Vector;
import com.xxdb.data.Void;
import com.xxdb.streaming.client.Daemon;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageDispatcher;
import com.xxdb.streaming.client.MessageHandler;
import com.xxdb.streaming.client.QueueManager;
import com.xxdb.streaming.client.Site;
import com.xxdb.streaming.client.StreamDeserializer;
import java.io.IOException;
import java.net.SocketException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractClient
implements MessageDispatcher {
    protected static final int DEFAULT_PORT = 8849;
    protected static final String DEFAULT_HOST = "localhost";
    protected static final String DEFAULT_ACTION_NAME = "javaStreamingApi";
    protected ConcurrentHashMap<String, ReconnectItem> reconnectTable = new ConcurrentHashMap();
    protected int listeningPort;
    protected String listeningHost = "";
    protected QueueManager queueManager = new QueueManager();
    protected ConcurrentHashMap<String, List<IMessage>> messageCache = new ConcurrentHashMap();
    protected HashMap<String, String> tableNameToTrueTopic = new HashMap();
    protected HashMap<String, String> HATopicToTrueTopic = new HashMap();
    protected HashMap<String, Boolean> hostEndian = new HashMap();
    protected Thread pThread;
    protected ConcurrentHashMap<String, Site[]> trueTopicToSites = new ConcurrentHashMap();
    protected CopyOnWriteArraySet<String> waitReconnectTopic = new CopyOnWriteArraySet();
    protected Map<String, StreamDeserializer> subInfos_ = new HashMap<String, StreamDeserializer>();
    protected HashMap<List<String>, List<String>> users = new HashMap();
    protected boolean isClose_ = false;
    protected LinkedBlockingQueue<DBConnection> connList = new LinkedBlockingQueue();
    private Daemon daemon = null;
    private static final Logger log = LoggerFactory.getLogger(AbstractClient.class);

    @Override
    public void setNeedReconnect(String topic, int v) {
        if (topic.equals("")) {
            return;
        }
        String site = topic.substring(0, topic.indexOf("/"));
        Set keys = this.reconnectTable.keySet();
        if (!keys.contains(site)) {
            ReconnectItem item = new ReconnectItem(v, System.currentTimeMillis());
            item.putTopic(topic);
            this.reconnectTable.put(site, item);
        } else {
            ReconnectItem item = this.reconnectTable.get(site);
            item.setState(v);
            item.setTimestamp(System.currentTimeMillis());
            item.putTopic(topic);
        }
    }

    @Override
    public int getNeedReconnect(String site) {
        ReconnectItem item = this.reconnectTable.get(site);
        if (item != null) {
            return item.getState();
        }
        return 0;
    }

    @Override
    public long getReconnectTimestamp(String site) {
        ReconnectItem item = this.reconnectTable.get(site);
        if (item != null) {
            return item.getTimestamp();
        }
        return 0L;
    }

    @Override
    public void setReconnectTimestamp(String site, long v) {
        ReconnectItem item = this.reconnectTable.get(site);
        if (item != null) {
            item.setTimestamp(v);
        }
    }

    @Override
    public List<String> getAllTopicsBySite(String site) {
        ArrayList<String> re = new ArrayList<String>();
        for (String topic : this.trueTopicToSites.keySet()) {
            String s = topic.substring(0, topic.indexOf("/"));
            if (!s.equals(site)) continue;
            re.add(topic);
        }
        return re;
    }

    @Override
    public Set<String> getAllReconnectTopic() {
        return this.waitReconnectTopic;
    }

    @Override
    public List<String> getAllReconnectSites() {
        ArrayList<String> re = new ArrayList<String>();
        for (String site : this.reconnectTable.keySet()) {
            ReconnectItem item = this.reconnectTable.get(site);
            if (item.getState() <= 0) continue;
            re.add(site);
        }
        return re;
    }

    @Override
    public Site getSiteByName(String site) {
        Site[] sites;
        List<String> topics = this.getAllTopicsBySite(site);
        if (topics.size() > 0 && (sites = this.trueTopicToSites.get(topics.get(0))).length > 0) {
            return this.getActiveSite(sites);
        }
        return null;
    }

    @Override
    public Map<String, StreamDeserializer> getSubInfos() {
        return this.subInfos_;
    }

    protected abstract boolean doReconnect(Site var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setMsgId(String topic, long msgId) {
        ConcurrentHashMap<String, Site[]> concurrentHashMap = this.trueTopicToSites;
        synchronized (concurrentHashMap) {
            Site[] sites = this.trueTopicToSites.get(topic);
            if (sites == null || sites.length == 0) {
                return;
            }
            if (sites.length == 1) {
                sites[0].msgId = msgId;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean tryReconnect(String topic) {
        ConcurrentHashMap<String, ReconnectItem> concurrentHashMap = this.reconnectTable;
        synchronized (concurrentHashMap) {
            topic = this.HATopicToTrueTopic.get(topic);
            this.queueManager.removeQueue(topic);
            Site[] sites = null;
            ConcurrentHashMap<String, Site[]> concurrentHashMap2 = this.trueTopicToSites;
            synchronized (concurrentHashMap2) {
                sites = this.trueTopicToSites.get(topic);
            }
            if (sites == null || sites.length == 0) {
                return false;
            }
            if (sites.length == 1 && !sites[0].reconnect) {
                return false;
            }
            Site site = this.getActiveSite(sites);
            if (site != null) {
                if (!this.doReconnect(site)) {
                    this.waitReconnectTopic.add(topic);
                    return false;
                }
                this.waitReconnectTopic.remove(topic);
                return true;
            }
            return false;
        }
    }

    private Site getActiveSite(Site[] sites) {
        int siteId = 0;
        int siteNum = sites.length;
        while (true) {
            Site site = sites[siteId];
            siteId = (siteId + 1) % siteNum;
            DBConnection conn = new DBConnection();
            conn.connect(site.host, site.port);
            try {
                conn.run("1");
                Site site2 = site;
                conn.close();
                return site2;
            }
            catch (IOException ioex) {
                try {
                    try {
                        throw ioex;
                    }
                    catch (Throwable throwable) {
                        conn.close();
                        throw throwable;
                    }
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                    try {
                        Thread.sleep(500L);
                        continue;
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        continue;
                    }
                }
            }
            break;
        }
    }

    private int getVersionNumber(String ver) {
        try {
            String[] s = ver.split(" ");
            if (s.length >= 2) {
                String vernum = s[0].replace(".", "");
                return Integer.parseInt(vernum);
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
        return 0;
    }

    @Override
    public void activeCloseConnection(Site site) {
        try {
            DBConnection conn = new DBConnection();
            conn.connect(site.host, site.port);
            try {
                ArrayList<Entity> params = new ArrayList<Entity>();
                String actionName = site.actionName;
                String tableName = site.tableName;
                params.add(new BasicString(actionName));
                params.add(new BasicString(tableName));
                BasicString version = (BasicString)conn.run("version()");
                int verNum = this.getVersionNumber(version.getString());
                if (verNum >= 995) {
                    params.add(new BasicBoolean(true));
                }
                conn.run("activeClosePublishConnection", params);
                log.info("Successfully closed publish connection");
            }
            catch (IOException ioex) {
                throw ioex;
            }
            finally {
                conn.close();
            }
        }
        catch (Exception ex) {
            ex.printStackTrace();
            log.error("Unable to actively close the publish connection from site " + site.host + ":" + site.port);
        }
        try {
            Thread.sleep(1000L);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public AbstractClient() throws SocketException {
        this(8849);
    }

    public AbstractClient(int subscribePort) throws SocketException {
        this.listeningPort = subscribePort;
    }

    public AbstractClient(String subscribeHost, int subscribePort) throws SocketException {
        this.listeningHost = subscribeHost;
        this.listeningPort = subscribePort;
    }

    private void addMessageToCache(IMessage msg) {
        String[] topics;
        String topicString = msg.getTopic();
        for (String topic : topics = topicString.split(",")) {
            List<IMessage> cache = this.messageCache.get(topic = this.HATopicToTrueTopic.get(topic));
            if (cache == null) {
                cache = new ArrayList<IMessage>();
                this.messageCache.put(topic, cache);
            }
            cache.add(msg);
        }
    }

    private void flushToQueue() {
        Set keySet = this.messageCache.keySet();
        for (String topic : keySet) {
            try {
                BlockingQueue<List<IMessage>> q = this.queueManager.getQueue(topic);
                if (q == null) continue;
                q.put(this.messageCache.get(topic));
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        this.messageCache.clear();
    }

    @Override
    public synchronized void dispatch(IMessage msg) {
        String[] topics;
        String topicString = msg.getTopic();
        for (String topic : topics = topicString.split(",")) {
            topic = this.HATopicToTrueTopic.get(topic);
            BlockingQueue<List<IMessage>> queue = this.queueManager.getQueue(topic);
            try {
                if (queue == null) continue;
                queue.put(Arrays.asList(msg));
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public synchronized void batchDispatch(List<IMessage> messags) {
        for (int i = 0; i < messags.size(); ++i) {
            this.addMessageToCache(messags.get(i));
        }
        this.flushToQueue();
    }

    @Override
    public boolean isRemoteLittleEndian(String host) {
        if (this.hostEndian.containsKey(host)) {
            return this.hostEndian.get(host);
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized boolean isClosed(String topic) {
        topic = this.HATopicToTrueTopic.get(topic);
        ConcurrentHashMap<String, Site[]> concurrentHashMap = this.trueTopicToSites;
        synchronized (concurrentHashMap) {
            Site[] sites = this.trueTopicToSites.get(topic);
            if (sites == null || sites.length == 0) {
                return true;
            }
            return sites[0].closed;
        }
    }

    private String getTopic(String host, int port, String alias, String tableName, String actionName) {
        return String.format("%s:%d:%s/%s/%s", host, port, alias, tableName, actionName);
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, "", "", false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAsTable) throws IOException, RuntimeException {
        String topic;
        block41: {
            this.checkServerVersion(host, port);
            topic = "";
            List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
            List<String> usr = Arrays.asList(userName, passWord);
            this.users.put(tp, usr);
            DBConnection dbConn = this.listeningPort > 0 ? new DBConnection() : DBConnection.internalCreateEnableReverseStreamingDBConnection(false, false, false, false, false, SqlStdEnum.DolphinDB);
            if (!userName.equals("")) {
                dbConn.connect(host, port, userName, passWord);
            } else {
                dbConn.connect(host, port);
            }
            if (deserializer != null && !deserializer.isInited()) {
                deserializer.init(dbConn);
            }
            if (deserializer != null) {
                BasicDictionary schema = (BasicDictionary)dbConn.run(tableName + ".schema()");
                deserializer.checkSchema(schema);
            }
            try {
                String localIP = this.listeningHost;
                if (localIP.equals("")) {
                    localIP = dbConn.getLocalAddress().getHostAddress();
                }
                if (!this.hostEndian.containsKey(host)) {
                    this.hostEndian.put(host, dbConn.getRemoteLittleEndian());
                }
                ArrayList<Entity> params = new ArrayList<Entity>();
                params.add(new BasicString(tableName));
                params.add(new BasicString(actionName));
                Entity re = dbConn.run("getSubscriptionTopic", params);
                topic = ((BasicAnyVector)re).getEntity(0).getString();
                params.clear();
                params.add(new BasicString(localIP));
                params.add(new BasicInt(this.listeningPort));
                params.add(new BasicString(tableName));
                params.add(new BasicString(actionName));
                params.add(new BasicLong(offset));
                if (filter != null) {
                    params.add(filter);
                } else {
                    params.add(new Void());
                }
                if (allowExistTopic) {
                    params.add(new BasicBoolean(allowExistTopic));
                }
                re = dbConn.run("publishTable", params);
                this.connList.add(dbConn);
                if (re instanceof BasicAnyVector) {
                    BasicStringVector HASiteStrings = (BasicStringVector)((BasicAnyVector)re).getEntity(1);
                    int HASiteNum = HASiteStrings.rows();
                    Site[] sites = new Site[HASiteNum];
                    for (int i = 0; i < HASiteNum; ++i) {
                        String HASite = HASiteStrings.getString(i);
                        String[] HASiteHostAndPort = HASite.split(":");
                        String HASiteHost = HASiteHostAndPort[0];
                        int HASitePort = new Integer(HASiteHostAndPort[1]);
                        String HASiteAlias = HASiteHostAndPort[2];
                        sites[i] = new Site(HASiteHost, HASitePort, tableName, actionName, handler, offset - 1L, true, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable);
                        if (!reconnect) {
                            sites[i].closed = true;
                        }
                        HashMap<String, String> hashMap = this.tableNameToTrueTopic;
                        synchronized (hashMap) {
                            this.tableNameToTrueTopic.put(HASiteHost + ":" + HASitePort + "/" + tableName + "/" + actionName, topic);
                        }
                        String HATopic = this.getTopic(HASiteHost, HASitePort, HASiteAlias, tableName, actionName);
                        HashMap<String, String> hashMap2 = this.HATopicToTrueTopic;
                        synchronized (hashMap2) {
                            this.HATopicToTrueTopic.put(HATopic, topic);
                            continue;
                        }
                    }
                    if (this.subInfos_.containsKey(topic)) {
                        throw new RuntimeException("Subscription with topic " + topic + " exist. ");
                    }
                    this.subInfos_.put(topic, deserializer);
                    ConcurrentHashMap<String, Site[]> concurrentHashMap = this.trueTopicToSites;
                    synchronized (concurrentHashMap) {
                        this.trueTopicToSites.put(topic, sites);
                        break block41;
                    }
                }
                Site[] sites = new Site[]{new Site(host, port, tableName, actionName, handler, offset - 1L, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable)};
                if (!reconnect) {
                    sites[0].closed = true;
                }
                Map<String, StreamDeserializer> map = this.subInfos_;
                synchronized (map) {
                    this.subInfos_.put(topic, deserializer);
                }
                map = this.tableNameToTrueTopic;
                synchronized (map) {
                    this.tableNameToTrueTopic.put(host + ":" + port + "/" + tableName + "/" + actionName, topic);
                }
                map = this.HATopicToTrueTopic;
                synchronized (map) {
                    this.HATopicToTrueTopic.put(topic, topic);
                }
                map = this.trueTopicToSites;
                synchronized (map) {
                    this.trueTopicToSites.put(topic, sites);
                }
            }
            catch (Exception ex) {
                throw ex;
            }
            finally {
                if (this.listeningPort > 0) {
                    dbConn.close();
                }
            }
        }
        BlockingQueue<List<IMessage>> queue = this.queueManager.addQueue(topic);
        return queue;
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, long offset, boolean reconnect) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, actionName, null, offset, reconnect, null, null, false);
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, long offset) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME, offset, false);
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, long offset) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, actionName, offset, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
        DBConnection dbConn = new DBConnection();
        List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
        List<String> usr = this.users.get(tp);
        String user = usr.get(0);
        String pwd = usr.get(1);
        if (!user.equals("")) {
            dbConn.connect(host, port, user, pwd);
        } else {
            dbConn.connect(host, port);
        }
        try {
            String localIP = this.listeningHost;
            if (localIP.equals("")) {
                localIP = dbConn.getLocalAddress().getHostAddress();
            }
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            dbConn.run("stopPublishTable", params);
            String topic = null;
            String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
            AbstractMap abstractMap = this.tableNameToTrueTopic;
            synchronized (abstractMap) {
                topic = this.tableNameToTrueTopic.get(fullTableName);
            }
            abstractMap = this.trueTopicToSites;
            synchronized (abstractMap) {
                Site[] sites = this.trueTopicToSites.get(topic);
                if (sites == null || sites.length == 0) {
                    // empty if block
                }
                for (int i = 0; i < sites.length; ++i) {
                    sites[i].closed = true;
                }
            }
            log.info("Successfully unsubscribed table " + fullTableName);
        }
        catch (Exception ex) {
            throw ex;
        }
        finally {
            dbConn.close();
        }
    }

    public void close() {
        if (this.pThread != null) {
            this.pThread.interrupt();
        }
        this.isClose_ = true;
    }

    @Override
    public boolean isClose() {
        return this.isClose_;
    }

    protected void unsubscribeInternal(String host, int port, String tableName) throws IOException {
        this.unsubscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void checkServerVersion(String host, int port) throws IOException {
        block10: {
            try (DBConnection conn = new DBConnection();){
                conn.connect(host, port);
                String version = conn.run("version()").getString();
                String[] _ = version.split(" ")[0].split("\\.");
                int v0 = Integer.parseInt(_[0]);
                int v1 = Integer.parseInt(_[1]);
                int v2 = Integer.parseInt(_[2]);
                if (v0 == 2 && v1 == 0 && v2 >= 9 || v0 == 2 && v1 == 10 || v0 == 3 && v1 == 0 && v2 >= 0) {
                    this.listeningPort = 0;
                } else if (this.listeningPort == 0) {
                    throw new IOException("The server does not support subscription through reverse connection (connection initiated by the subscriber). Specify a valid port parameter.");
                }
                if (this.daemon != null) break block10;
                LinkedBlockingQueue<DBConnection> linkedBlockingQueue = this.connList;
                synchronized (linkedBlockingQueue) {
                    if (this.daemon == null) {
                        this.daemon = new Daemon(this.listeningPort, this, this.connList);
                        this.pThread = new Thread(this.daemon);
                        this.daemon.setRunningThread(this.pThread);
                        this.pThread.start();
                    }
                }
            }
        }
    }

    @Override
    public ConcurrentHashMap<String, Site[]> getTopicToSites() {
        return this.trueTopicToSites;
    }

    class ReconnectItem {
        private int reconnectState;
        private long lastReconnectTimestamp;
        private List<String> topics;

        public ReconnectItem(int v, long t) {
            this.reconnectState = v;
            this.lastReconnectTimestamp = t;
            this.topics = new ArrayList<String>();
        }

        public void setState(int v) {
            this.reconnectState = v;
        }

        public int getState() {
            return this.reconnectState;
        }

        public void setTimestamp(long v) {
            this.lastReconnectTimestamp = v;
        }

        public long getTimestamp() {
            return this.lastReconnectTimestamp;
        }

        public void putTopic(String topic) {
            if (this.topics == null) {
                this.topics = new ArrayList<String>();
                this.topics.add(topic);
            } else if (!this.topics.contains(this.topics)) {
                this.topics.add(topic);
            }
        }

        public List<String> getTopics() {
            return this.topics;
        }
    }
}

