/*
 * Decompiled with CFR 0.152.
 */
package com.axibase.tsd.client;

import com.axibase.tsd.client.HttpClientManager;
import com.axibase.tsd.client.PlainSender;
import com.axibase.tsd.client.StreamingManager;
import com.axibase.tsd.model.system.MarkerState;
import com.axibase.tsd.plain.MarkerCommand;
import com.axibase.tsd.plain.PlainCommand;
import com.axibase.tsd.query.Query;
import com.axibase.tsd.query.QueryPart;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StreamingManager} implementation that streams plain commands through a
 * {@link PlainSender} and periodically verifies, via marker requests issued through the
 * {@link HttpClientManager}, that the server received what was sent.
 *
 * <p>Threading: {@link #canSend()} schedules the verification on a dedicated single-thread
 * executor, so the check result and the list of saved (undelivered) commands are shared
 * between that thread and the callers of {@link #send(PlainCommand)}, {@link #canSend()} and
 * {@link #removeSavedPlainCommands()}. The check result is therefore {@code volatile} and every
 * access to the saved list is guarded by the list's own monitor.
 */
public class DefaultStreamingManager implements StreamingManager {
    private static final Logger log = LoggerFactory.getLogger(DefaultStreamingManager.class);

    /** Marker name used for the plain connectivity check. */
    public static final String CHECK = "check";

    private static final int DEFAULT_CHECK_PERIOD_MS = 5000;

    /** Minimal interval between two scheduled checks, in milliseconds. */
    private long checkPeriodMillis = DEFAULT_CHECK_PERIOD_MS;

    private final AtomicReference<PlainSender> plainSender = new AtomicReference<PlainSender>();
    private final AtomicLong lastPingTime = new AtomicLong(0L);
    private final AtomicReference<String> marker = new AtomicReference<String>();

    /** Written by the check thread, read by caller threads; volatile for visibility. */
    private volatile boolean lastPingResult = false;

    /** Commands that may not have reached the server; guarded by {@code synchronized (saved)}. */
    private final List<String> saved = new ArrayList<String>();

    private final HttpClientManager httpClientManager;

    /** Future of the currently submitted sender; only touched from the check thread. */
    private Future<?> senderFuture;

    private final ExecutorService checkExecutor;
    private final ExecutorService senderExecutor;

    /**
     * Creates a manager with its own single-thread executors for checking and sending.
     *
     * @param httpClientManager source of the client configuration and marker requests
     * @throws IllegalArgumentException if {@code httpClientManager} is {@code null}
     */
    public DefaultStreamingManager(HttpClientManager httpClientManager) {
        if (httpClientManager == null) {
            throw new IllegalArgumentException("httpClientManager is null");
        }
        this.httpClientManager = httpClientManager;
        this.checkExecutor = Executors.newSingleThreadExecutor();
        this.senderExecutor = Executors.newSingleThreadExecutor();
    }

    /**
     * Closes the current sender, if any, and shuts down both executors.
     */
    @Override
    public void close() {
        log.info("Close streaming manager");
        PlainSender sender = this.plainSender.get();
        if (sender != null) {
            sender.close();
        }
        this.checkExecutor.shutdown();
        this.senderExecutor.shutdown();
    }

    /**
     * Passes the command to the current sender.
     *
     * @param plainCommand command to send
     * @throws IllegalStateException if the last check failed, no sender exists yet, or the
     *                               sender reports that it is not working
     */
    @Override
    public void send(PlainCommand plainCommand) {
        if (!this.lastPingResult) {
            throw new IllegalStateException("Last check was bad, call canSend() method before command sending");
        }
        PlainSender sender = this.plainSender.get();
        if (sender == null) {
            throw new IllegalStateException("Sender is null");
        }
        if (!sender.isWorking()) {
            throw new IllegalStateException("Sender is in the wrong state");
        }
        sender.send(plainCommand);
    }

    /**
     * Reports whether commands can currently be sent and, when more than
     * {@code checkPeriodMillis} has elapsed since the last scheduled check, schedules a new
     * asynchronous check. The returned value reflects the state at call time, not the outcome
     * of the check that was just scheduled.
     *
     * @return {@code true} if the last check succeeded and a working sender exists
     */
    @Override
    public boolean canSend() {
        long last = this.lastPingTime.get();
        long current = System.currentTimeMillis();
        // The CAS lets only one of several concurrent callers schedule the check for this period.
        if (current - last > this.checkPeriodMillis && this.lastPingTime.compareAndSet(last, current)) {
            this.checkExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        boolean beforeLastResult = DefaultStreamingManager.this.lastPingResult;
                        DefaultStreamingManager.this.prepareAndCheckSender();
                        // Two successful checks in a row: drop whatever is still saved.
                        if (beforeLastResult && DefaultStreamingManager.this.lastPingResult) {
                            DefaultStreamingManager.this.clearSavedCommands();
                        }
                    } catch (Throwable e) {
                        log.error("Could not prepare sender: ", e);
                    }
                }
            });
        }
        PlainSender sender = this.plainSender.get();
        return this.lastPingResult && sender != null && sender.isWorking();
    }

    /**
     * Replaces a missing or closed sender with a new one, runs the check, stores its result
     * and, on success, sends a new marker. Runs on the check executor thread.
     */
    private void prepareAndCheckSender() {
        PlainSender sender = this.plainSender.get();
        if (sender == null || sender.isClosed()) {
            PlainSender newSender = new PlainSender(this.httpClientManager.getClientConfiguration(), sender);
            if (this.plainSender.compareAndSet(sender, newSender)) {
                if (sender != null) {
                    log.info("Prepare new sender, close old");
                    sender.close();
                }
                if (this.senderFuture != null) {
                    this.senderFuture.cancel(true);
                }
                this.senderFuture = this.senderExecutor.submit(newSender);
            }
        }
        boolean result = this.check();
        this.lastPingResult = result;
        if (result) {
            this.compareAndSendNewMarker(this.marker.get());
        }
    }

    /**
     * Verifies the sender state against the server.
     *
     * <p>When streaming control is skipped in the client configuration the check always
     * succeeds. With at most two markers tracked by the sender only the {@link #CHECK} marker
     * is requested. Otherwise all tracked markers except the last two are compared with the
     * server-side counts and removed from the sender's map; if any commands end up saved, the
     * remaining ones are saved as well and the sender is closed.
     *
     * @return {@code true} if the sender may be used, {@code false} otherwise (including any
     *         error raised during the check)
     */
    private boolean check() {
        if (this.httpClientManager.getClientConfiguration().isSkipStreamingControl()) {
            return true;
        }
        PlainSender sender = this.plainSender.get();
        try {
            if (sender == null) {
                log.warn("Sender is null");
                return false;
            }
            Map<String, List<String>> markerToMessages = sender.getMarkerToMessages();
            int size = markerToMessages.size();
            if (size <= 2) {
                return this.checkConnection(sender);
            }
            Iterator<Map.Entry<String, List<String>>> iterator = markerToMessages.entrySet().iterator();
            // The last two entries are left untouched in this pass.
            for (int i = 0; iterator.hasNext() && i < size - 2; ++i) {
                Map.Entry<String, List<String>> markerAndCommands = iterator.next();
                this.verifyMarker(markerAndCommands.getKey(), markerAndCommands.getValue());
                iterator.remove();
            }
            if (!this.hasSavedCommands()) {
                return true;
            }
            // Something is saved: keep the rest too, so nothing is lost with the closed sender.
            iterator = markerToMessages.entrySet().iterator();
            while (iterator.hasNext()) {
                this.saveCommands(iterator.next().getValue());
                iterator.remove();
            }
            log.error("Save {} commands, broken sender will be closed", this.savedCommandCount());
            sender.close();
            return false;
        } catch (Throwable e) {
            log.warn("Ping error: ", e);
            return false;
        }
    }

    /**
     * Requests the {@link #CHECK} marker and closes the sender when the answer is missing or
     * carries a different marker.
     */
    private boolean checkConnection(PlainSender sender) {
        MarkerState markerState = this.askMarkerState(CHECK);
        boolean checkResult = markerState != null && CHECK.equals(markerState.getMarker());
        if (!checkResult) {
            log.warn("Bad check result, close sender");
            sender.close();
        }
        return checkResult;
    }

    /**
     * Compares the server-side count for one marker with the number of commands sent under it
     * and saves the commands when the server has fewer or the count is unavailable.
     */
    private void verifyMarker(String checkedMarker, List<String> commands) {
        MarkerState markerState = this.askMarkerState(checkedMarker);
        if (markerState == null || markerState.getCount() == null) {
            log.error("Could not get command count for marker {}", checkedMarker);
            this.saveCommands(commands);
            return;
        }
        if (markerState.getCount() > commands.size()) {
            log.warn("Server received more ({}) commands then client sent ({}), marker: {}", new Object[]{markerState.getCount(), commands.size(), checkedMarker});
        } else if (markerState.getCount() < commands.size()) {
            log.error("Server received less ({}) commands then client sent ({}), marker: {}", new Object[]{markerState.getCount(), commands.size(), checkedMarker});
            this.saveCommands(commands);
        } else {
            log.debug("Server received same command count ({}) that client sent, marker: {}", commands.size(), checkedMarker);
        }
    }

    /**
     * Requests the state of the given marker through the HTTP client manager.
     *
     * @param marker marker name appended to the {@code command/marker} path
     * @return the received state, or {@code null} if the request failed
     */
    private MarkerState askMarkerState(String marker) {
        MarkerState markerState = null;
        try {
            QueryPart markersPath = new Query("command").path("marker");
            QueryPart query = markersPath.path(marker);
            markerState = this.httpClientManager.requestData(MarkerState.class, query, null);
            log.debug("From server {} received the following state of marker: {}", this.httpClientManager.getClientConfiguration().getDataUrl(), markerState);
        } catch (Throwable e) {
            log.error("Error while checking marker count: ", e);
        }
        return markerState;
    }

    /**
     * Generates a new marker and, if the current marker is still {@code current}, installs it
     * and sends the marker command. Does nothing when streaming control is skipped.
     *
     * @param current marker value expected to be installed at the moment of the call
     * @throws IllegalStateException if the marker was swapped but there is no working sender
     */
    private void compareAndSendNewMarker(String current) {
        if (this.httpClientManager.getClientConfiguration().isSkipStreamingControl()) {
            return;
        }
        MarkerCommand markerCommand = new MarkerCommand();
        String newMarker = markerCommand.getMarker();
        if (this.marker.compareAndSet(current, newMarker)) {
            PlainSender sender = this.plainSender.get();
            if (sender == null) {
                throw new IllegalStateException("Sender is null");
            }
            if (!sender.isWorking()) {
                throw new IllegalStateException("Sender is incorrect");
            }
            sender.send(markerCommand);
        } else {
            log.warn("Current marker:{} is already replaced by another marker:{}", current, this.marker.get());
        }
    }

    /**
     * Atomically removes and returns all saved commands.
     *
     * @return a new list with the removed commands, or an immutable empty list if none are saved
     */
    @Override
    public List<String> removeSavedPlainCommands() {
        synchronized (this.saved) {
            if (this.saved.isEmpty()) {
                return Collections.emptyList();
            }
            List<String> result = new ArrayList<String>(this.saved);
            this.saved.clear();
            log.info("{} commands are removed from saved list", result.size());
            return result;
        }
    }

    /** Appends the commands to the saved list under the list's monitor. */
    private void saveCommands(List<String> commands) {
        synchronized (this.saved) {
            this.saved.addAll(commands);
        }
    }

    /** Tells whether any commands are currently saved. */
    private boolean hasSavedCommands() {
        synchronized (this.saved) {
            return !this.saved.isEmpty();
        }
    }

    /** Returns the number of currently saved commands. */
    private int savedCommandCount() {
        synchronized (this.saved) {
            return this.saved.size();
        }
    }

    /** Discards all saved commands. */
    private void clearSavedCommands() {
        synchronized (this.saved) {
            this.saved.clear();
        }
    }
}

