/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.examples;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.UpdateConstraint;
import com.pushtechnology.diffusion.client.features.UpdateStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TopicUpdateExample {
    private static final Logger LOG = LoggerFactory.getLogger(TopicUpdateExample.class);

    private TopicUpdateExample() {
    }

    public static void statelessSet(Session session) {
        ((TopicUpdate)session.feature(TopicUpdate.class)).set("a/path", Long.class, (Object)6L).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void statelessSetWithConstraint(Session session) {
        TopicUpdate update = (TopicUpdate)session.feature(TopicUpdate.class);
        UpdateConstraint.Factory constraints = Diffusion.updateConstraints();
        ((CompletableFuture)((CompletableFuture)session.lock("a/lock").thenApply(lock -> constraints.value((Object)5L).and(constraints.locked(lock)))).thenCompose(constraint -> update.set("a/path", Long.class, (Object)6L, constraint))).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void statelessSetRepeatedly(Session session) {
        TopicUpdate update = (TopicUpdate)session.feature(TopicUpdate.class);
        UpdateConstraint.Factory constraints = Diffusion.updateConstraints();
        ((CompletableFuture)((CompletableFuture)update.set("a/path", String.class, (Object)"hello").thenCompose(x -> update.set("a/path", String.class, null))).thenCompose(x -> update.set("a/path", String.class, (Object)"who are you?", constraints.noValue()))).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void statelessSetPeriodically(Session session) {
        TopicUpdate updater = (TopicUpdate)session.feature(TopicUpdate.class);
        Random random = new Random();
        TopicUpdateExample.runPeriodicallyUntilFirstFailure(Executors.newSingleThreadScheduledExecutor(), () -> updater.set("random/long", Long.class, (Object)random.nextLong()), 5L, TimeUnit.SECONDS).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void addAndSetTopicWithStateless(Session session) {
        TopicUpdate updater = (TopicUpdate)session.feature(TopicUpdate.class);
        updater.addAndSet("a/path", Diffusion.newTopicSpecification((TopicType)TopicType.STRING), String.class, (Object)"hello").thenAccept(result -> {
            if (result == TopicCreationResult.CREATED) {
                LOG.info("A new topic was created");
            } else {
                LOG.info("An existing topic was updated");
            }
        });
    }

    public static void streamSet(Session session) {
        ((TopicUpdate)session.feature(TopicUpdate.class)).createUpdateStream("a/path", Long.class).set((Object)6L).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void streamSetWithConstraint(Session session) {
        TopicUpdate update = (TopicUpdate)session.feature(TopicUpdate.class);
        UpdateConstraint.Factory constraints = Diffusion.updateConstraints();
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)session.lock("a/lock").thenApply(lock -> constraints.value((Object)5L).and(constraints.locked(lock)))).thenApply(constraint -> update.createUpdateStream("a/path", Long.class, constraint))).thenCompose(updateStream -> updateStream.set((Object)6L))).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void streamSetRepeatedly(Session session) {
        UpdateConstraint.Factory constraints = Diffusion.updateConstraints();
        UpdateStream updateStream = ((TopicUpdate)session.feature(TopicUpdate.class)).createUpdateStream("a/path", String.class, constraints.noValue());
        ((CompletableFuture)((CompletableFuture)updateStream.set((Object)"hello").thenCompose(x -> updateStream.set(null))).thenCompose(x -> updateStream.set((Object)"who are you?"))).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void streamSetRepeatedlyWithoutWaiting(Session session) {
        UpdateConstraint.Factory constraints = Diffusion.updateConstraints();
        UpdateStream updateStream = ((TopicUpdate)session.feature(TopicUpdate.class)).createUpdateStream("a/path", String.class, constraints.noValue());
        updateStream.set((Object)"hello");
        updateStream.set(null);
        updateStream.set((Object)"who are you?");
    }

    public static void streamSetPeriodically(Session session) {
        UpdateStream updateStream = ((TopicUpdate)session.feature(TopicUpdate.class)).createUpdateStream("random/long", Long.class);
        CompletableFuture validation = updateStream.validate();
        validation.whenComplete((result, ex) -> {
            if (ex != null) {
                LOG.warn("Failed to validate stream", ex);
            }
        });
        Random random = new Random();
        ((CompletableFuture)validation.thenCompose(result -> TopicUpdateExample.runPeriodicallyUntilFirstFailure(Executors.newSingleThreadScheduledExecutor(), () -> updateStream.set((Object)random.nextLong()), 5L, TimeUnit.SECONDS))).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void createTopicWithStream(Session session) {
        UpdateStream updateStream = ((TopicUpdate)session.feature(TopicUpdate.class)).createUpdateStream("a/path", Diffusion.newTopicSpecification((TopicType)TopicType.STRING), String.class);
        updateStream.validate().thenAccept(result -> {
            if (result == TopicCreationResult.CREATED) {
                LOG.info("The topic was created");
            } else {
                LOG.info("The topic already exist");
            }
        });
    }

    public static void addAndSetTopicWithStream(Session session) {
        UpdateStream updateStream = ((TopicUpdate)session.feature(TopicUpdate.class)).createUpdateStream("a/path", Diffusion.newTopicSpecification((TopicType)TopicType.STRING), String.class);
        updateStream.set((Object)"hello").thenAccept(result -> {
            if (result == TopicCreationResult.CREATED) {
                LOG.info("A new topic was created");
            } else {
                LOG.info("An existing topic was updated");
            }
        });
    }

    private static CompletableFuture<?> runPeriodicallyUntilFirstFailure(ScheduledExecutorService executor, Supplier<CompletableFuture<?>> task, long period, TimeUnit unit) {
        CompletableFuture taskHandle = new CompletableFuture();
        TopicUpdateExample.scheduleNextUpdate(executor, task, period, unit, taskHandle);
        return taskHandle;
    }

    private static void scheduleNextUpdate(ScheduledExecutorService executor, Supplier<CompletableFuture<?>> task, long period, TimeUnit unit, CompletableFuture<?> taskHandle) {
        executor.schedule(() -> {
            if (taskHandle.isDone()) {
                return;
            }
            ((CompletableFuture)task.get()).whenComplete((arg_0, arg_1) -> TopicUpdateExample.lambda$null$16(taskHandle, executor, (Supplier)task, period, unit, arg_0, arg_1));
        }, period, unit);
    }

    private static <T> void updateHandler(T result, Throwable ex) {
        if (ex != null) {
            LOG.error("Update failed", ex);
        }
    }

    private static /* synthetic */ void lambda$null$16(CompletableFuture taskHandle, ScheduledExecutorService executor, Supplier task, long period, TimeUnit unit, Object result, Throwable ex) {
        if (ex != null) {
            taskHandle.completeExceptionally(ex);
        } else if (!taskHandle.isDone()) {
            TopicUpdateExample.scheduleNextUpdate(executor, task, period, unit, taskHandle);
        }
    }
}

