/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client;

import com.xxdb.DBConnection;
import com.xxdb.data.BasicAnyVector;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicLong;
import com.xxdb.data.BasicString;
import com.xxdb.data.Entity;
import com.xxdb.data.Vector;
import com.xxdb.streaming.client.Daemon;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageDispatcher;
import com.xxdb.streaming.client.MessageHandler;
import com.xxdb.streaming.client.QueueManager;
import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

abstract class AbstractClient
implements MessageDispatcher {
    protected static final int DEFAULT_PORT = 8849;
    protected static final String DEFAULT_HOST = "localhost";
    protected static final String DEFAULT_ACTION_NAME = "javaStreamingApi";
    protected int listeningPort;
    protected QueueManager queueManager = new QueueManager();
    protected HashMap<String, List<IMessage>> messageCache = new HashMap();
    protected HashMap<String, String> tableNameToTopic = new HashMap();
    protected HashMap<String, Boolean> hostEndian = new HashMap();
    protected Thread pThread;
    protected HashMap<String, Site> topicToSite = new HashMap();

    protected abstract void doReconnect(Site var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setMsgId(String topic, long msgId) {
        HashMap<String, Site> hashMap = this.topicToSite;
        synchronized (hashMap) {
            Site site = this.topicToSite.get(topic);
            if (site != null) {
                site.msgId = msgId;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void tryReconnect(String topic) {
        System.out.println("Trigger reconnect");
        this.queueManager.removeQueue(topic);
        Site site = null;
        HashMap<String, Site> hashMap = this.topicToSite;
        synchronized (hashMap) {
            site = this.topicToSite.get(topic);
        }
        if (site == null || !site.reconnect) {
            return;
        }
        this.activeCloseConnection(site);
        this.doReconnect(site);
    }

    private void activeCloseConnection(Site site) {
        while (true) {
            try {
                DBConnection conn = new DBConnection();
                conn.connect(site.host, site.port);
                try {
                    String localIP = conn.getLocalAddress().getHostAddress();
                    ArrayList<Entity> params = new ArrayList<Entity>();
                    params.add(new BasicString(localIP));
                    params.add(new BasicInt(this.listeningPort));
                    conn.run("activeClosePublishConnection", params);
                }
                catch (IOException ioex) {
                    throw ioex;
                }
                finally {
                    conn.close();
                }
                return;
            }
            catch (Exception ex) {
                System.out.println("Unable to actively close the publish connection from site " + site.host + ":" + site.port);
                try {
                    Thread.sleep(1000L);
                }
                catch (Exception exception) {
                }
                continue;
            }
            break;
        }
    }

    public AbstractClient() throws SocketException {
        this(8849);
    }

    public AbstractClient(int subscribePort) throws SocketException {
        this.listeningPort = subscribePort;
        Daemon daemon = new Daemon(subscribePort, this);
        this.pThread = new Thread(daemon);
        this.pThread.start();
    }

    private void addMessageToCache(IMessage msg) {
        String topic = msg.getTopic();
        List<IMessage> cache = this.messageCache.get(topic);
        if (cache == null) {
            cache = new ArrayList<IMessage>();
            this.messageCache.put(msg.getTopic(), cache);
        }
        cache.add(msg);
    }

    private void flushToQueue() {
        Set<String> keySet = this.messageCache.keySet();
        for (String topic : keySet) {
            try {
                this.queueManager.getQueue(topic).put(this.messageCache.get(topic));
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        this.messageCache.clear();
    }

    @Override
    public void dispatch(IMessage msg) {
        BlockingQueue<List<IMessage>> queue = this.queueManager.getQueue(msg.getTopic());
        try {
            queue.put(Arrays.asList(msg));
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void batchDispatch(List<IMessage> messags) {
        for (int i = 0; i < messags.size(); ++i) {
            this.addMessageToCache(messags.get(i));
        }
        this.flushToQueue();
    }

    @Override
    public boolean isRemoteLittleEndian(String host) {
        if (this.hostEndian.containsKey(host)) {
            return this.hostEndian.get(host);
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isClosed(String topic) {
        HashMap<String, Site> hashMap = this.topicToSite;
        synchronized (hashMap) {
            Site site = this.topicToSite.get(topic);
            if (site != null) {
                return site.closed;
            }
            return true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter) throws IOException, RuntimeException {
        String topic = "";
        DBConnection dbConn = new DBConnection();
        dbConn.connect(host, port);
        try {
            String localIP = dbConn.getLocalAddress().getHostAddress();
            if (!this.hostEndian.containsKey(host)) {
                this.hostEndian.put(host, dbConn.getRemoteLittleEndian());
            }
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            Entity re = dbConn.run("getSubscriptionTopic", params);
            topic = ((BasicAnyVector)re).getEntity(0).getString();
            params.clear();
            HashMap<String, Object> hashMap = this.tableNameToTopic;
            synchronized (hashMap) {
                this.tableNameToTopic.put(host + ":" + port + ":" + tableName, topic);
            }
            hashMap = this.topicToSite;
            synchronized (hashMap) {
                this.topicToSite.put(topic, new Site(host, port, tableName, actionName, handler, offset - 1L, reconnect, filter));
            }
            params.add(new BasicString(localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            params.add(new BasicLong(offset));
            if (filter != null) {
                params.add(filter);
            }
            re = dbConn.run("publishTable", params);
        }
        catch (Exception ex) {
            throw ex;
        }
        finally {
            dbConn.close();
        }
        BlockingQueue<List<IMessage>> queue = this.queueManager.addQueue(topic);
        return queue;
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, long offset, boolean reconnect) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, actionName, null, offset, reconnect, null);
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, long offset) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME, offset, false);
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, long offset) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, actionName, offset, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
        DBConnection dbConn = new DBConnection();
        dbConn.connect(host, port);
        try {
            String localIP = dbConn.getLocalAddress().getHostAddress();
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            dbConn.run("stopPublishTable", params);
            String topic = null;
            String fullTableName = host + ":" + port + ":" + tableName;
            HashMap<String, Object> hashMap = this.tableNameToTopic;
            synchronized (hashMap) {
                topic = this.tableNameToTopic.get(fullTableName);
            }
            hashMap = this.topicToSite;
            synchronized (hashMap) {
                Site site = this.topicToSite.get(topic);
                if (site != null) {
                    site.closed = true;
                }
            }
            System.out.println("Successfully unsubscribed table " + fullTableName);
        }
        catch (Exception ex) {
            throw ex;
        }
        finally {
            dbConn.close();
        }
    }

    protected void unsubscribeInternal(String host, int port, String tableName) throws IOException {
        this.unsubscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME);
    }

    protected class Site {
        String host;
        int port;
        String tableName;
        String actionName;
        MessageHandler handler;
        long msgId;
        boolean reconnect;
        Vector filter = null;
        boolean closed = false;

        Site(String host, int port, String tableName, String actionName, MessageHandler handler, long msgId, boolean reconnect, Vector filter) {
            this.host = host;
            this.port = port;
            this.tableName = tableName;
            this.actionName = actionName;
            this.handler = handler;
            this.msgId = msgId;
            this.reconnect = reconnect;
            this.filter = filter;
        }
    }
}

