/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.examples.runnable;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.ClusterRoutingException;
import com.pushtechnology.diffusion.client.features.IncompatibleTopicException;
import com.pushtechnology.diffusion.client.features.IncompatibleTopicStateException;
import com.pushtechnology.diffusion.client.features.NoSuchTopicException;
import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.UnsatisfiedConstraintException;
import com.pushtechnology.diffusion.client.features.UpdateConstraint;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.session.PermissionsException;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionClosedException;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.examples.runnable.AbstractClient;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Example client that competes with other sessions to increment the INT64
 * topic {@code long/increment}.
 *
 * <p>On start the client subscribes to the topic, tries to create it with an
 * initial value of {@code 0} (only if no topic is present), and then every
 * five seconds attempts to set the topic to {@code current + 1}, constrained
 * on the topic still holding the value this client last observed. When the
 * constraint is not satisfied the increment is retried immediately.
 *
 * <p>NOTE(review): the scheduled executor is never shut down in this class;
 * whether {@code AbstractClient} expects subclasses to release it (and at
 * which lifecycle callback) should be confirmed against the base class.
 */
public final class CompetitiveIncrement extends AbstractClient {
    private static final Logger LOG = LoggerFactory.getLogger(CompetitiveIncrement.class);
    private static final UpdateConstraint.Factory CONSTRAINTS = Diffusion.updateConstraints();

    /** Path of the topic that is subscribed to, created and incremented. */
    private static final String TOPIC_PATH = "long/increment";

    /** Delay, in seconds, between a successful step and the next increment attempt. */
    private static final long INCREMENT_DELAY_SECONDS = 5L;

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /** The most recently scheduled increment task, or {@code null} if none has been scheduled. */
    private volatile Future<?> updateTask;

    /** The latest topic value delivered by the value stream, or {@code null} if none has arrived. */
    private volatile Long value;

    /**
     * Creates the client.
     *
     * @param url the server URL, passed to {@code AbstractClient}
     * @param principal the principal, passed to {@code AbstractClient}
     */
    public CompetitiveIncrement(String url, String principal) {
        super(url, principal);
    }

    /**
     * Begins the subscribe / initialise / increment sequence for the session.
     *
     * @param session the started session
     */
    @Override
    public void onStarted(Session session) {
        this.subscribeToPath(session);
    }

    /**
     * Registers a value stream that records every new topic value, then
     * subscribes to the topic. On successful subscription the topic is
     * initialised; on failure the client is stopped.
     */
    private void subscribeToPath(Session session) {
        final Topics topics = session.feature(Topics.class);
        topics.addStream(TOPIC_PATH, Long.class, new Topics.ValueStream.Default<Long>() {

            @Override
            public void onValue(String topicPath, TopicSpecification specification, Long oldValue, Long newValue) {
                CompetitiveIncrement.this.value = newValue;
            }
        });
        topics.subscribe(TOPIC_PATH).whenComplete((result, ex) -> {
            if (ex != null) {
                LOG.warn("Subscription failed", ex);
                this.stop();
            } else {
                this.initialiseTopic(topics);
            }
        });
    }

    /**
     * Attempts to create the topic with an initial value of {@code 0}, using
     * a "no topic" constraint. Increments are scheduled when the topic was
     * created or is already present; any other failure stops the client.
     */
    private void initialiseTopic(Topics topics) {
        topics.addAndSet(
            TOPIC_PATH,
            Diffusion.newTopicSpecification(TopicType.INT64),
            Long.class,
            0L,
            CONSTRAINTS.noTopic()).whenComplete((result, ex) -> {
                if (ex == null) {
                    // Any successful outcome means a usable topic is present, so
                    // start incrementing rather than idling on a non-CREATED result.
                    if (result == TopicCreationResult.CREATED) {
                        LOG.info("Topic created");
                    } else {
                        LOG.info("Topic exists");
                    }
                    this.scheduleIncrement(topics);
                    return;
                }

                final Throwable cause = failureCause(ex);
                if (cause instanceof UnsatisfiedConstraintException) {
                    // The "no topic" constraint failed: another session created it first.
                    LOG.info("Topic exists");
                    this.scheduleIncrement(topics);
                } else if (cause instanceof IncompatibleTopicException) {
                    LOG.warn("An existing topic is not compatible");
                    this.stop();
                } else if (cause instanceof IncompatibleTopicStateException) {
                    LOG.warn("An existing topic is managed by a different component");
                    this.stop();
                } else if (cause instanceof TopicControl.TopicLicenseLimitException) {
                    LOG.warn("License limit reached", ex);
                    this.stop();
                } else {
                    LOG.warn("Topic creation failed", ex);
                    this.stop();
                }
            });
    }

