/*
 * Decompiled with CFR 0.152.
 */
package com.axibase.tsd.client;

import com.axibase.tsd.client.HttpClientManager;
import com.axibase.tsd.client.PlainStreamingSender;
import com.axibase.tsd.client.StreamingManager;
import com.axibase.tsd.model.system.MarkerState;
import com.axibase.tsd.network.MarkerCommand;
import com.axibase.tsd.network.PlainCommand;
import com.axibase.tsd.query.Query;
import com.axibase.tsd.query.QueryPart;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultStreamingManager
implements StreamingManager {
    private static final Logger log = LoggerFactory.getLogger(DefaultStreamingManager.class);
    public static final String CHECK = "check";
    private static final int DEFAULT_CHECK_PERIOD_MS = 5000;
    private long checkPeriodMillis = 5000L;
    private PlainStreamingSender plainSender = null;
    private final AtomicLong lastPingTime = new AtomicLong(0L);
    private final AtomicReference<String> marker = new AtomicReference();
    private boolean lastPingResult = false;
    private final List<String> saved = new ArrayList<String>();
    private final HttpClientManager httpClientManager;
    private Future<?> senderFuture;
    private ExecutorService checkExecutor;
    private ExecutorService senderExecutor;
    private ReadWriteLock senderLock = new ReentrantReadWriteLock();

    public DefaultStreamingManager(HttpClientManager httpClientManager) {
        if (httpClientManager == null) {
            throw new IllegalArgumentException("httpClientManager is null");
        }
        this.httpClientManager = httpClientManager;
        this.checkExecutor = Executors.newSingleThreadExecutor();
        this.senderExecutor = Executors.newSingleThreadExecutor();
    }

    @Override
    public void setCheckPeriodMillis(long checkPeriodMillis) {
        this.checkPeriodMillis = checkPeriodMillis;
    }

    @Override
    public void close() {
        log.info("Close streaming manager");
        PlainStreamingSender sender = this.plainSender;
        if (sender != null) {
            sender.close();
        }
        this.checkExecutor.shutdown();
        this.senderExecutor.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void send(PlainCommand plainCommand) {
        if (!this.lastPingResult) {
            throw new IllegalStateException("Last check was bad, call canSend() method before command sending");
        }
        Lock readLock = this.senderLock.readLock();
        readLock.lock();
        try {
            PlainStreamingSender sender = this.plainSender;
            if (sender == null) {
                throw new IllegalStateException("Sender is null");
            }
            if (!sender.isWorking()) {
                throw new IllegalStateException("Sender is in the wrong state");
            }
            sender.send(plainCommand);
        }
        finally {
            readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean canSend() {
        block4: {
            long current;
            long last;
            do {
                last = this.lastPingTime.get();
                current = System.currentTimeMillis();
                if (current - last <= this.checkPeriodMillis) break block4;
            } while (!this.lastPingTime.compareAndSet(last, current));
            this.checkExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        boolean beforeLastResult = DefaultStreamingManager.this.lastPingResult;
                        DefaultStreamingManager.this.prepareAndCheckSender();
                        if (beforeLastResult && DefaultStreamingManager.this.lastPingResult) {
                            DefaultStreamingManager.this.saved.clear();
                        }
                    }
                    catch (Throwable e) {
                        log.error("Could not prepare sender: ", e);
                    }
                }
            });
        }
        Lock readLock = this.senderLock.readLock();
        readLock.lock();
        try {
            boolean bl = this.lastPingResult && this.plainSender != null && this.plainSender.isWorking();
            return bl;
        }
        finally {
            readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prepareAndCheckSender() {
        if (this.plainSender == null || this.plainSender.isClosed()) {
            Lock writeLock = this.senderLock.writeLock();
            writeLock.lock();
            try {
                if (this.plainSender == null || this.plainSender.isClosed()) {
                    PlainStreamingSender newSender = new PlainStreamingSender(this.httpClientManager.getClientConfiguration(), this.plainSender);
                    if (this.plainSender != null) {
                        log.info("Prepare new sender, close old");
                        this.plainSender.close();
                    }
                    if (this.senderFuture != null) {
                        this.senderFuture.cancel(true);
                    }
                    this.senderFuture = this.senderExecutor.submit(newSender);
                    this.plainSender = newSender;
                }
            }
            finally {
                writeLock.unlock();
            }
        }
        this.lastPingResult = this.check();
        if (this.lastPingResult) {
            this.compareAndSendNewMarker(this.marker.get());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean check() {
        if (this.httpClientManager.getClientConfiguration().isSkipStreamingControl()) {
            return true;
        }
        boolean needClosing = false;
        Lock readLock = this.senderLock.readLock();
        readLock.lock();
        try {
            if (this.plainSender != null) {
                Map.Entry<String, List<String>> markerAndCommands;
                Map<String, List<String>> markerToMessages = this.plainSender.getMarkerToMessages();
                int size = markerToMessages.size();
                if (size <= 2) {
                    boolean checkResult;
                    MarkerState markerState = this.askMarkerState(CHECK);
                    boolean bl = checkResult = markerState != null && CHECK.equals(markerState.getMarker());
                    if (!checkResult) {
                        log.warn("Bad check result, close sender");
                        needClosing = true;
                    }
                    boolean bl2 = checkResult;
                    return bl2;
                }
                Iterator<Map.Entry<String, List<String>>> iterator2 = markerToMessages.entrySet().iterator();
                for (int i = 0; iterator2.hasNext() && i < size - 2; ++i) {
                    markerAndCommands = iterator2.next();
                    String checkedMarker = markerAndCommands.getKey();
                    MarkerState markerState = this.askMarkerState(checkedMarker);
                    List<String> commands = markerAndCommands.getValue();
                    if (markerState != null && markerState.getCount() != null) {
                        if (markerState.getCount() > commands.size()) {
                            log.warn("Server received more ({}) commands then client sent ({}), marker: {}", new Object[]{markerState.getCount(), commands.size(), checkedMarker});
                        } else if (markerState.getCount() < commands.size()) {
                            log.error("Server received less ({}) commands then client sent ({}), marker: {}", new Object[]{markerState.getCount(), commands.size(), checkedMarker});
                            this.saved.addAll(commands);
                        } else {
                            log.debug("Server received same command count ({}) that client sent, marker: {}", (Object)commands.size(), (Object)checkedMarker);
                        }
                        iterator2.remove();
                        continue;
                    }
                    log.warn("Could not get command count for marker {}", this.marker);
                    this.saved.addAll(commands);
                    iterator2.remove();
                }
                if (this.saved.isEmpty()) {
                    boolean iterator2 = true;
                    return iterator2;
                }
                iterator2 = markerToMessages.entrySet().iterator();
                while (iterator2.hasNext()) {
                    markerAndCommands = iterator2.next();
                    List<String> commands = markerAndCommands.getValue();
                    this.saved.addAll(commands);
                    iterator2.remove();
                }
                log.warn("Save {} commands, broken sender will be closed", (Object)this.saved.size());
                needClosing = true;
                boolean bl = false;
                return bl;
            }
            log.warn("Sender is null");
            boolean markerToMessages = false;
            return markerToMessages;
        }
        catch (Throwable e) {
            log.warn("Ping error: ", e);
            boolean bl = false;
            return bl;
        }
        finally {
            readLock.unlock();
            if (needClosing) {
                Lock writeLock = this.senderLock.writeLock();
                writeLock.lock();
                try {
                    this.plainSender.close();
                }
                finally {
                    writeLock.unlock();
                }
            }
        }
    }

    private MarkerState askMarkerState(String marker) {
        MarkerState markerState = null;
        try {
            QueryPart markersPath = new Query("command").path("marker");
            QueryPart query = markersPath.param("v", marker);
            markerState = this.httpClientManager.requestData(MarkerState.class, query, null);
            log.debug("From server {} received the following state of marker ({}): {}", new Object[]{this.httpClientManager.getClientConfiguration().getDataUrl(), marker, markerState});
        }
        catch (Throwable e) {
            log.error("Error while checking marker count: ", e);
        }
        return markerState;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void compareAndSendNewMarker(String current) {
        log.debug("Send merker: {}", (Object)current);
        if (this.httpClientManager.getClientConfiguration().isSkipStreamingControl()) {
            return;
        }
        MarkerCommand markerCommand = new MarkerCommand();
        String newMarker = markerCommand.getMarker();
        if (this.marker.compareAndSet(current, newMarker)) {
            Lock readLock = this.senderLock.readLock();
            readLock.lock();
            try {
                if (this.plainSender == null) {
                    throw new IllegalStateException("Sender is null");
                }
                if (!this.plainSender.isWorking()) {
                    throw new IllegalStateException("Sender is incorrect");
                }
                this.plainSender.send(markerCommand);
            }
            finally {
                readLock.unlock();
            }
        } else {
            log.warn("Current marker:{} is already replaced by another marker:", (Object)current, (Object)this.marker.get());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<String> removeSavedPlainCommands() {
        if (this.saved.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> list = this.saved;
        synchronized (list) {
            ArrayList<String> result = new ArrayList<String>(this.saved);
            this.saved.removeAll(result);
            if (result.size() > 0) {
                log.info("{} commands are removed from saved list", (Object)result.size());
            }
            return result;
        }
    }
}

