/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client;

import com.xxdb.DBConnection;
import com.xxdb.data.AbstractVector;
import com.xxdb.data.BasicAnyVector;
import com.xxdb.data.BasicEntityFactory;
import com.xxdb.data.BasicTable;
import com.xxdb.data.Entity;
import com.xxdb.io.BigEndianDataInputStream;
import com.xxdb.io.ExtendedDataInput;
import com.xxdb.io.LittleEndianDataInputStream;
import com.xxdb.streaming.client.AbstractClient;
import com.xxdb.streaming.client.BasicMessage;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageDispatcher;
import com.xxdb.streaming.client.StreamDeserializer;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Reads streaming messages from one publisher connection and forwards them to a
 * {@link MessageDispatcher}.
 *
 * <p>The parser works in one of two modes, chosen from the {@link DBConnectionAndSocket}
 * it is given:
 * <ul>
 *   <li><b>socket mode</b> ({@code socket} set, {@code conn} null): every message starts with
 *       one byte that selects the byte order of the rest of the message;</li>
 *   <li><b>reverse-streaming mode</b> ({@code conn} set, {@code socket} null): the input stream
 *       of the existing {@link DBConnection} is reused and one boolean is read and discarded
 *       at the start of every message.</li>
 * </ul>
 *
 * <p>After that prefix each message consists of: a long (discarded), the message id (long),
 * the topic (string), a 16-bit flag whose high byte is the data form and whose low byte is
 * the data type (values of 128 and above mark an "extended" type), and finally the entity
 * body.
 */
