/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client;

import com.xxdb.DBConnection;
import com.xxdb.data.BasicAnyVector;
import com.xxdb.data.BasicBoolean;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicLong;
import com.xxdb.data.BasicString;
import com.xxdb.data.BasicStringVector;
import com.xxdb.data.Entity;
import com.xxdb.data.Vector;
import com.xxdb.data.Void;
import com.xxdb.streaming.client.Daemon;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageDispatcher;
import com.xxdb.streaming.client.MessageHandler;
import com.xxdb.streaming.client.QueueManager;
import java.io.IOException;
import java.net.SocketException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

abstract class AbstractClient
implements MessageDispatcher {
    /** Port value the no-argument constructor listens on. */
    protected static final int DEFAULT_PORT = 8849;
    /** Default host name; not referenced inside this class. */
    protected static final String DEFAULT_HOST = "localhost";
    /** Action name used by the overloads that do not take an explicit action name. */
    protected static final String DEFAULT_ACTION_NAME = "javaStreamingApi";
    /** Reconnect bookkeeping keyed by site, i.e. the part of a topic before its first '/'. */
    protected ConcurrentHashMap<String, ReconnectItem> reconnectTable = new ConcurrentHashMap<String, ReconnectItem>();
    /** Local port the daemon was created with; sent to the server as the subscriber port. */
    protected int listeningPort;
    /** Local address sent to the server; when empty the connection's local address is used instead. */
    protected String listeningHost = "";
    /** Owner of the per-topic message queues. */
    protected QueueManager queueManager = new QueueManager();
    /** Per-topic staging area filled during a batch dispatch and emptied after each flush. */
    protected ConcurrentHashMap<String, List<IMessage>> messageCache = new ConcurrentHashMap<String, List<IMessage>>();
    /** Maps "host:port:tableName" to the topic returned by the server. Writers synchronize on the map. */
    protected HashMap<String, String> tableNameToTrueTopic = new HashMap<String, String>();
    /** Maps a per-site topic string to the subscription's topic (a plain subscription maps to itself). */
    protected HashMap<String, String> HATopicToTrueTopic = new HashMap<String, String>();
    /** Remote byte order per host, recorded the first time a host is subscribed to. */
    protected HashMap<String, Boolean> hostEndian = new HashMap<String, Boolean>();
    /** Thread running the {@link Daemon} created by the constructors. */
    protected Thread pThread;
    /** Candidate sites for each subscribed topic. Accessed under the map's own monitor. */
    protected ConcurrentHashMap<String, Site[]> trueTopicToSites = new ConcurrentHashMap<String, Site[]>();
    /** Topics whose most recent reconnect attempt failed. */
    protected CopyOnWriteArraySet<String> waitReconnectTopic = new CopyOnWriteArraySet<String>();

    /**
     * Records the reconnect state of the site that owns {@code topic}.
     *
     * <p>The site is the part of the topic before its first '/'. If the site already has a
     * {@link ReconnectItem}, its state and timestamp are updated and the topic is added to it;
     * otherwise a new item is created. An empty topic is ignored.
     *
     * @param topic topic string of the form {@code site/...}; a non-empty topic without '/'
     *              makes {@code substring} throw {@link StringIndexOutOfBoundsException}
     * @param v     new reconnect state for the site
     */
    @Override
    public void setNeedReconnect(String topic, int v) {
        if (topic.equals("")) {
            return;
        }
        String site = topic.substring(0, topic.indexOf("/"));
        long now = System.currentTimeMillis();
        // Look the site up by KEY. ConcurrentHashMap.contains(Object) is a legacy method that
        // searches the values, so it could never match a site name and every call used to
        // overwrite the existing item, discarding the topics it had collected.
        ReconnectItem item = this.reconnectTable.get(site);
        if (item == null) {
            ReconnectItem created = new ReconnectItem(v, now);
            created.putTopic(topic);
            item = this.reconnectTable.putIfAbsent(site, created);
            if (item == null) {
                return;
            }
            // Another thread registered the site first; fall through and update its item.
        }
        item.setState(v);
        item.setTimestamp(now);
        item.putTopic(topic);
    }

    /**
     * Returns the reconnect state stored for {@code site}.
     *
     * @param site site key of the reconnect table
     * @return the stored state, or 0 when the site has no entry
     */
    @Override
    public int getNeedReconnect(String site) {
        ReconnectItem entry = this.reconnectTable.get(site);
        return entry == null ? 0 : entry.getState();
    }

    /**
     * Returns the timestamp stored for {@code site} in the reconnect table.
     *
     * @param site site key of the reconnect table
     * @return the stored timestamp, or 0 when the site has no entry
     */
    @Override
    public long getReconnectTimestamp(String site) {
        ReconnectItem entry = this.reconnectTable.get(site);
        return entry == null ? 0L : entry.getTimestamp();
    }

    /**
     * Overwrites the timestamp stored for {@code site}; does nothing when the site has no entry.
     *
     * @param site site key of the reconnect table
     * @param v    timestamp to store
     */
    @Override
    public void setReconnectTimestamp(String site, long v) {
        ReconnectItem entry = this.reconnectTable.get(site);
        if (entry == null) {
            return;
        }
        entry.setTimestamp(v);
    }

    /**
     * Collects every subscribed topic whose prefix (the text before its first '/') equals
     * {@code site}.
     *
     * @param site site name to match
     * @return a new list of matching topics; empty when none match
     */
    @Override
    public List<String> getAllTopicsBySite(String site) {
        ArrayList<String> matches = new ArrayList<String>();
        for (String candidate : this.trueTopicToSites.keySet()) {
            int slash = candidate.indexOf("/");
            String owner = candidate.substring(0, slash);
            if (owner.equals(site)) {
                matches.add(candidate);
            }
        }
        return matches;
    }

    /**
     * Returns the set of topics waiting for a reconnect.
     *
     * <p>The returned object is the backing {@code waitReconnectTopic} field itself, not a
     * copy. {@code tryReconnect} adds a topic to it when {@code doReconnect} fails and removes
     * the topic when it succeeds.
     *
     * @return the live set of topics awaiting reconnection
     */
    @Override
    public Set<String> getAllReconnectTopic() {
        return this.waitReconnectTopic;
    }

    /**
     * Lists the sites in the reconnect table whose state is greater than zero.
     *
     * @return a new list of site names; empty when no site qualifies
     */
    @Override
    public List<String> getAllReconnectSites() {
        ArrayList<String> pending = new ArrayList<String>();
        for (String name : this.reconnectTable.keySet()) {
            ReconnectItem entry = this.reconnectTable.get(name);
            if (entry.getState() > 0) {
                pending.add(name);
            }
        }
        return pending;
    }

    /**
     * Resolves a site name to a reachable {@link Site}.
     *
     * <p>Uses the first topic reported by {@link #getAllTopicsBySite(String)} and probes that
     * topic's candidate sites.
     *
     * @param site site name (topic prefix before the first '/')
     * @return a responding site, or {@code null} when the site has no topics or the first
     *         topic has no candidate sites
     */
    @Override
    public Site getSiteByName(String site) {
        List<String> owned = this.getAllTopicsBySite(site);
        if (owned.isEmpty()) {
            return null;
        }
        Site[] candidates = this.trueTopicToSites.get(owned.get(0));
        if (candidates.length == 0) {
            return null;
        }
        return this.getActiveSite(candidates);
    }

    /**
     * Re-establishes the subscription on the given site; implemented by subclasses.
     *
     * <p>{@code tryReconnect} calls this with the site chosen by {@code getActiveSite}. A
     * {@code false} result makes it add the topic to {@code waitReconnectTopic}; {@code true}
     * removes the topic from that set.
     *
     * @param var1 site to reconnect to
     * @return {@code true} when the reconnect succeeded
     */
    protected abstract boolean doReconnect(Site var1);

    /**
     * Stores the latest message id on the topic's site.
     *
     * <p>Only topics with exactly one candidate site are updated. Unknown topics and topics
     * with zero or several sites are left untouched. Runs under the monitor of
     * {@code trueTopicToSites}.
     *
     * @param topic topic key of {@code trueTopicToSites}
     * @param msgId message id to record
     */
    @Override
    public void setMsgId(String topic, long msgId) {
        synchronized (this.trueTopicToSites) {
            Site[] candidates = this.trueTopicToSites.get(topic);
            if (candidates != null && candidates.length == 1) {
                candidates[0].msgId = msgId;
            }
        }
    }

    /**
     * Attempts to re-establish the subscription behind {@code topic}.
     *
     * <p>The topic is first translated through {@code HATopicToTrueTopic}. The topic's queue is
     * removed, a reachable site is selected, and {@link #doReconnect(Site)} is invoked on it.
     * On failure the topic is added to {@code waitReconnectTopic}; on success it is removed
     * from that set. The whole attempt runs under the monitor of {@code reconnectTable}.
     *
     * @param topic topic as received from the publisher
     * @return {@code true} when the reconnect succeeded. {@code false} when the topic is
     *         unknown, has no candidate sites, has a single site with reconnect disabled, no
     *         site could be selected, or {@code doReconnect} failed
     */
    @Override
    public boolean tryReconnect(String topic) {
        synchronized (this.reconnectTable) {
            String trueTopic = this.HATopicToTrueTopic.get(topic);
            if (trueTopic == null) {
                // Unknown topic: ConcurrentHashMap.get(null) below would throw
                // NullPointerException, so report the failure instead.
                return false;
            }
            this.queueManager.removeQueue(trueTopic);
            Site[] sites;
            synchronized (this.trueTopicToSites) {
                sites = this.trueTopicToSites.get(trueTopic);
            }
            if (sites == null || sites.length == 0) {
                return false;
            }
            if (sites.length == 1 && !sites[0].reconnect) {
                return false;
            }
            Site site = this.getActiveSite(sites);
            if (site == null) {
                return false;
            }
            if (!this.doReconnect(site)) {
                this.waitReconnectTopic.add(trueTopic);
                return false;
            }
            this.waitReconnectTopic.remove(trueTopic);
            return true;
        }
    }

    /**
     * Probes the candidate sites round-robin until one answers a trivial script.
     *
     * <p>For each site a fresh connection is opened and the script {@code "1"} is run. Any
     * failure, including a failure to connect, is printed, followed by a 500 ms pause before
     * the next site is tried. The loop has no retry limit. If the calling thread is interrupted
     * while pausing, the interrupt flag is restored and {@code null} is returned so the caller
     * can stop.
     *
     * @param sites candidate sites for one topic
     * @return the first site that responds, or {@code null} when {@code sites} is null or
     *         empty, or the thread was interrupted
     */
    private Site getActiveSite(Site[] sites) {
        if (sites == null || sites.length == 0) {
            return null;
        }
        int siteNum = sites.length;
        int siteId = 0;
        while (true) {
            Site site = sites[siteId];
            siteId = (siteId + 1) % siteNum;
            try {
                DBConnection conn = new DBConnection();
                // connect() sits inside the retry scope so that an unreachable site leads to
                // the next candidate instead of aborting the search.
                conn.connect(site.host, site.port);
                try {
                    conn.run("1");
                    return site;
                }
                finally {
                    conn.close();
                }
            }
            catch (Exception ex) {
                ex.printStackTrace();
            }
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException e) {
                // Swallowing the interrupt would make this loop impossible to stop.
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    /**
     * Converts a version string into an integer by dropping the dots from its first
     * space-separated token (for example {@code "1.30.2 x"} becomes 1302).
     *
     * @param ver version text; must contain at least two space-separated tokens
     * @return the parsed number, or 0 when the text has fewer than two tokens or cannot be
     *         parsed for any reason (including {@code null})
     */
    private int getVersionNumber(String ver) {
        try {
            String[] tokens = ver.split(" ");
            if (tokens.length < 2) {
                return 0;
            }
            String digits = tokens[0].replace(".", "");
            return Integer.parseInt(digits);
        }
        catch (Exception ignored) {
            // Any malformed input is reported as version 0.
            return 0;
        }
    }

    /**
     * Asks the given site to close its publish connection to this client.
     *
     * <p>Connects to the site, reads its version, and runs
     * {@code activeClosePublishConnection} with the local address and listening port. An extra
     * {@code true} argument is appended when the parsed version number is at least 995.
     * Failures are printed rather than propagated. The method always pauses for one second
     * before returning.
     *
     * @param site site whose publish connection should be closed
     */
    @Override
    public void activeCloseConnection(Site site) {
        try {
            DBConnection conn = new DBConnection();
            conn.connect(site.host, site.port);
            try {
                BasicString versionText = (BasicString)conn.run("version()");
                int serverVersion = this.getVersionNumber(versionText.getString());

                // Fall back to the socket's local address when no listening host was configured.
                String localIP = this.listeningHost;
                if (localIP.equals("")) {
                    localIP = conn.getLocalAddress().getHostAddress();
                }

                ArrayList<Entity> args = new ArrayList<Entity>();
                args.add(new BasicString(localIP));
                args.add(new BasicInt(this.listeningPort));
                if (serverVersion >= 995) {
                    args.add(new BasicBoolean(true));
                }
                conn.run("activeClosePublishConnection", args);
                System.out.println("Successfully closed publish connection");
            }
            finally {
                conn.close();
            }
        }
        catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("Unable to actively close the publish connection from site " + site.host + ":" + site.port);
        }

        try {
            Thread.sleep(1000L);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a client listening on {@link #DEFAULT_PORT}.
     *
     * @throws SocketException declared by the delegated constructor
     */
    public AbstractClient() throws SocketException {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a client listening on the given port and starts its daemon thread.
     *
     * @param subscribePort local port handed to the {@link Daemon}
     * @throws SocketException declared for failures while setting up the daemon
     */
    public AbstractClient(int subscribePort) throws SocketException {
        this.listeningPort = subscribePort;
        this.pThread = new Thread(new Daemon(subscribePort, this));
        this.pThread.start();
    }

    /**
     * Creates a client that advertises {@code subsribeHost} as its address, listens on the
     * given port, and starts its daemon thread.
     *
     * @param subsribeHost  local address reported to servers instead of the socket address
     * @param subscribePort local port handed to the {@link Daemon}
     * @throws SocketException declared for failures while setting up the daemon
     */
    public AbstractClient(String subsribeHost, int subscribePort) throws SocketException {
        // Both fields are assigned before the daemon thread is started.
        this.listeningHost = subsribeHost;
        this.listeningPort = subscribePort;
        this.pThread = new Thread(new Daemon(subscribePort, this));
        this.pThread.start();
    }

    /**
     * Appends {@code msg} to the staging list of every subscription it belongs to.
     *
     * <p>The message's topic string is a comma-separated list. Each entry is translated through
     * {@code HATopicToTrueTopic}, and the message is added to that topic's list in
     * {@code messageCache}, creating the list on first use. Entries with no mapping are
     * skipped.
     *
     * @param msg message to stage
     */
    private void addMessageToCache(IMessage msg) {
        for (String rawTopic : msg.getTopic().split(",")) {
            String topic = this.HATopicToTrueTopic.get(rawTopic);
            if (topic == null) {
                // messageCache is a ConcurrentHashMap and rejects null keys with a
                // NullPointerException, so a message for an unknown topic is skipped.
                continue;
            }
            List<IMessage> cache = this.messageCache.get(topic);
            if (cache == null) {
                cache = new ArrayList<IMessage>();
                this.messageCache.put(topic, cache);
            }
            cache.add(msg);
        }
    }

    /**
     * Moves every staged message list from {@code messageCache} into its topic's queue and
     * then empties the cache.
     *
     * <p>Topics without a queue are skipped, and their staged messages are discarded by the
     * final {@code clear()}. An exception raised while enqueueing one topic is printed and the
     * remaining topics are still processed.
     */
    private void flushToQueue() {
        // Iterate the typed key view directly; a raw Set cannot be iterated as String.
        for (String topic : this.messageCache.keySet()) {
            try {
                BlockingQueue<List<IMessage>> q = this.queueManager.getQueue(topic);
                if (q == null) {
                    continue;
                }
                q.put(this.messageCache.get(topic));
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        this.messageCache.clear();
    }

    /**
     * Delivers a single message to the queue of every subscription named in its topic string.
     *
     * <p>The topic string is split on commas and each entry is translated through
     * {@code HATopicToTrueTopic}. The message is enqueued as a one-element list. Entries
     * without a queue are skipped, and an interruption while enqueueing is printed.
     *
     * @param msg message to deliver
     */
    @Override
    public synchronized void dispatch(IMessage msg) {
        String[] rawTopics = msg.getTopic().split(",");
        for (int i = 0; i < rawTopics.length; ++i) {
            String trueTopic = this.HATopicToTrueTopic.get(rawTopics[i]);
            BlockingQueue<List<IMessage>> target = this.queueManager.getQueue(trueTopic);
            if (target == null) {
                continue;
            }
            try {
                target.put(Arrays.asList(msg));
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Delivers a batch of messages: each message is staged per topic, then the staged lists
     * are flushed to the topic queues in one pass.
     *
     * @param messags messages to deliver, in order
     */
    @Override
    public synchronized void batchDispatch(List<IMessage> messags) {
        for (IMessage message : messags) {
            this.addMessageToCache(message);
        }
        this.flushToQueue();
    }

    /**
     * Reports the byte order recorded for {@code host}.
     *
     * @param host host name used as key in {@code hostEndian}
     * @return the recorded flag, or {@code false} when nothing is recorded for the host
     */
    @Override
    public boolean isRemoteLittleEndian(String host) {
        return this.hostEndian.containsKey(host) && this.hostEndian.get(host);
    }

    /**
     * Reports whether the subscription behind {@code topic} has been closed.
     *
     * <p>The topic is translated through {@code HATopicToTrueTopic}, and the {@code closed}
     * flag of the first candidate site is returned.
     *
     * @param topic topic as received from the publisher
     * @return {@code true} when the topic is unknown, has no sites, or its first site is
     *         marked closed
     */
    @Override
    public synchronized boolean isClosed(String topic) {
        String trueTopic = this.HATopicToTrueTopic.get(topic);
        if (trueTopic == null) {
            // No such subscription. Looking a null key up in the ConcurrentHashMap below
            // would throw NullPointerException, so treat the topic like one without sites.
            return true;
        }
        synchronized (this.trueTopicToSites) {
            Site[] sites = this.trueTopicToSites.get(trueTopic);
            if (sites == null || sites.length == 0) {
                return true;
            }
            return sites[0].closed;
        }
    }

    /**
     * Builds the topic string {@code host:port:alias/tableName/actionName}.
     *
     * <p>Plain concatenation is used instead of {@code String.format}, because {@code %d} is
     * formatted with the default locale and can emit non-ASCII digits. The other lookup keys
     * in this class are also built by concatenation.
     *
     * @param host       site host
     * @param port       site port
     * @param alias      site alias
     * @param tableName  subscribed table
     * @param actionName subscription action name
     * @return the assembled topic string
     */
    private String getTopic(String host, int port, String alias, String tableName, String actionName) {
        return host + ":" + port + ":" + alias + "/" + tableName + "/" + actionName;
    }

    /**
     * Subscribes to a table on the given server and registers the subscription locally.
     *
     * <p>Steps, all on one short-lived connection that is closed before returning:
     * <ol>
     *   <li>record the server's byte order for {@code host} if not yet known;</li>
     *   <li>run {@code getSubscriptionTopic} to obtain the topic;</li>
     *   <li>run {@code publishTable} with the local address, listening port, table, action,
     *       offset, filter (or a void placeholder) and, only when set, the
     *       {@code allowExistTopic} flag;</li>
     *   <li>if the reply is a {@code BasicAnyVector}, register one {@link Site} per
     *       {@code host:port:alias} entry of its second element, each with reconnect enabled;
     *       otherwise register a single site for {@code host:port};</li>
     *   <li>create the topic's message queue.</li>
     * </ol>
     * Every site is created with message id {@code offset - 1}.
     *
     * @param host            server host
     * @param port            server port
     * @param tableName       table to subscribe to
     * @param actionName      subscription action name
     * @param handler         handler stored on the created sites (may be {@code null})
     * @param offset          offset sent to the server
     * @param reconnect       reconnect flag for the single-site case
     * @param filter          filter sent to the server, or {@code null} for none
     * @param allowExistTopic flag forwarded to the server when {@code true}
     * @return the queue created for the topic
     * @throws IOException      propagated from the connection
     * @throws RuntimeException propagated unchanged
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic) throws IOException, RuntimeException {
        String topic = "";
        DBConnection dbConn = new DBConnection();
        dbConn.connect(host, port);
        try {
            String localIP = this.listeningHost;
            if (localIP.equals("")) {
                localIP = dbConn.getLocalAddress().getHostAddress();
            }
            if (!this.hostEndian.containsKey(host)) {
                this.hostEndian.put(host, dbConn.getRemoteLittleEndian());
            }

            // First round trip: ask the server for the topic of this table/action pair.
            ArrayList<Entity> args = new ArrayList<Entity>();
            args.add(new BasicString(tableName));
            args.add(new BasicString(actionName));
            Entity reply = dbConn.run("getSubscriptionTopic", args);
            topic = ((BasicAnyVector)reply).getEntity(0).getString();

            // Second round trip: start publishing to this client.
            args.clear();
            args.add(new BasicString(localIP));
            args.add(new BasicInt(this.listeningPort));
            args.add(new BasicString(tableName));
            args.add(new BasicString(actionName));
            args.add(new BasicLong(offset));
            if (filter == null) {
                args.add(new Void());
            } else {
                args.add(filter);
            }
            if (allowExistTopic) {
                args.add(new BasicBoolean(allowExistTopic));
            }
            reply = dbConn.run("publishTable", args);

            Site[] sites;
            if (reply instanceof BasicAnyVector) {
                // The reply lists several sites as "host:port:alias" strings.
                BasicStringVector siteStrings = (BasicStringVector)((BasicAnyVector)reply).getEntity(1);
                int siteCount = siteStrings.rows();
                sites = new Site[siteCount];
                for (int i = 0; i < siteCount; ++i) {
                    String[] fields = siteStrings.getString(i).split(":");
                    String siteHost = fields[0];
                    int sitePort = Integer.parseInt(fields[1]);
                    String siteAlias = fields[2];
                    sites[i] = new Site(siteHost, sitePort, tableName, actionName, handler, offset - 1L, true, filter, allowExistTopic);
                    synchronized (this.tableNameToTrueTopic) {
                        this.tableNameToTrueTopic.put(siteHost + ":" + sitePort + ":" + tableName, topic);
                    }
                    String siteTopic = this.getTopic(siteHost, sitePort, siteAlias, tableName, actionName);
                    synchronized (this.HATopicToTrueTopic) {
                        this.HATopicToTrueTopic.put(siteTopic, topic);
                    }
                }
            } else {
                sites = new Site[]{new Site(host, port, tableName, actionName, handler, offset - 1L, reconnect, filter, allowExistTopic)};
                synchronized (this.tableNameToTrueTopic) {
                    this.tableNameToTrueTopic.put(host + ":" + port + ":" + tableName, topic);
                }
                synchronized (this.HATopicToTrueTopic) {
                    this.HATopicToTrueTopic.put(topic, topic);
                }
            }
            synchronized (this.trueTopicToSites) {
                this.trueTopicToSites.put(topic, sites);
            }
        }
        finally {
            dbConn.close();
        }
        return this.queueManager.addQueue(topic);
    }

    /**
     * Subscribes without a handler or filter and with {@code allowExistTopic} set to
     * {@code false}; delegates to the nine-argument overload.
     *
     * @param host       server host
     * @param port       server port
     * @param tableName  table to subscribe to
     * @param actionName subscription action name
     * @param offset     offset forwarded to the nine-argument overload
     * @param reconnect  reconnect flag forwarded to the nine-argument overload
     * @return the queue created for the topic
     * @throws IOException      propagated from the delegate
     * @throws RuntimeException propagated from the delegate
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, long offset, boolean reconnect) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, actionName, null, offset, reconnect, null, false);
    }

    /**
     * Subscribes using {@code DEFAULT_ACTION_NAME} with reconnect disabled; delegates to the
     * six-argument overload.
     *
     * @param host      server host
     * @param port      server port
     * @param tableName table to subscribe to
     * @param offset    offset forwarded to the delegate
     * @return the queue created for the topic
     * @throws IOException      propagated from the delegate
     * @throws RuntimeException propagated from the delegate
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, long offset) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME, offset, false);
    }

    /**
     * Subscribes with an explicit action name and reconnect disabled; delegates to the
     * six-argument overload.
     *
     * @param host       server host
     * @param port       server port
     * @param tableName  table to subscribe to
     * @param actionName subscription action name
     * @param offset     offset forwarded to the delegate
     * @return the queue created for the topic
     * @throws IOException      propagated from the delegate
     * @throws RuntimeException propagated from the delegate
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, long offset) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, actionName, offset, false);
    }

    /**
     * Cancels a subscription on the server and marks its local sites as closed.
     *
     * <p>Runs {@code stopPublishTable} with the local address, listening port, table and action
     * name. It then looks up the topic registered for {@code host:port:tableName} and sets
     * {@code closed} on each of its sites. If no topic or sites are registered locally, only
     * the server call is performed. The connection is always closed before returning.
     *
     * @param host       server host
     * @param port       server port
     * @param tableName  subscribed table
     * @param actionName subscription action name
     * @throws IOException propagated from the connection
     */
    protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
        DBConnection dbConn = new DBConnection();
        dbConn.connect(host, port);
        try {
            String localIP = this.listeningHost;
            if (localIP.equals("")) {
                localIP = dbConn.getLocalAddress().getHostAddress();
            }
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            dbConn.run("stopPublishTable", params);

            String fullTableName = host + ":" + port + ":" + tableName;
            String topic;
            synchronized (this.tableNameToTrueTopic) {
                topic = this.tableNameToTrueTopic.get(fullTableName);
            }
            // A table that was never subscribed through this client has no topic. A null key
            // would make ConcurrentHashMap.get throw, and a missing site array would fail on
            // sites.length, so both cases are skipped.
            if (topic != null) {
                synchronized (this.trueTopicToSites) {
                    Site[] sites = this.trueTopicToSites.get(topic);
                    if (sites != null) {
                        for (int i = 0; i < sites.length; ++i) {
                            sites[i].closed = true;
                        }
                    }
                }
            }
            System.out.println("Successfully unsubscribed table " + fullTableName);
        }
        finally {
            dbConn.close();
        }
    }

    /**
     * Cancels the subscription that uses {@code DEFAULT_ACTION_NAME}; delegates to the
     * four-argument overload.
     *
     * @param host      server host
     * @param port      server port
     * @param tableName subscribed table
     * @throws IOException propagated from the delegate
     */
    protected void unsubscribeInternal(String host, int port, String tableName) throws IOException {
        this.unsubscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME);
    }

    /**
     * One server endpoint of a subscription, together with the subscription parameters it
     * was created with.
     *
     * <p>All fields are assigned by the constructor except {@code closed}, which starts as
     * {@code false}. The enclosing class later writes {@code msgId} and {@code closed} and
     * reads {@code reconnect}.
     */
    public class Site {
        /** Host of this endpoint. */
        String host;
        /** Port of this endpoint. */
        int port;
        /** Subscribed table. */
        String tableName;
        /** Subscription action name. */
        String actionName;
        /** Handler supplied at subscription time; may be {@code null}. */
        MessageHandler handler;
        /** Message id; initialised by the constructor and updated by the enclosing client. */
        long msgId;
        /** Whether the enclosing client may reconnect when this is the only site. */
        boolean reconnect;
        /** Filter supplied at subscription time; may be {@code null}. */
        Vector filter;
        /** Set to {@code true} once the subscription has been unsubscribed. */
        boolean closed;
        /** Flag supplied at subscription time. */
        boolean allowExistTopic;

        Site(String host, int port, String tableName, String actionName, MessageHandler handler, long msgId, boolean reconnect, Vector filter, boolean allowExistTopic) {
            // Endpoint.
            this.host = host;
            this.port = port;
            // Subscription parameters.
            this.tableName = tableName;
            this.actionName = actionName;
            this.handler = handler;
            this.filter = filter;
            this.allowExistTopic = allowExistTopic;
            // Progress and recovery state.
            this.msgId = msgId;
            this.reconnect = reconnect;
        }
    }

    /**
     * Reconnect bookkeeping: a state value, the time of the last reconnect activity, and the
     * distinct topics recorded for it.
     */
    class ReconnectItem {
        private int reconnectState;
        private long lastReconnectTimestamp;
        /** Distinct topics in insertion order; never {@code null}. */
        private final List<String> topics = new ArrayList<String>();

        /**
         * @param v initial reconnect state
         * @param t initial timestamp
         */
        public ReconnectItem(int v, long t) {
            this.reconnectState = v;
            this.lastReconnectTimestamp = t;
        }

        /** Replaces the reconnect state. */
        public void setState(int v) {
            this.reconnectState = v;
        }

        /** Returns the current reconnect state. */
        public int getState() {
            return this.reconnectState;
        }

        /** Replaces the stored timestamp. */
        public void setTimestamp(long v) {
            this.lastReconnectTimestamp = v;
        }

        /** Returns the stored timestamp. */
        public long getTimestamp() {
            return this.lastReconnectTimestamp;
        }

        /**
         * Records {@code topic} unless it is already present.
         *
         * <p>The membership test compares against the topic itself. Testing whether the list
         * contained itself was always false and let duplicates accumulate.
         *
         * @param topic topic to record
         */
        public void putTopic(String topic) {
            if (!this.topics.contains(topic)) {
                this.topics.add(topic);
            }
        }

        /** Returns the live list of recorded topics (not a copy). */
        public List<String> getTopics() {
            return this.topics;
        }
    }
}

