/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client;

import com.xxdb.DBConnection;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicString;
import com.xxdb.data.Entity;
import com.xxdb.data.Vector;
import com.xxdb.streaming.client.AbstractClient;
import com.xxdb.streaming.client.BatchMessageHandler;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageHandler;
import com.xxdb.streaming.client.StreamDeserializer;
import java.io.IOException;
import java.net.SocketException;
import java.text.SimpleDateFormat;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ThreadedClient
extends AbstractClient {
    private HashMap<String, HandlerLopper> handlerLoppers = new HashMap();
    private HashMap<String, List<String>> users = new HashMap();

    public ThreadedClient() throws SocketException {
        this(8849);
    }

    public ThreadedClient(int subscribePort) throws SocketException {
        super(subscribePort);
    }

    public ThreadedClient(String subscribeHost, int subscribePort) throws SocketException {
        super(subscribeHost, subscribePort);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected boolean doReconnect(AbstractClient.Site site) {
        String topicStr = site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName;
        Thread handlerLopper = null;
        HashMap<String, HandlerLopper> hashMap = this.handlerLoppers;
        synchronized (hashMap) {
            if (!this.handlerLoppers.containsKey(topicStr)) {
                throw new RuntimeException("Subscribe thread is not started");
            }
            handlerLopper = this.handlerLoppers.get(topicStr);
        }
        handlerLopper.interrupt();
        try {
            this.subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1L, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord);
            Date d = new Date();
            SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(df.format(d) + " Successfully reconnected and subscribed " + site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName);
            return true;
        }
        catch (Exception ex) {
            Date d = new Date();
            SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(df.format(d) + " Unable to subscribe table. Will try again after 1 seconds." + site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName);
            ex.printStackTrace();
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String password) throws IOException {
        BlockingQueue<List<IMessage>> queue = this.subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password);
        HandlerLopper handlerLopper = new HandlerLopper(queue, handler);
        handlerLopper.start();
        String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
        List<String> usr = Arrays.asList(userName, password);
        HashMap<String, HandlerLopper> hashMap = this.handlerLoppers;
        synchronized (hashMap) {
            this.handlerLoppers.put(topicStr, handlerLopper);
            this.users.put(topicStr, usr);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle, String userName, String password) throws IOException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("BatchSize must be greater than zero");
        }
        if (throttle < 0) {
            throw new IllegalArgumentException("Throttle must be greater than or equal to zero");
        }
        BlockingQueue<List<IMessage>> queue = this.subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password);
        HandlerLopper handlerLopper = new HandlerLopper(queue, handler, batchSize, throttle == 0 ? -1 : throttle);
        handlerLopper.start();
        String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
        List<String> usr = Arrays.asList(userName, password);
        HashMap<String, HandlerLopper> hashMap = this.handlerLoppers;
        synchronized (hashMap) {
            this.handlerLoppers.put(topicStr, handlerLopper);
            this.users.put(topicStr, usr);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, float throttle, String userName, String password) throws IOException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("BatchSize must be greater than zero");
        }
        if (throttle < 0.0f) {
            throw new IllegalArgumentException("Throttle must be greater than or equal to zero");
        }
        BlockingQueue<List<IMessage>> queue = this.subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password);
        HandlerLopper handlerLopper = new HandlerLopper(queue, handler, batchSize, throttle == 0.0f ? -1.0f : throttle);
        handlerLopper.start();
        String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
        List<String> usr = Arrays.asList(userName, password);
        HashMap<String, HandlerLopper> hashMap = this.handlerLoppers;
        synchronized (hashMap) {
            this.handlerLoppers.put(topicStr, handlerLopper);
            this.users.put(topicStr, usr);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(String host, int port, String tableName, String actionName, BatchMessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, float throttle, String userName, String password) throws IOException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("BatchSize must be greater than zero");
        }
        if (throttle < 0.0f) {
            throw new IllegalArgumentException("Throttle must be greater than or equal to zero");
        }
        BlockingQueue<List<IMessage>> queue = this.subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password);
        HandlerLopper handlerLopper = new HandlerLopper(queue, handler, batchSize, throttle == 0.0f ? -1.0f : throttle);
        handlerLopper.start();
        String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
        List<String> usr = Arrays.asList(userName, password);
        HashMap<String, HandlerLopper> hashMap = this.handlerLoppers;
        synchronized (hashMap) {
            this.handlerLoppers.put(topicStr, handlerLopper);
            this.users.put(topicStr, usr);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(String host, int port, String tableName, String actionName, BatchMessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle, String userName, String password) throws IOException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("BatchSize must be greater than zero");
        }
        if (throttle < 0) {
            throw new IllegalArgumentException("Throttle must be greater than or equal to zero");
        }
        BlockingQueue<List<IMessage>> queue = this.subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password);
        HandlerLopper handlerLopper = new HandlerLopper(queue, handler, batchSize, throttle == 0 ? -1 : throttle);
        handlerLopper.start();
        String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
        List<String> usr = Arrays.asList(userName, password);
        HashMap<String, HandlerLopper> hashMap = this.handlerLoppers;
        synchronized (hashMap) {
            this.handlerLoppers.put(topicStr, handlerLopper);
            this.users.put(topicStr, usr);
        }
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, String userName, String password) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, null, allowExistTopic, userName, password);
    }

    public void subscribe(String host, int port, String tableName, String actionName, BatchMessageHandler batchMessageHandler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, float throttle) throws IOException {
        this.subscribe(host, port, tableName, actionName, batchMessageHandler, offset, reconnect, filter, deserializer, allowExistTopic, batchSize, throttle, "", "");
    }

    public void subscribe(String host, int port, String tableName, String actionName, BatchMessageHandler batchMessageHandler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle) throws IOException {
        this.subscribe(host, port, tableName, actionName, batchMessageHandler, offset, reconnect, filter, deserializer, allowExistTopic, batchSize, throttle, "", "");
    }

    public void subscribe(String host, int port, String tableName, String actionName, BatchMessageHandler batchMessageHandler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, int batchSize, float throttle) throws IOException {
        this.subscribe(host, port, tableName, actionName, batchMessageHandler, offset, reconnect, filter, null, allowExistTopic, batchSize, throttle);
    }

    public void subscribe(String host, int port, String tableName, String actionName, BatchMessageHandler batchMessageHandler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, int batchSize, int throttle) throws IOException {
        this.subscribe(host, port, tableName, actionName, batchMessageHandler, offset, reconnect, filter, (StreamDeserializer)null, allowExistTopic, batchSize, throttle);
    }

    public void subscribe(String host, int port, String tableName, String actionName, BatchMessageHandler batchMessageHandler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, int batchSize, float throttle, String userName, String passWord) throws IOException {
        this.subscribe(host, port, tableName, actionName, batchMessageHandler, offset, reconnect, filter, null, allowExistTopic, batchSize, throttle, userName, passWord);
    }

    public void subscribe(String host, int port, String tableName, String actionName, BatchMessageHandler batchMessageHandler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, int batchSize, int throttle, String userName, String passWord) throws IOException {
        this.subscribe(host, port, tableName, actionName, batchMessageHandler, offset, reconnect, filter, (StreamDeserializer)null, allowExistTopic, batchSize, throttle, userName, passWord);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, batchSize, throttle, "", "");
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, float throttle) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, batchSize, throttle, "", "");
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, "", "");
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, null, allowExistTopic);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer) throws Exception {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, true, "", "");
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, int batchSize, int throttle) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, (StreamDeserializer)null, allowExistTopic, batchSize, throttle);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, int batchSize, float throttle) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, null, allowExistTopic, batchSize, throttle);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, int batchSize, int throttle, String userName, String passWord) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, (StreamDeserializer)null, allowExistTopic, batchSize, throttle, userName, passWord);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, int batchSize, float throttle, String userName, String passWord) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, null, allowExistTopic, batchSize, throttle, userName, passWord);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, String userName, String password) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, null, null, false, userName, password);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, null, null, false, "", "");
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, Vector filter) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, false, filter, null, false, "", "");
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, false);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, -1L);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, boolean reconnect) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, -1L, reconnect);
    }

    public void subscribe(String host, int port, String tableName, MessageHandler handler) throws IOException {
        this.subscribe(host, port, tableName, "javaStreamingApi", handler, -1L);
    }

    public void subscribe(String host, int port, String tableName, MessageHandler handler, boolean reconnect) throws IOException {
        this.subscribe(host, port, tableName, "javaStreamingApi", handler, -1L, reconnect);
    }

    public void subscribe(String host, int port, String tableName, MessageHandler handler, long offset) throws IOException {
        this.subscribe(host, port, tableName, "javaStreamingApi", handler, offset);
    }

    public void subscribe(String host, int port, String tableName, MessageHandler handler, long offset, boolean reconnect) throws IOException {
        this.subscribe(host, port, tableName, "javaStreamingApi", handler, offset, reconnect);
    }

    public void subscribe(String host, int port, String tableName, MessageHandler handler, long offset, boolean reconnect, String userName, String password) throws IOException {
        this.subscribe(host, port, tableName, "javaStreamingApi", handler, offset, reconnect, userName, password);
    }

    public void unsubscribe(String host, int port, String tableName, String actionName) throws IOException {
        this.unsubscribeInternal(host, port, tableName, actionName);
    }

    public void unsubscribe(String host, int port, String tableName) throws IOException {
        this.unsubscribeInternal(host, port, tableName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
        DBConnection dbConn = new DBConnection();
        String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
        List<String> usr = this.users.get(fullTableName);
        String user = usr.get(0);
        String pwd = usr.get(1);
        if (!user.equals("")) {
            dbConn.connect(host, port, user, pwd);
        } else {
            dbConn.connect(host, port);
        }
        try {
            String localIP = this.listeningHost;
            if (localIP.equals("")) {
                localIP = dbConn.getLocalAddress().getHostAddress();
            }
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            dbConn.run("stopPublishTable", params);
            String topic = null;
            Object object = this.tableNameToTrueTopic;
            synchronized (object) {
                topic = (String)this.tableNameToTrueTopic.get(fullTableName);
            }
            object = this.trueTopicToSites;
            synchronized (object) {
                AbstractClient.Site[] sites = (AbstractClient.Site[])this.trueTopicToSites.get(topic);
                if (sites == null || sites.length == 0) {
                    // empty if block
                }
                for (int i = 0; i < sites.length; ++i) {
                    sites[i].closed = true;
                }
            }
            object = this.queueManager;
            synchronized (object) {
                this.queueManager.removeQueue(topic);
            }
            System.out.println("Successfully unsubscribed table " + fullTableName);
        }
        catch (Exception ex) {
            throw ex;
        }
        finally {
            dbConn.close();
            String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
            HandlerLopper handlerLopper = null;
            HashMap<String, HandlerLopper> hashMap = this.handlerLoppers;
            synchronized (hashMap) {
                handlerLopper = this.handlerLoppers.get(topicStr);
                this.handlerLoppers.remove(topicStr);
                handlerLopper.interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        HashMap<String, HandlerLopper> hashMap = this.handlerLoppers;
        synchronized (hashMap) {
            Iterator<HandlerLopper> it = this.handlerLoppers.values().iterator();
            while (it.hasNext()) {
                it.next().interrupt();
            }
            this.handlerLoppers.clear();
        }
        this.pThread.interrupt();
    }

    class HandlerLopper
    extends Thread {
        BlockingQueue<List<IMessage>> queue;
        MessageHandler handler;
        BatchMessageHandler batchHandler;
        private int batchSize = -1;
        private int throttle = -1;
        private float secondThrottle = -1.0f;

        HandlerLopper(BlockingQueue<List<IMessage>> queue, BatchMessageHandler batchMessageHandler, int batchSize, float secondThrottle) {
            this.queue = queue;
            this.batchHandler = batchMessageHandler;
            this.batchSize = batchSize;
            this.secondThrottle = secondThrottle;
        }

        HandlerLopper(BlockingQueue<List<IMessage>> queue, MessageHandler handler, int batchSize, float secondThrottle) {
            this.queue = queue;
            this.handler = handler;
            this.batchSize = batchSize;
            this.secondThrottle = secondThrottle;
        }

        HandlerLopper(BlockingQueue<List<IMessage>> queue, BatchMessageHandler batchHandler, int batchSize, int throttle) {
            this.queue = queue;
            this.batchHandler = batchHandler;
            this.batchSize = batchSize;
            this.throttle = throttle;
        }

        HandlerLopper(BlockingQueue<List<IMessage>> queue, MessageHandler handler) {
            this.queue = queue;
            this.handler = handler;
        }

        HandlerLopper(BlockingQueue<List<IMessage>> queue, MessageHandler handler, int batchSize, int throttle) {
            this.queue = queue;
            this.handler = handler;
            this.batchSize = batchSize;
            this.throttle = throttle;
        }

        @Override
        public void run() {
            while (!this.isInterrupted()) {
                List tmp;
                LocalTime end;
                ArrayList msgs = null;
                if (this.batchSize == -1 && this.throttle == -1 || this.batchSize == -1 && this.secondThrottle == -1.0f) {
                    try {
                        msgs = this.queue.take();
                    }
                    catch (InterruptedException e) {
                        return;
                    }
                } else if (this.batchSize != -1 && this.throttle != -1 || this.batchSize != -1 && this.secondThrottle != -1.0f) {
                    end = this.throttle != -1 ? LocalTime.now().plusNanos(this.throttle * 1000000) : LocalTime.now().plusNanos((long)this.secondThrottle * 1000000000L);
                    while (msgs == null || (msgs == null || msgs.size() < this.batchSize) && LocalTime.now().isBefore(end)) {
                        tmp = (ArrayList)this.queue.poll();
                        if (tmp == null) continue;
                        if (msgs == null) {
                            msgs = new ArrayList(tmp);
                            continue;
                        }
                        msgs.addAll(tmp);
                    }
                } else {
                    end = this.throttle != -1 ? LocalTime.now().plusNanos(this.throttle * 1000000) : LocalTime.now().plusNanos((long)this.secondThrottle * 1000000000L);
                    while (msgs == null || LocalTime.now().isBefore(end)) {
                        tmp = (List)this.queue.poll();
                        if (tmp == null) continue;
                        if (msgs == null) {
                            msgs = tmp;
                            continue;
                        }
                        msgs.addAll(tmp);
                    }
                }
                if (msgs == null) continue;
                if (this.batchHandler != null) {
                    this.batchHandler.batchHandler(msgs);
                    continue;
                }
                for (IMessage msg : msgs) {
                    this.handler.doEvent(msg);
                }
            }
        }
    }
}