    /** Schedules the next increment attempt and remembers the task so it can be cancelled. */
    private void scheduleIncrement(Topics topics) {
        this.updateTask = this.executor.schedule(
            () -> this.performIncrement(topics),
            INCREMENT_DELAY_SECONDS,
            TimeUnit.SECONDS);
    }

    /**
     * Attempts to set the topic to the last observed value plus one,
     * constrained on the topic still holding that observed value. If no value
     * has been observed yet, the attempt is deferred by rescheduling.
     */
    private void performIncrement(Topics topics) {
        final Long currentValue = this.value;
        if (currentValue == null) {
            // Unboxing null would throw inside the executor/callback where the
            // exception is swallowed, silently ending the increment loop.
            LOG.info("No value received yet for {}, deferring increment", TOPIC_PATH);
            this.scheduleIncrement(topics);
            return;
        }

        final Long nextValue = currentValue + 1L;
        topics.set(
            TOPIC_PATH,
            Long.class,
            nextValue,
            CONSTRAINTS.value(UpdateConstraint.Operator.IS, currentValue)).whenComplete((result, ex) -> {
                if (ex != null) {
                    this.handleIncrementFailure(topics, ex);
                } else {
                    LOG.info("Topic incremented {} -> {}", currentValue, nextValue);
                    this.scheduleIncrement(topics);
                }
            });
    }

    /**
     * Reacts to a failed increment: retries immediately on a routing failure
     * or an unsatisfied constraint, and stops the client on every other
     * failure, including ones that are not specifically recognised.
     */
    private void handleIncrementFailure(Topics topics, Throwable ex) {
        final Throwable cause = failureCause(ex);
        if (cause instanceof ClusterRoutingException || cause instanceof UnsatisfiedConstraintException) {
            this.performIncrement(topics);
        } else if (cause instanceof NoSuchTopicException) {
            LOG.warn("The topic has been deleted");
            this.stop();
        } else if (cause instanceof IncompatibleTopicException) {
            LOG.warn("The topic is not compatible");
            this.stop();
        } else if (cause instanceof IncompatibleTopicStateException) {
            LOG.warn("The topic is managed by a different component");
            this.stop();
        } else if (cause instanceof PermissionsException) {
            LOG.warn("The session does't have permission to update the path");
            this.stop();
        } else if (cause instanceof SessionClosedException) {
            LOG.warn("The session has closed");
            this.stop();
        } else {
            // Previously an unrecognised failure was dropped, leaving the client
            // running but never incrementing again.
            LOG.warn("Topic update failed", ex);
            this.stop();
        }
    }

    /**
     * Returns the throwable to classify for a failed completion: the cause of
     * {@code ex} when it has one, otherwise {@code ex} itself (a future may
     * deliver its exception without a wrapper).
     */
    private static Throwable failureCause(Throwable ex) {
        final Throwable cause = ex.getCause();
        return cause != null ? cause : ex;
    }

    /** Cancels the pending increment task, if any, without interrupting a running one. */
    @Override
    public void onDisconnected() {
        final Future<?> task = this.updateTask;
        if (task != null) {
            task.cancel(false);
        }
    }

    /**
     * Starts the example client and blocks until it has stopped.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while waiting for the client to stop
     */
    public static void main(String[] args) throws InterruptedException {
        final CompetitiveIncrement client = new CompetitiveIncrement("ws://diffusion.example.com:80", "auth");
        client.start("auth_secret");
        client.waitForStopped();
    }
}

