/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client;

import com.xxdb.DBConnection;
import com.xxdb.data.BasicAnyVector;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicLong;
import com.xxdb.data.BasicString;
import com.xxdb.data.Entity;
import com.xxdb.streaming.client.Daemon;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageDispatcher;
import com.xxdb.streaming.client.QueueManager;
import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

/**
 * Base class for streaming clients. It starts a {@link Daemon} on a local
 * listening port, routes incoming messages to per-topic blocking queues held by
 * a {@link QueueManager}, and issues the server-side subscribe / unsubscribe
 * calls over a short-lived {@link DBConnection}.
 *
 * <p>NOTE(review): {@code messageCache}, {@code tableName2Topic} and
 * {@code hostEndian} are plain {@link HashMap}s. Whether the dispatch methods
 * can be invoked from more than one thread depends on {@link Daemon}, which is
 * not visible here -- confirm before relying on thread-safety.
 */
abstract class AbstractClient
implements MessageDispatcher {
    /** Listening port used by the no-argument constructor. */
    protected static final int DEFAULT_PORT = 8849;
    protected static final String DEFAULT_HOST = "localhost";
    /** Action name used when a caller does not supply one. */
    protected static final String DEFAULT_ACTION_NAME = "javaStreamingApi";
    /** Local address of the most recent subscribe connection; set by {@code subscribeInternal}. */
    protected String localIP;
    /** Port handed to the {@link Daemon} and reported to the server on (un)subscribe. */
    protected int listeningPort;
    protected QueueManager queueManager = new QueueManager();
    /** Per-topic staging area filled by {@link #batchDispatch(List)} before flushing. */
    protected HashMap<String, List<IMessage>> messageCache = new HashMap<String, List<IMessage>>();
    /** Maps {@code host:port:tableName} to the topic returned by the server. */
    protected HashMap<String, String> tableName2Topic = new HashMap<String, String>();
    /** Remembers, per host, the endianness reported by the first connection to it. */
    protected HashMap<String, Boolean> hostEndian = new HashMap<String, Boolean>();
    /** Thread running the {@link Daemon}. */
    protected Thread pThread;

    /**
     * Creates a client listening on {@link #DEFAULT_PORT}.
     *
     * @throws SocketException declared by the delegated constructor
     */
    public AbstractClient() throws SocketException {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a client and immediately starts a {@link Daemon} thread for the
     * given port.
     *
     * @param subscribePort local port passed to the daemon and later reported
     *                      to the server when subscribing
     * @throws SocketException declared for callers; presumably raised by the
     *                         daemon setup -- TODO confirm against {@link Daemon}
     */
    public AbstractClient(int subscribePort) throws SocketException {
        this.listeningPort = subscribePort;
        Daemon daemon = new Daemon(subscribePort, this);
        this.pThread = new Thread(daemon);
        this.pThread.start();
    }

    /**
     * Appends a message to the cached list for its topic, creating the list on
     * first use.
     */
    private void addMessageToCache(IMessage msg) {
        String topic = msg.getTopic();
        List<IMessage> cache = this.messageCache.get(topic);
        if (cache == null) {
            cache = new ArrayList<IMessage>();
            this.messageCache.put(topic, cache);
        }
        cache.add(msg);
    }

    /**
     * Hands every cached per-topic list to that topic's queue and then empties
     * the cache. A failure for one topic is reported and does not prevent the
     * remaining topics from being flushed.
     */
    private void flushToQueue() {
        boolean interrupted = false;
        Set<String> keySet = this.messageCache.keySet();
        for (String topic : keySet) {
            try {
                this.queueManager.getQueue(topic).put(this.messageCache.get(topic));
            }
            catch (InterruptedException e) {
                // Remember the interrupt but keep flushing: restoring the flag
                // right away would make every following put() fail as well.
                interrupted = true;
                e.printStackTrace();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        this.messageCache.clear();
        if (interrupted) {
            // Restore the interrupt status so the owning thread can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Puts a single message, wrapped in a one-element list, on the queue of its
     * topic. Blocks while the queue is full; if interrupted while waiting the
     * message is dropped and the thread's interrupt status is restored.
     *
     * @param msg message to enqueue
     */
    @Override
    public void dispatch(IMessage msg) {
        BlockingQueue<List<IMessage>> queue = this.queueManager.getQueue(msg.getTopic());
        try {
            queue.put(Arrays.asList(msg));
        }
        catch (InterruptedException e) {
            e.printStackTrace();
            // Do not swallow the interrupt; let the caller's thread see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Groups the given messages by topic and enqueues one list per topic.
     *
     * @param messags messages to dispatch, in arrival order
     */
    @Override
    public void batchDispatch(List<IMessage> messags) {
        for (IMessage msg : messags) {
            this.addMessageToCache(msg);
        }
        this.flushToQueue();
    }

    /**
     * Reports the endianness recorded for a host.
     *
     * @param host host name as passed to {@code subscribeInternal}
     * @return the recorded value, or {@code false} when the host is unknown
     */
    @Override
    public boolean isRemoteLittleEndian(String host) {
        Boolean littleEndian = this.hostEndian.get(host);
        return littleEndian != null && littleEndian;
    }

    /**
     * Subscribes to a table on the given server. The method asks the server for
     * the subscription topic, registers a queue for it, records the
     * table-to-topic mapping and finally calls {@code publishTable}. The
     * connection is closed even when one of the remote calls fails.
     *
     * @param host       server host
     * @param port       server port
     * @param tableName  name of the table to subscribe to
     * @param actionName subscription action name
     * @param offset     start offset; {@code -1} omits the offset argument
     * @return the queue on which messages for the topic will be delivered
     * @throws IOException if connecting or a remote call fails
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, String actionName, long offset) throws IOException, RuntimeException {
        DBConnection dbConn = new DBConnection();
        dbConn.connect(host, port);
        try {
            this.localIP = dbConn.getLocalAddress().getHostAddress();
            if (!this.hostEndian.containsKey(host)) {
                this.hostEndian.put(host, dbConn.getRemoteLittleEndian());
            }
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            Entity re = dbConn.run("getSubscriptionTopic", params);
            String topic = ((BasicAnyVector)re).getEntity(0).getString();
            // Register the queue before publishTable so it exists by the time
            // the first message for this topic is dispatched.
            BlockingQueue<List<IMessage>> queue = this.queueManager.addQueue(topic);
            params.clear();
            this.tableName2Topic.put(host + ":" + port + ":" + tableName, topic);
            params.add(new BasicString(this.localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            if (offset != -1L) {
                params.add(new BasicLong(offset));
            }
            dbConn.run("publishTable", params);
            return queue;
        }
        finally {
            dbConn.close();
        }
    }

    /**
     * Subscribes using {@link #DEFAULT_ACTION_NAME}.
     *
     * @see #subscribeInternal(String, int, String, String, long)
     */
    protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port, String tableName, long offset) throws IOException, RuntimeException {
        return this.subscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME, offset);
    }

    /**
     * Asks the server to stop publishing a table to this client by calling
     * {@code stopPublishTable} with the local address and listening port. The
     * connection is closed even when the remote call fails.
     *
     * @param host       server host
     * @param port       server port
     * @param tableName  name of the subscribed table
     * @param actionName subscription action name
     * @throws IOException if connecting or the remote call fails
     */
    protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
        DBConnection dbConn = new DBConnection();
        dbConn.connect(host, port);
        try {
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(this.localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            dbConn.run("stopPublishTable", params);
        }
        finally {
            dbConn.close();
        }
    }

    /**
     * Unsubscribes using {@link #DEFAULT_ACTION_NAME}.
     *
     * @see #unsubscribeInternal(String, int, String, String)
     */
    protected void unsubscribeInternal(String host, int port, String tableName) throws IOException {
        this.unsubscribeInternal(host, port, tableName, DEFAULT_ACTION_NAME);
    }
}

