/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client.cep;

import com.xxdb.DBConnection;
import com.xxdb.comm.ErrorCodeInfo;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicString;
import com.xxdb.data.Entity;
import com.xxdb.data.Utils;
import com.xxdb.data.Vector;
import com.xxdb.streaming.client.AbstractClient;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageHandler;
import com.xxdb.streaming.client.Site;
import com.xxdb.streaming.client.cep.EventHandler;
import com.xxdb.streaming.client.cep.EventMessageHandler;
import com.xxdb.streaming.client.cep.EventSchema;
import java.io.IOException;
import java.net.SocketException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventClient
extends AbstractClient {
    private EventHandler eventHandler;
    private static final Logger log = LoggerFactory.getLogger(EventClient.class);

    public EventClient(List<EventSchema> eventSchemas, List<String> eventTimeFields, List<String> commonFields) throws SocketException {
        super(0);
        this.eventHandler = new EventHandler(eventSchemas, eventTimeFields, commonFields);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, String userName, String password) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, null, userName, password);
    }

    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, String userName, String password) throws IOException {
        BlockingQueue<List<IMessage>> queue;
        if (Utils.isEmpty(tableName)) {
            throw new IllegalArgumentException("EventClient subscribe 'tableName' param cannot be null or empty.");
        }
        if (Utils.isEmpty(actionName)) {
            actionName = "javaStreamingApi";
        }
        if ((queue = this.subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, null, false, userName, password, false)) == null) {
            log.error("Subscription already made, handler loop not created.");
            return;
        }
        Thread thread = new Thread(() -> {
            log.info("EventClient subscribe start.");
            ArrayList<String> eventTypes = new ArrayList<String>();
            ArrayList<List<Entity>> attributes = new ArrayList<List<Entity>>();
            ErrorCodeInfo errorInfo = new ErrorCodeInfo();
            boolean foundNull = false;
            while (!foundNull) {
                List msgs;
                try {
                    msgs = (List)queue.take();
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                if (msgs.size() == 0) {
                    foundNull = true;
                    break;
                }
                eventTypes.clear();
                attributes.clear();
                if (!this.eventHandler.deserializeEvent(msgs, eventTypes, attributes, errorInfo)) {
                    log.error("deserialize fail " + errorInfo.getErrorInfo());
                    continue;
                }
                int rowSize = eventTypes.size();
                for (int i = 0; i < rowSize; ++i) {
                    ((EventMessageHandler)handler).doEvent((String)eventTypes.get(i), (List)attributes.get(i));
                }
            }
            log.info("nht handle exit.");
        });
        thread.start();
    }

    public void unsubscribe(String host, int port, String tableName, String actionName) throws IOException {
        if (Utils.isEmpty(actionName)) {
            actionName = "javaStreamingApi";
        }
        this.unsubscribeInternal(host, port, tableName, actionName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
        DBConnection dbConn = new DBConnection();
        List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
        List usr = (List)this.users.get(tp);
        String user = (String)usr.get(0);
        String pwd = (String)usr.get(1);
        if (!user.equals("")) {
            dbConn.connect(host, port, user, pwd);
        } else {
            dbConn.connect(host, port);
        }
        try {
            String localIP = this.listeningHost;
            if (localIP.equals("")) {
                localIP = dbConn.getLocalAddress().getHostAddress();
            }
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(localIP));
            params.add(new BasicInt(this.listeningPort));
            params.add(new BasicString(tableName));
            params.add(new BasicString(actionName));
            dbConn.run("stopPublishTable", params);
            String topic = null;
            String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
            Object object = this.tableNameToTrueTopic;
            synchronized (object) {
                topic = (String)this.tableNameToTrueTopic.get(fullTableName);
            }
            object = this.trueTopicToSites;
            synchronized (object) {
                Site[] sites = (Site[])this.trueTopicToSites.get(topic);
                if (sites == null || sites.length == 0) {
                    // empty if block
                }
                for (int i = 0; i < sites.length; ++i) {
                    sites[i].setClosed(true);
                }
            }
            object = this.queueManager;
            synchronized (object) {
                this.queueManager.removeQueue(topic);
            }
            log.info("Successfully unsubscribed table " + fullTableName);
        }
        catch (Exception ex) {
            throw ex;
        }
        finally {
            dbConn.close();
        }
    }

    @Override
    protected boolean doReconnect(Site site) {
        try {
            site.getHost();
            this.subscribe(site.getHost(), site.getPort(), site.getTableName(), site.getActionName(), site.getHandler(), site.getMsgId() + 1L, true, site.getFilter(), site.getUserName(), site.getPassWord());
            Date d = new Date();
            SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            log.info(df.format(d) + " Successfully reconnected and subscribed " + site.getHost() + ":" + site.getPort() + "/" + site.getTableName() + "/" + site.getActionName());
            return true;
        }
        catch (Exception ex) {
            Date d = new Date();
            SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            log.error(df.format(d) + " Unable to subscribe table. Will try again after 1 seconds." + site.getHost() + ":" + site.getPort() + "/" + site.getTableName() + "/" + site.getActionName());
            ex.printStackTrace();
            return false;
        }
    }
}

