/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client.streamingSQL;

import com.xxdb.DBConnection;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicIntVector;
import com.xxdb.data.BasicString;
import com.xxdb.data.BasicStringVector;
import com.xxdb.data.BasicTable;
import com.xxdb.data.Entity;
import com.xxdb.data.Table;
import com.xxdb.data.Utils;
import com.xxdb.data.Vector;
import com.xxdb.data.Void;
import com.xxdb.io.ExtendedDataOutput;
import com.xxdb.streaming.client.AbstractClient;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageHandler;
import com.xxdb.streaming.client.Site;
import com.xxdb.streaming.client.streamingSQL.StreamingSQLResultUpdater;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingSQLClient
extends AbstractClient {
    private DBConnection conn;
    private String host;
    private int port;
    private String userName;
    private String password;
    private BasicIntVector deleteLineMap = new BasicIntVector(0);
    private static final Logger log = LoggerFactory.getLogger(StreamingSQLClient.class);

    public StreamingSQLClient(String host, int port, String userName, String password) throws IOException {
        if (Utils.isEmpty(host)) {
            throw new IllegalArgumentException("The param 'host' cannot be null or empty.");
        }
        this.host = host;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.conn = new DBConnection();
        this.conn.connect(this.host, this.port, this.userName, this.password);
    }

    public void declareStreamingSQLTable(String tableName) {
        try {
            this.checkConnConnect();
            this.conn.run("declareStreamingSQLTable(" + tableName + ")");
        }
        catch (IOException e) {
            throw new RuntimeException("declare streaming sql table error: " + e);
        }
    }

    public void revokeStreamingSQLTable(String tableName) {
        try {
            this.checkConnConnect();
            this.conn.run("revokeStreamingSQLTable(\"" + tableName + "\")");
        }
        catch (IOException e) {
            throw new RuntimeException("revoke streaming sql table error: " + e);
        }
    }

    public BasicTable listStreamingSQLTables() {
        try {
            this.checkConnConnect();
            return (BasicTable)this.conn.run("listStreamingSQLTables()");
        }
        catch (IOException e) {
            throw new RuntimeException("revoke streaming sql table error: " + e);
        }
    }

    public String registerStreamingSQL(String sqlQuery) {
        return this.registerStreamingSQL(sqlQuery, null, Integer.MIN_VALUE);
    }

    public String registerStreamingSQL(String sqlQuery, String queryId) {
        return this.registerStreamingSQL(sqlQuery, queryId, Integer.MIN_VALUE);
    }

    public String registerStreamingSQL(String sqlQuery, int logTableCacheSize) {
        return this.registerStreamingSQL(sqlQuery, null, logTableCacheSize);
    }

    public String registerStreamingSQL(String sqlQuery, String queryId, int logTableCacheSize) {
        try {
            if (Utils.isEmpty(sqlQuery)) {
                throw new IllegalArgumentException("The param 'sqlQuery' cannot be null or empty.");
            }
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new BasicString(sqlQuery));
            if (Utils.isNotEmpty(queryId)) {
                params.add(new BasicString(queryId));
            } else {
                params.add(new Void());
            }
            if (logTableCacheSize != Integer.MIN_VALUE && logTableCacheSize <= 0) {
                throw new IllegalArgumentException("logTableCacheSize must be a positive integer.");
            }
            if (logTableCacheSize == Integer.MIN_VALUE) {
                params.add(new Void());
            } else {
                params.add(new BasicInt(logTableCacheSize));
            }
            this.checkConnConnect();
            Entity streamingSQLTableName = this.conn.run("registerStreamingSQL", params);
            return streamingSQLTableName.getString();
        }
        catch (IOException e) {
            throw new RuntimeException("register streaming SQL error: " + e);
        }
    }

    public void revokeStreamingSQL(String queryId) {
        try {
            this.checkConnConnect();
            this.conn.run("revokeStreamingSQL(\"" + queryId + "\")");
        }
        catch (IOException e) {
            throw new RuntimeException("revoke streaming SQL error: " + e);
        }
    }

    public BasicTable getStreamingSQLStatus() {
        return this.getStreamingSQLStatus(null);
    }

    public BasicTable getStreamingSQLStatus(String queryId) {
        try {
            this.checkConnConnect();
            if (Objects.equals(queryId, "")) {
                throw new IllegalArgumentException("The param 'queryId' cannot be empty.");
            }
            if (Objects.isNull(queryId)) {
                return (BasicTable)this.conn.run("getStreamingSQLStatus()");
            }
            return (BasicTable)this.conn.run("getStreamingSQLStatus(\"" + queryId + "\")");
        }
        catch (IOException e) {
            throw new RuntimeException("get streaming SQL status error: " + e);
        }
    }

    public BasicTable subscribeStreamingSQL(String queryId) throws IOException {
        return this.subscribeStreamingSQL(queryId, -1, -1.0f);
    }

    public BasicTable subscribeStreamingSQL(String queryId, final int batchSize, final float throttle) throws IOException {
        MessageHandler handler;
        TableWrapper resultWrapper;
        block3: {
            resultWrapper = new TableWrapper(null);
            handler = new MessageHandler(){

                @Override
                public void doEvent(IMessage msg) {
                    try {
                        if (resultWrapper.table != null) {
                            log.debug("Before update\uff0ctable content: " + resultWrapper.table.getString());
                            StreamingSQLResultUpdater.StreamingSQLResult sqlResult = StreamingSQLResultUpdater.updateStreamingSQLResult(resultWrapper.table, StreamingSQLClient.this.deleteLineMap, msg);
                            BasicTable newTable = sqlResult.table;
                            StreamingSQLClient.this.deleteLineMap = sqlResult.deleteLineMap;
                            resultWrapper.table = newTable;
                        }
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            };
            if (Utils.isEmpty(queryId)) {
                throw new IllegalArgumentException("The param 'queryId' cannot be null or empty.");
            }
            try {
                this.getStreamingSQLStatus(queryId);
            }
            catch (Exception e) {
                if (!e.getMessage().contains("queryId " + queryId + " does not exist")) break block3;
                throw new IllegalArgumentException("queryId " + queryId + " does not exist.");
            }
        }
        Map<String, Object> res = this.subscribeStreamingSqlLogInfoInternal(this.host, this.port, queryId, queryId, handler, -1L, false, null, null, false, this.userName, this.password, true, true);
        final BlockingQueue queue = (BlockingQueue)res.get("queue");
        resultWrapper.table = (BasicTable)res.get("schema");
        log.debug("Created initial table, contain: " + resultWrapper.table.columns() + " cols");
        Thread thread = new Thread(new Runnable(){

            @Override
            public void run() {
                log.info("StreamingSQLClient subscribe start.");
                while (!StreamingSQLClient.this.isClose()) {
                    List tmp;
                    long now;
                    ArrayList msgs = null;
                    if (batchSize == -1 && throttle == -1.0f) {
                        try {
                            msgs = (ArrayList)queue.take();
                        }
                        catch (InterruptedException e) {
                            return;
                        }
                    }
                    if (batchSize != -1 && throttle != -1.0f) {
                        now = System.currentTimeMillis();
                        long end = now + (long)(throttle * 1000.0f);
                        while (msgs == null || msgs.size() < batchSize && System.currentTimeMillis() < end) {
                            tmp = null;
                            try {
                                now = System.currentTimeMillis();
                                tmp = end - now <= 0L ? (List)queue.take() : (List)queue.poll(end - now, TimeUnit.MILLISECONDS);
                            }
                            catch (InterruptedException e) {
                                break;
                            }
                            if (tmp == null) continue;
                            if (msgs == null) {
                                msgs = new ArrayList(tmp);
                                continue;
                            }
                            msgs.addAll(tmp);
                        }
                    } else {
                        now = System.currentTimeMillis();
                        long end = now + (long)(throttle * 1000.0f);
                        while (msgs == null || System.currentTimeMillis() < end) {
                            tmp = null;
                            try {
                                now = System.currentTimeMillis();
                                tmp = end - now <= 0L ? (List)queue.take() : (List)queue.poll(end - now, TimeUnit.MILLISECONDS);
                            }
                            catch (InterruptedException e) {
                                break;
                            }
                            if (tmp == null) continue;
                            if (msgs == null) {
                                msgs = tmp;
                                continue;
                            }
                            msgs.addAll(tmp);
                        }
                    }
                    if (msgs == null) continue;
                    for (IMessage msg : msgs) {
                        handler.doEvent(msg);
                    }
                }
            }
        });
        thread.start();
        return new ProxyTable(resultWrapper);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unsubscribeStreamingSQL(String queryId) throws IOException {
        try {
            ArrayList<Entity> params = new ArrayList<Entity>();
            params.add(new Void());
            params.add(new BasicString(queryId));
            this.checkConnConnect();
            this.conn.run("unsubscribeStreamingSQL", params);
            String topic = null;
            String fullTableName = this.host + ":" + this.port + "/" + queryId + "/" + queryId;
            Object object = this.tableNameToTrueTopic;
            synchronized (object) {
                topic = (String)this.tableNameToTrueTopic.get(fullTableName);
            }
            object = this.trueTopicToSites;
            synchronized (object) {
                Site[] sites = (Site[])this.trueTopicToSites.get(topic);
                if (sites == null || sites.length == 0) {
                    // empty if block
                }
                for (int i = 0; i < sites.length; ++i) {
                    sites[i].setClosed(true);
                }
            }
            object = this.queueManager;
            synchronized (object) {
                this.queueManager.removeQueue(topic);
            }
            log.info("Successfully unsubscribed table " + fullTableName);
        }
        catch (IOException e) {
            throw e;
        }
        finally {
            this.conn.close();
        }
    }

    @Override
    protected boolean doReconnect(Site site) {
        try {
            this.subscribeStreamingSqlLogInfoInternal(site.getHost(), site.getPort(), site.getTableName(), site.getActionName(), site.getHandler(), site.getMsgId() + 1L, true, null, null, false, site.getUserName(), site.getPassWord(), true, true);
            Date d = new Date();
            SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            log.info(df.format(d) + " Successfully reconnected and subscribed " + site.getHost() + ":" + site.getPort() + "/" + site.getTableName() + "/" + site.getActionName());
            return true;
        }
        catch (Exception ex) {
            Date d = new Date();
            SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            log.error(df.format(d) + " Unable to subscribe table. Will try again after 1 seconds." + site.getHost() + ":" + site.getPort() + "/" + site.getTableName() + "/" + site.getActionName());
            ex.printStackTrace();
            return false;
        }
    }

    private void checkConnConnect() throws IOException {
        if (!this.conn.isConnected()) {
            this.conn = new DBConnection();
            this.conn.connect(this.host, this.port, this.userName, this.password);
        }
    }

    private static List<String> createSafeColumnNames() {
        ArrayList<String> columnNames = new ArrayList<String>();
        columnNames.add("dummy");
        return columnNames;
    }

    private static List<Vector> createSafeColumns() {
        ArrayList<Vector> columns = new ArrayList<Vector>();
        columns.add(new BasicStringVector(0));
        return columns;
    }

    private class ProxyTable
    extends BasicTable {
        private final TableWrapper wrapper;

        public ProxyTable(TableWrapper wrapper) {
            super(StreamingSQLClient.createSafeColumnNames(), StreamingSQLClient.createSafeColumns());
            this.wrapper = wrapper;
        }

        @Override
        public int rows() {
            return this.wrapper.table != null ? this.wrapper.table.rows() : 0;
        }

        @Override
        public int columns() {
            return this.wrapper.table != null ? this.wrapper.table.columns() : 0;
        }

        @Override
        public String getColumnName(int index) {
            return this.wrapper.table != null ? this.wrapper.table.getColumnName(index) : "";
        }

        @Override
        public Vector getColumn(int index) {
            return this.wrapper.table != null ? this.wrapper.table.getColumn(index) : null;
        }

        @Override
        public Vector getColumn(String name) {
            return this.wrapper.table != null ? this.wrapper.table.getColumn(name) : null;
        }

        @Override
        public String getString() {
            return this.wrapper.table != null ? this.wrapper.table.getString() : "";
        }

        @Override
        public Entity.DATA_CATEGORY getDataCategory() {
            return this.wrapper.table != null ? this.wrapper.table.getDataCategory() : Entity.DATA_CATEGORY.MIXED;
        }

        @Override
        public Entity.DATA_TYPE getDataType() {
            return this.wrapper.table != null ? this.wrapper.table.getDataType() : Entity.DATA_TYPE.DT_DICTIONARY;
        }

        @Override
        public Entity.DATA_FORM getDataForm() {
            return this.wrapper.table != null ? this.wrapper.table.getDataForm() : Entity.DATA_FORM.DF_TABLE;
        }

        @Override
        public void write(ExtendedDataOutput out) throws IOException {
            if (this.wrapper.table != null) {
                this.wrapper.table.write(out);
            }
        }

        @Override
        public Table getSubTable(int[] indices) {
            return this.wrapper.table != null ? this.wrapper.table.getSubTable(indices) : null;
        }
    }

    private static class TableWrapper {
        public volatile BasicTable table;

        public TableWrapper(BasicTable table) {
            this.table = table;
        }
    }
}

