/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client;

import com.xxdb.DBConnection;
import com.xxdb.data.BasicAnyVector;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicLong;
import com.xxdb.data.BasicString;
import com.xxdb.data.BasicStringVector;
import com.xxdb.data.Entity;
import com.xxdb.data.Vector;
import com.xxdb.streaming.client.Daemon;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageDispatcher;
import com.xxdb.streaming.client.MessageHandler;
import com.xxdb.streaming.client.QueueManager;
import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

/**
 * Common base for streaming subscription clients.
 *
 * <p>The constructor creates a {@link Daemon} for the listening port and starts it on
 * {@link #pThread}. This class keeps the bookkeeping that maps topics to their sites and
 * implements {@link MessageDispatcher} by routing incoming messages into the per-topic
 * queues held by {@link #queueManager}.
 *
 * <p>Locking as used in this class: {@link #tableNameToTrueTopic},
 * {@link #HATopicToTrueTopic} and {@link #trueTopicToSites} are each accessed while
 * synchronized on the map itself. {@link #messageCache} and {@link #hostEndian} are
 * accessed without any lock.
 */
abstract class AbstractClient implements MessageDispatcher {
    protected static final int DEFAULT_PORT = 8849;
    protected static final String DEFAULT_HOST = "localhost";
    protected static final String DEFAULT_ACTION_NAME = "javaStreamingApi";

    /** Local port passed to the {@link Daemon} and reported to the server in remote calls. */
    protected int listeningPort;
    protected QueueManager queueManager = new QueueManager();
    /** Per-true-topic staging area used by {@link #batchDispatch(List)}. */
    protected HashMap<String, List<IMessage>> messageCache = new HashMap<String, List<IMessage>>();
    /** Maps {@code host:port:tableName} to the true topic. Note the action name is not part of the key. */
    protected HashMap<String, String> tableNameToTrueTopic = new HashMap<String, String>();
    /** Maps the topic string carried by a message to the true topic. */
    protected HashMap<String, String> HATopicToTrueTopic = new HashMap<String, String>();
    /** Remembers, per host, the endianness flag reported by the first connection to that host. */
    protected HashMap<String, Boolean> hostEndian = new HashMap<String, Boolean>();
    protected Thread pThread;
    /** Maps a true topic to the site(s) registered for it by {@code subscribeInternal}. */
    protected HashMap<String, Site[]> trueTopicToSites = new HashMap<String, Site[]>();

    /**
     * Re-establishes the subscription on the given site.
     *
     * @param site the site whose publish connection was just actively closed
     */
    protected abstract void doReconnect(Site site);

    /**
     * Records the latest message id for a topic.
     *
     * <p>The id is stored only when exactly one site is registered for the topic; topics with
     * no sites or with several sites are left untouched.
     *
     * @param topic key into {@link #trueTopicToSites}
     * @param msgId id to remember on the single site
     */
    @Override
    public void setMsgId(String topic, long msgId) {
        synchronized (this.trueTopicToSites) {
            Site[] sites = this.trueTopicToSites.get(topic);
            if (sites != null && sites.length == 1) {
                sites[0].msgId = msgId;
            }
        }
    }

    /**
     * Drops the queue of the given topic and, if allowed, triggers a reconnect.
     *
     * <p>Nothing further happens when no sites are registered for the topic, or when the
     * topic has a single site whose {@code reconnect} flag is false.
     *
     * @param topic topic string as carried by incoming messages
     */
    @Override
    public void tryReconnect(String topic) {
        System.out.println("Trigger reconnect");
        String trueTopic = this.resolveTrueTopic(topic);
        this.queueManager.removeQueue(trueTopic);

        Site[] sites;
        synchronized (this.trueTopicToSites) {
            sites = this.trueTopicToSites.get(trueTopic);
        }
        if (sites == null || sites.length == 0) {
            return;
        }
        if (sites.length == 1 && !sites[0].reconnect) {
            return;
        }
        Site site = this.activeCloseConnection(sites);
        this.doReconnect(site);
    }

