/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.examples.runnable;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.ClusterRoutingException;
import com.pushtechnology.diffusion.client.features.IncompatibleTopicException;
import com.pushtechnology.diffusion.client.features.InvalidUpdateStreamException;
import com.pushtechnology.diffusion.client.features.NoSuchTopicException;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.UnsatisfiedConstraintException;
import com.pushtechnology.diffusion.client.features.UpdateConstraint;
import com.pushtechnology.diffusion.client.features.UpdateStream;
import com.pushtechnology.diffusion.client.session.PermissionsException;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionClosedException;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.examples.runnable.AbstractClient;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CoordinatingSessionExample
extends AbstractClient {
    private static final Logger LOG = LoggerFactory.getLogger(CoordinatingSessionExample.class);
    private static final long PERIOD = 5000L;
    private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Random random = new Random();

    public CoordinatingSessionExample(String url, String principal) {
        super(url, principal);
    }

    @Override
    public void onConnected(Session session) {
        this.updatePath(session, "topic");
    }

    private void updatePath(Session session, String path) {
        session.lock(path, Session.SessionLockScope.UNLOCK_ON_CONNECTION_LOSS).thenAccept(lock -> this.onLockAcquired(session, path, (Session.SessionLock)lock));
    }

    private void onLockAcquired(Session session, String path, Session.SessionLock lock) {
        TopicUpdate topicUpdate = (TopicUpdate)session.feature(TopicUpdate.class);
        UpdateConstraint locked = Diffusion.updateConstraints().locked(lock);
        topicUpdate.addAndSet(path + "/last_updater", Diffusion.newTopicSpecification((TopicType)TopicType.STRING), String.class, (Object)session.getSessionId().toString(), locked);
        TopicSpecification specification = Diffusion.newTopicSpecification((TopicType)TopicType.INT64);
        Supplier<UpdateStream> updateStreamFactory = () -> topicUpdate.createUpdateStream(path, specification, Long.class, locked);
        UpdateStream updateStream = updateStreamFactory.get();
        updateStream.validate().whenComplete((result, ex) -> {
            if (ex != null) {
                LOG.warn("Unable to initialise first value stream. Unable to begin updating path.", ex);
                this.stop();
            } else {
                UpdateTask updateTask = new UpdateTask(updateStreamFactory, updateStream, this.random::nextLong);
                updateTask.scheduleUpdate();
            }
        });
    }

    private final class UpdateTask {
        private final Supplier<UpdateStream<Long>> updateStreamFactory;
        private final Supplier<Long> valueSupplier;
        private volatile UpdateStream<Long> updateStream;

        private UpdateTask(Supplier<UpdateStream<Long>> updateStreamFactory, UpdateStream<Long> updateStream, Supplier<Long> valueSupplier) {
            this.updateStreamFactory = updateStreamFactory;
            this.updateStream = updateStream;
            this.valueSupplier = valueSupplier;
        }

        private void scheduleUpdate() {
            CoordinatingSessionExample.this.executor.schedule(this::performUpdate, 5000L, UNIT);
        }

        private void performUpdate() {
            this.performUpdate(this.valueSupplier.get());
        }

        private void performUpdate(long value) {
            this.updateStream.set((Object)value).whenComplete((result, ex) -> {
                if (ex != null) {
                    this.handleUpdateFailure(value, (Throwable)ex);
                } else {
                    this.scheduleUpdate();
                }
            });
        }

        private void handleUpdateFailure(long value, Throwable ex) {
            Throwable cause = ex.getCause();
            if (cause instanceof ClusterRoutingException) {
                this.updateStream = this.updateStreamFactory.get();
                this.performUpdate(value);
            } else if (cause instanceof UnsatisfiedConstraintException) {
                LOG.warn("Another session has gained the responsibility for updating the topic");
                CoordinatingSessionExample.this.stop();
            } else if (cause instanceof NoSuchTopicException) {
                LOG.warn("The topic has been deleted");
                CoordinatingSessionExample.this.stop();
            } else if (cause instanceof InvalidUpdateStreamException) {
                LOG.warn("The update stream is no longer valid");
                CoordinatingSessionExample.this.stop();
            } else if (cause instanceof IncompatibleTopicException) {
                LOG.warn("The topic is not compatible");
                CoordinatingSessionExample.this.stop();
            } else if (cause instanceof PermissionsException) {
                LOG.warn("The session doesn't have permission to update the path");
                CoordinatingSessionExample.this.stop();
            } else if (cause instanceof SessionClosedException) {
                LOG.warn("The session has closed");
            }
        }
    }
}