class MessageParser
implements Runnable {
    private final int MAX_FORM_VALUE = Entity.DATA_FORM.values().length - 1;
    private final int MAX_TYPE_VALUE = Entity.DATA_TYPE.DT_OBJECT.getValue();
    BufferedInputStream bis = null;
    // NOTE(review): this field is never assigned in this class; run() uses a local of the same name.
    Socket socket = null;
    DBConnectionAndSocket dBConnectionAndSocket;
    MessageDispatcher dispatcher;
    /** Topic string of the most recently read message; null until the first message header is read. */
    String topic;
    /** Column-name (lower-cased) to column-position map of the most recently received schema. */
    HashMap<String, Integer> nameToIndex = null;
    int listeningPort;
    /** Schema maps keyed by topic string. */
    ConcurrentHashMap<String, HashMap<String, Integer>> topicNameToIndex = null;
    private static final char[] hexArray = "0123456789ABCDEF".toCharArray();

    /**
     * @param dBConnectionAndSocket source of the message stream; checked when {@link #run()} starts
     * @param dispatcher            receiver of parsed messages and of reconnect / message-id bookkeeping
     * @param listeningPort         a value greater than zero enables "listen mode" (see {@link #run()})
     */
    public MessageParser(DBConnectionAndSocket dBConnectionAndSocket, MessageDispatcher dispatcher, int listeningPort) {
        this.dBConnectionAndSocket = dBConnectionAndSocket;
        this.dispatcher = dispatcher;
        this.topicNameToIndex = new ConcurrentHashMap<String, HashMap<String, Integer>>();
        this.listeningPort = listeningPort;
    }

    /**
     * Formats a byte array as an upper-case hexadecimal string, two characters per byte.
     *
     * @param bytes the bytes to format; must not be null
     * @return the hexadecimal representation, empty for an empty array
     */
    public static String bytesToHex(byte[] bytes) {
        char[] hexChars = new char[bytes.length * 2];
        for (int j = 0; j < bytes.length; ++j) {
            int v = bytes[j] & 0xFF;
            hexChars[j * 2] = hexArray[v >>> 4];
            hexChars[j * 2 + 1] = hexArray[v & 0xF];
        }
        return new String(hexChars);
    }

    /** Returns true when a positive listening port was supplied to the constructor. */
    private boolean isListenMode() {
        return this.listeningPort > 0;
    }

    /**
     * Reads and dispatches messages until the dispatcher reports it is closed or an error occurs.
     *
     * <p>A table body with zero rows is treated as a schema announcement: the reconnect flag of
     * every topic in the message is reset and the column index is recorded. A vector body is
     * treated as data and dispatched. Any other body is reported on standard output and skipped.
     *
     * <p>On any failure the stack trace is printed and, unless the dispatcher reports the current
     * topic as closed, the topic is flagged for reconnection. The socket is closed on exit in
     * both modes.
     */
    @Override
    public void run() {
        Map<String, StreamDeserializer> subinfos = this.dispatcher.getSubInfos();
        ConcurrentHashMap<String, AbstractClient.Site[]> topicToSites = this.dispatcher.getTopicToSites();
        Socket socket = null;
        try {
            boolean isReverseStreaming;
            ExtendedDataInput in = null;
            if (this.dBConnectionAndSocket == null) {
                throw new Exception("dBConnectionAndSocket is null!");
            }
            if (this.dBConnectionAndSocket.socket != null) {
                if (this.dBConnectionAndSocket.conn != null) {
                    throw new Exception("Either conn or socket must be null!");
                }
                socket = this.dBConnectionAndSocket.socket;
                this.bis = new BufferedInputStream(socket.getInputStream());
                isReverseStreaming = false;
            } else if (this.dBConnectionAndSocket.conn != null) {
                DBConnection conn = this.dBConnectionAndSocket.conn;
                in = conn.getDataInputStream();
                socket = conn.getSocket();
                isReverseStreaming = true;
            } else {
                throw new Exception("Both conn and socket is null!");
            }
            while (!this.dispatcher.isClose()) {
                if (!isReverseStreaming) {
                    int endianFlag = this.bis.read();
                    // read() returns -1 at end of stream; without this check the -1 would be
                    // interpreted as "non-zero", i.e. as a little-endian marker.
                    if (endianFlag < 0) {
                        throw new IOException("End of stream reached while waiting for the next message");
                    }
                    if (endianFlag != 0) {
                        in = new LittleEndianDataInputStream(this.bis);
                    } else {
                        in = new BigEndianDataInputStream(this.bis);
                    }
                } else {
                    // Value is not used; the read only advances the stream.
                    in.readBoolean();
                }
                // First long of the header is not used by the parser.
                in.readLong();
                long msgid = in.readLong();
                this.topic = in.readString();
                short flag = in.readShort();
                int form = flag >> 8;
                int type = flag & 0xFF;
                boolean extended = type >= 128;
                if (extended) {
                    type -= 128;
                }
                if (form < 0 || form > this.MAX_FORM_VALUE) {
                    throw new IOException("Invalid form value: " + form);
                }
                if (type < 0 || type > this.MAX_TYPE_VALUE) {
                    throw new IOException("Invalid type value: " + type);
                }
                Entity.DATA_FORM df = Entity.DATA_FORM.values()[form];
                Entity.DATA_TYPE dt = Entity.DATA_TYPE.valueOf(type);
                Entity body = BasicEntityFactory.instance().createEntity(df, dt, in, extended);
                if (body.isTable() && body.rows() == 0) {
                    this.registerSchema(body);
                    continue;
                }
                if (body.isVector()) {
                    this.dispatchData((BasicAnyVector)body, msgid, subinfos, topicToSites);
                    this.dispatcher.setMsgId(this.topic, msgid);
                    continue;
                }
                System.out.println("message body has an invalid format. Vector or table is expected");
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            // NOTE(review): topic is still null if the failure happened before the first
            // message header was read — confirm the dispatcher tolerates a null topic.
            if (this.dispatcher.isClosed(this.topic)) {
                System.out.println("check " + this.topic + " is unsubscribed");
                return;
            }
            this.dispatcher.setNeedReconnect(this.topic, 1);
        }
        catch (Throwable t) {
            t.printStackTrace();
            this.dispatcher.setNeedReconnect(this.topic, 1);
        }
        finally {
            try {
                if (socket != null) {
                    socket.close();
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Handles an empty-table message: resets the reconnect flag of every topic named in the
     * current topic string and records the table's column-name to column-position map.
     *
     * @param body the zero-row table received for the current topic
     */
    private void registerSchema(Entity body) throws Exception {
        String[] topics = this.topic.split(",");
        for (String t : topics) {
            this.dispatcher.setNeedReconnect(t, 0);
        }
        BasicTable schema = (BasicTable)body;
        HashMap<String, Integer> columnIndex = new HashMap<String, Integer>();
        int columns = schema.columns();
        for (int i = 0; i < columns; ++i) {
            columnIndex.put(schema.getColumnName(i).toLowerCase(), i);
        }
        this.nameToIndex = columnIndex;
        this.topicNameToIndex.put(this.topic, columnIndex);
        // Data messages look the schema up under the first comma-separated topic only, so a
        // schema announced for a combined topic string must also be reachable under each
        // individual topic.
        for (String t : topics) {
            this.topicNameToIndex.put(t, columnIndex);
        }
    }

    /**
     * Dispatches one data message for the current topic.
     *
     * <p>If the first site registered for the topic has {@code msgAstable} set, the whole vector
     * is dispatched as a single message whose id is that of its first row. Otherwise, in listen
     * mode with exactly one row, the vector is dispatched as-is; in every other case with at
     * least one row it is split into one message per row and dispatched as a batch. When a
     * {@link StreamDeserializer} is registered for the topic, each per-row (or single-row)
     * message is passed through it first.
     *
     * @param dTable       column vectors of the message; element {@code j} is column {@code j}
     * @param msgid        message id from the header, treated as the id of the last row
     * @param subinfos     deserializers keyed by topic string
     * @param topicToSites sites keyed by topic string
     */
    private void dispatchData(BasicAnyVector dTable, long msgid, Map<String, StreamDeserializer> subinfos, ConcurrentHashMap<String, AbstractClient.Site[]> topicToSites) throws Exception {
        AbstractClient.Site[] sites = topicToSites.get(this.topic);
        int colSize = dTable.rows();
        // NOTE(review): a vector with no columns fails here with an index error, which run()
        // turns into a reconnect request.
        int rowSize = dTable.getEntity(0).rows();
        if (sites != null && sites[0].msgAstable) {
            BasicMessage rec = new BasicMessage(msgid - (long)rowSize + 1L, this.topic, dTable, this.schemaForCurrentTopic());
            this.dispatcher.dispatch(rec);
            return;
        }
        if (rowSize < 1) {
            return;
        }
        HashMap<String, Integer> columnIndex = this.schemaForCurrentTopic();
        // Fetched once so that a concurrent removal cannot slip in between a null check
        // and the use of the deserializer.
        StreamDeserializer deserializer = subinfos.get(this.topic);
        if (this.isListenMode() && rowSize == 1) {
            BasicMessage rec = new BasicMessage(msgid, this.topic, dTable, columnIndex);
            if (deserializer != null) {
                rec = deserializer.parse(rec);
            }
            this.dispatcher.dispatch(rec);
            return;
        }
        ArrayList<IMessage> messages = new ArrayList<IMessage>(rowSize);
        long startMsgId = msgid - (long)rowSize + 1L;
        for (int i = 0; i < rowSize; ++i) {
            // Transpose: collect the i-th element of every column into one row vector.
            BasicAnyVector row = new BasicAnyVector(colSize);
            for (int j = 0; j < colSize; ++j) {
                AbstractVector vector = (AbstractVector)dTable.getEntity(j);
                Entity entity = vector.get(i);
                row.setEntity(j, entity);
            }
            BasicMessage rec = new BasicMessage(startMsgId + (long)i, this.topic, row, columnIndex);
            if (deserializer != null) {
                rec = deserializer.parse(rec);
            }
            messages.add(rec);
        }
        this.dispatcher.batchDispatch(messages);
    }

    /**
     * Returns the schema map recorded for the first comma-separated topic of the current topic
     * string, or null when none has been recorded.
     */
    private HashMap<String, Integer> schemaForCurrentTopic() {
        return this.topicNameToIndex.get(this.topic.split(",")[0]);
    }

    /**
     * Holder for the message source handed to {@link MessageParser}. {@link MessageParser#run()}
     * requires exactly one of the two fields to be non-null.
     */
    public static class DBConnectionAndSocket {
        public DBConnection conn;
        public Socket socket;
    }
}