    /**
     * Looks up the true topic for a message topic while holding the same lock that
     * {@code subscribeInternal} holds when it writes {@link #HATopicToTrueTopic}.
     *
     * @return the mapped true topic, or {@code null} when the topic is unknown
     */
    private String resolveTrueTopic(String topic) {
        synchronized (this.HATopicToTrueTopic) {
            return this.HATopicToTrueTopic.get(topic);
        }
    }

    /**
     * Cycles through the sites, starting at index 0, until the remote call
     * {@code activeClosePublishConnection} succeeds on one of them.
     *
     * <p>Any failure for a site (including failure to connect) is reported on standard
     * output, followed by a one-second pause before the next site is tried. The loop only
     * ends on success.
     *
     * @param sites non-empty array of candidate sites
     * @return the site on which the call succeeded
     */
    private Site activeCloseConnection(Site[] sites) {
        int siteNum = sites.length;
        int siteId = 0;
        while (true) {
            Site site = sites[siteId];
            siteId = (siteId + 1) % siteNum;
            try {
                DBConnection conn = new DBConnection();
                // Connecting is part of the attempt: an unreachable site must lead to a retry.
                conn.connect(site.host, site.port);
                try {
                    String localIP = conn.getLocalAddress().getHostAddress();
                    ArrayList<Entity> params = new ArrayList<Entity>();
                    params.add(new BasicString(localIP));
                    params.add(new BasicInt(this.listeningPort));
                    conn.run("activeClosePublishConnection", params);
                    System.out.println("Successfully closed publish connection");
                    return site;
                }
                finally {
                    conn.close();
                }
            }
            catch (Exception ex) {
                System.out.println("Unable to actively close the publish connection from site " + site.host + ":" + site.port);
            }
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException ignored) {
                // Deliberately not restoring the interrupt flag: this loop has to produce a
                // site, and a set flag would make every following sleep return immediately.
            }
        }
    }

    /**
     * Creates a client listening on {@link #DEFAULT_PORT}.
     *
     * @throws SocketException declared for the {@link Daemon} setup
     */
    public AbstractClient() throws SocketException {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a client and immediately starts a {@link Daemon} thread for the given port.
     *
     * @param subscribePort local port handed to the daemon and reported to the server
     * @throws SocketException declared for the {@link Daemon} setup
     */
    public AbstractClient(int subscribePort) throws SocketException {
        this.listeningPort = subscribePort;
        Daemon daemon = new Daemon(subscribePort, this);
        this.pThread = new Thread(daemon);
        this.pThread.start();
    }

    /**
     * Appends the message to the cache list of every true topic named in its
     * comma-separated topic string.
     */
    private void addMessageToCache(IMessage msg) {
        for (String haTopic : msg.getTopic().split(",")) {
            String trueTopic = this.resolveTrueTopic(haTopic);
            List<IMessage> cached = this.messageCache.get(trueTopic);
            if (cached == null) {
                cached = new ArrayList<IMessage>();
                this.messageCache.put(trueTopic, cached);
            }
            cached.add(msg);
        }
    }

    /**
     * Hands each cached list to the queue of its topic, then empties the cache.
     *
     * <p>A failure for one topic is printed and does not stop the remaining topics.
     */
    private void flushToQueue() {
        for (String topic : this.messageCache.keySet()) {
            try {
                this.queueManager.getQueue(topic).put(this.messageCache.get(topic));
            }
            catch (Exception e) {
                // Best effort per topic; NOTE(review): this also absorbs InterruptedException.
                e.printStackTrace();
            }
        }
        this.messageCache.clear();
    }

    /**
     * Puts the message, wrapped in a single-element list, on the queue of every true topic
     * named in its comma-separated topic string.
     *
     * @param msg message to route
     */
    @Override
    public void dispatch(IMessage msg) {
        for (String haTopic : msg.getTopic().split(",")) {
            String trueTopic = this.resolveTrueTopic(haTopic);
            BlockingQueue<List<IMessage>> queue = this.queueManager.getQueue(trueTopic);
            if (queue == null) {
                // tryReconnect removes the queue before resubscribing; presumably getQueue
                // then yields null. Skip the topic instead of failing with an NPE.
                continue;
            }
            try {
                queue.put(Arrays.asList(msg));
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Groups the messages by true topic and delivers one list per topic.
     *
     * @param messages messages to route, in arrival order
     */
    @Override
    public void batchDispatch(List<IMessage> messages) {
        for (IMessage msg : messages) {
            this.addMessageToCache(msg);
        }
        this.flushToQueue();
    }

    /**
     * Reports the endianness flag remembered for a host.
     *
     * @param host host name as passed to {@code subscribeInternal}
     * @return the stored flag, or {@code false} when the host has never been recorded
     */
    @Override
    public boolean isRemoteLittleEndian(String host) {
        Boolean littleEndian = this.hostEndian.get(host);
        return littleEndian != null && littleEndian;
    }

    /**
     * Tells whether the subscription behind a topic has been closed.
     *
     * @param topic topic string as carried by incoming messages
     * @return {@code true} when no sites are registered for the topic; otherwise the
     *         {@code closed} flag of the first site
     */
    @Override
    public boolean isClosed(String topic) {
        String trueTopic = this.resolveTrueTopic(topic);
        synchronized (this.trueTopicToSites) {
            Site[] sites = this.trueTopicToSites.get(trueTopic);
            if (sites == null || sites.length == 0) {
                return true;
            }
            return sites[0].closed;
        }
    }

    /**
     * Builds the topic string {@code host:port:alias/tableName/actionName}.
     *
     * <p>Plain concatenation is used so the port is always rendered with ASCII digits,
     * independent of the default locale.
     */
    private String getTopic(String host, int port, String alias, String tableName, String actionName) {
        return host + ":" + port + ":" + alias + "/" + tableName + "/" + actionName;
    }

    /**
     * Subscribes to a table and registers the bookkeeping for the resulting topic.
     *
     * <p>Steps, all on one short-lived connection that is closed before returning:
     * <ol>
     *   <li>remember the host's endianness flag if not yet known;</li>
     *   <li>call {@code getSubscriptionTopic} to obtain the true topic;</li>
     *   <li>call {@code publishTable} with the local address, listening port, table, action,
     *       offset and (when non-null) the filter;</li>
     *   <li>if the result is a {@link BasicAnyVector}, register one site per
     *       {@code host:port:alias} string found at index 1, each with reconnect enabled;
     *       otherwise register the single given host and port with the caller's
     *       {@code reconnect} flag.</li>
     * </ol>
     * Every registered site starts with message id {@code offset - 1}.
     *
     * @param host       server host
     * @param port       server port
     * @param tableName  table to subscribe to
     * @param actionName subscription action name
     * @param handler    handler stored on each site; may be {@code null}
     * @param offset     offset passed to {@code publishTable}
     * @param reconnect  reconnect flag for the single-site case
     * @param filter     optional filter appended to the {@code publishTable} arguments
     * @return the queue created for the true topic
     * @throws IOException if the connection or a remote call fails
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter) throws IOException, RuntimeException {
        String topic;
        DBConnection dbConn = new DBConnection();
        dbConn.connect(host, port);
        try {
            String localIP = dbConn.getLocalAddress().getHostAddress();
            if (!this.hostEndian.containsKey(host)) {
                this.hostEndian.put(host, dbConn.getRemoteLittleEndian());
            }

            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            Entity re = dbConn.run("getSubscriptionTopic", params);
            topic = ((BasicAnyVector)re).getEntity(0).getString();

            params.clear();
            params.add(new BasicString(localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            params.add(new BasicLong(offset));
            if (filter != null) {
                params.add(filter);
            }
            re = dbConn.run("publishTable", params);

            Site[] sites;
            if (re instanceof BasicAnyVector) {
                // Several sites: entries look like "host:port:alias".
                BasicStringVector haSiteStrings = (BasicStringVector)((BasicAnyVector)re).getEntity(1);
                int haSiteNum = haSiteStrings.rows();
                sites = new Site[haSiteNum];
                for (int i = 0; i < haSiteNum; ++i) {
                    String[] parts = haSiteStrings.getString(i).split(":");
                    String haHost = parts[0];
                    int haPort = Integer.parseInt(parts[1]);
                    String haAlias = parts[2];
                    sites[i] = new Site(haHost, haPort, tableName, actionName, handler, offset - 1L, true, filter);
                    synchronized (this.tableNameToTrueTopic) {
                        this.tableNameToTrueTopic.put(haHost + ":" + haPort + ":" + tableName, topic);
                    }
                    String haTopic = this.getTopic(haHost, haPort, haAlias, tableName, actionName);
                    synchronized (this.HATopicToTrueTopic) {
                        this.HATopicToTrueTopic.put(haTopic, topic);
                    }
                }
            } else {
                sites = new Site[]{new Site(host, port, tableName, actionName, handler, offset - 1L, reconnect, filter)};
                synchronized (this.tableNameToTrueTopic) {
                    this.tableNameToTrueTopic.put(host + ":" + port + ":" + tableName, topic);
                }
                // With a single site the message topic and the true topic are the same string.
                synchronized (this.HATopicToTrueTopic) {
                    this.HATopicToTrueTopic.put(topic, topic);
                }
            }
            synchronized (this.trueTopicToSites) {
                this.trueTopicToSites.put(topic, sites);
            }
        }
        finally {
            dbConn.close();
        }
        return this.queueManager.addQueue(topic);
    }

    /**
     * Subscribes without a handler and without a filter.
     *
     * @see #subscribeInternal(String, int, String, String, MessageHandler, long, boolean, Vector)
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, long offset, boolean reconnect) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, actionName, null, offset, reconnect, null);
    }

    /**
     * Subscribes with {@link #DEFAULT_ACTION_NAME} and reconnect disabled.
     *
     * @see #subscribeInternal(String, int, String, String, MessageHandler, long, boolean, Vector)
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, long offset) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME, offset, false);
    }

    /**
     * Subscribes with the given action name and reconnect disabled.
     *
     * @see #subscribeInternal(String, int, String, String, MessageHandler, long, boolean, Vector)
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, long offset) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, actionName, offset, false);
    }

    /**
     * Calls {@code stopPublishTable} on the server and marks every site registered for the
     * table's true topic as closed.
     *
     * <p>If no sites are registered for the table (for example it was never subscribed
     * through this client), only the remote call is made.
     *
     * @param host       server host
     * @param port       server port
     * @param tableName  table to unsubscribe from
     * @param actionName subscription action name
     * @throws IOException if the connection or the remote call fails
     */
    protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
        DBConnection dbConn = new DBConnection();
        dbConn.connect(host, port);
        try {
            String localIP = dbConn.getLocalAddress().getHostAddress();
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            dbConn.run("stopPublishTable", params);

            String fullTableName = host + ":" + port + ":" + tableName;
            String topic;
            synchronized (this.tableNameToTrueTopic) {
                topic = this.tableNameToTrueTopic.get(fullTableName);
            }
            synchronized (this.trueTopicToSites) {
                Site[] sites = this.trueTopicToSites.get(topic);
                if (sites != null) {
                    for (Site site : sites) {
                        site.closed = true;
                    }
                }
            }
            System.out.println("Successfully unsubscribed table " + fullTableName);
        }
        finally {
            dbConn.close();
        }
    }

    /**
     * Unsubscribes using {@link #DEFAULT_ACTION_NAME}.
     *
     * @see #unsubscribeInternal(String, int, String, String)
     */
    protected void unsubscribeInternal(String host, int port, String tableName) throws IOException {
        this.unsubscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME);
    }

    /**
     * One publishing endpoint of a subscription, together with the arguments the
     * subscription was made with and its mutable state.
     */
    protected class Site {
        String host;
        int port;
        String tableName;
        String actionName;
        MessageHandler handler;
        /** Initialised to {@code offset - 1} on subscribe and updated by {@code setMsgId}. */
        long msgId;
        /** Whether {@code tryReconnect} may reconnect when this is the only site of a topic. */
        boolean reconnect;
        Vector filter = null;
        /** Set to {@code true} by {@code unsubscribeInternal}. */
        boolean closed = false;

        Site(String host, int port, String tableName, String actionName, MessageHandler handler, long msgId, boolean reconnect, Vector filter) {
            this.host = host;
            this.port = port;
            this.tableName = tableName;
            this.actionName = actionName;
            this.handler = handler;
            this.msgId = msgId;
            this.reconnect = reconnect;
            this.filter = filter;
        }
    }
}

