/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.examples;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.UpdateConstraint;
import com.pushtechnology.diffusion.client.features.UpdateStream;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ControlClientAsExclusiveUpdater {
    private static final String TOPIC_NAME = "Feeder";
    private final Session session;

    public ControlClientAsExclusiveUpdater(String serverUrl) {
        this.session = Diffusion.sessions().principal("control").password("password").open(serverUrl);
    }

    public void start(PriceProvider provider, ScheduledExecutorService scheduler) throws InterruptedException, ExecutionException, TimeoutException {
        TopicControl topicControl = (TopicControl)this.session.feature(TopicControl.class);
        TopicSpecification specification = Diffusion.newTopicSpecification((TopicType)TopicType.STRING).withProperty("REMOVAL", "when this session closes");
        topicControl.addTopic(TOPIC_NAME, specification).get(5L, TimeUnit.SECONDS);
        TopicUpdate topicUpdate = (TopicUpdate)this.session.feature(TopicUpdate.class);
        this.session.lock(TOPIC_NAME).thenAccept(lock -> ControlClientAsExclusiveUpdater.onLockAcquired(lock, provider, scheduler, topicUpdate));
    }

    private static void onLockAcquired(Session.SessionLock lock, PriceProvider provider, ScheduledExecutorService scheduler, TopicUpdate topicUpdate) {
        UpdateConstraint exclusiveAccessConstraint = Diffusion.updateConstraints().locked(lock);
        UpdateStream updateStream = topicUpdate.createUpdateStream(TOPIC_NAME, String.class, exclusiveAccessConstraint);
        CompletableFuture failureHandler = new CompletableFuture();
        ScheduledFuture<?> theFeeder = scheduler.scheduleAtFixedRate(() -> updateStream.set((Object)provider.getPrice()).whenComplete((result, ex) -> {
            if (ex != null) {
                failureHandler.completeExceptionally((Throwable)ex);
            }
        }), 1L, 1L, TimeUnit.SECONDS);
        failureHandler.whenComplete((result, ex) -> {
            theFeeder.cancel(false);
            lock.unlock();
        });
    }

    public void close() {
        this.session.close();
    }

    public static interface PriceProvider {
        public String getPrice();
    }
}

