/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.examples;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.callbacks.Stream;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.datatype.json.JSON;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Consumer;

public final class TimeSeriesQueryExample {
    private static final ObjectMapper CBOR_MAPPER = new ObjectMapper((JsonFactory)new CBORFactory());

    private TimeSeriesQueryExample() {
    }

    public static void subscribeChatView(final Session session, final ChatView chatView, final String chatTopicPath, final Consumer<Throwable> errorHandler) {
        final Topics topics = (Topics)session.feature(Topics.class);
        Topics.ValueStream.Default<TimeSeries.Event<JSON>> subscriptionStream = new Topics.ValueStream.Default<TimeSeries.Event<JSON>>(){
            private volatile boolean initialValue = true;

            public void onValue(String topicPath, TopicSpecification specification, TimeSeries.Event<JSON> oldEvent, TimeSeries.Event<JSON> event) {
                if (this.initialValue && event.sequence() > chatView.expectedNextSequence()) {
                    this.initialValue = false;
                    TimeSeries.Query query = chatView.missingEventQuery((TimeSeries)session.feature(TimeSeries.class), event.sequence());
                    query.selectFrom(chatTopicPath).whenComplete((result, e) -> {
                        if (e != null) {
                            topics.removeStream((Stream)this);
                            errorHandler.accept(e);
                        } else {
                            result.stream().forEach(x$0 -> chatView.addEvent((TimeSeries.Event<JSON>)x$0));
                        }
                    });
                }
                chatView.addEvent((TimeSeries.Event<JSON>)event);
            }

            public void onError(ErrorReason errorReason) {
                errorHandler.accept(new RuntimeException("Subscription stream failed: " + errorReason));
            }
        };
        topics.addTimeSeriesStream(chatTopicPath, JSON.class, (Topics.ValueStream)subscriptionStream);
        topics.subscribe(chatTopicPath).whenComplete((arg_0, arg_1) -> TimeSeriesQueryExample.lambda$subscribeChatView$0(topics, (Topics.ValueStream)subscriptionStream, errorHandler, arg_0, arg_1));
    }

    private static ChatMessage jsonToChat(JSON value) {
        try {
            return (ChatMessage)CBOR_MAPPER.readValue(value.asInputStream(), ChatMessage.class);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to parse event as chat message", e);
        }
    }

    private static /* synthetic */ void lambda$subscribeChatView$0(Topics topics, Topics.ValueStream subscriptionStream, Consumer errorHandler, Object result, Throwable e) {
        if (e != null) {
            topics.removeStream((Stream)subscriptionStream);
            errorHandler.accept(e);
        }
    }

    public static class ChatMessage {
        private final String text;
        private final int priority;
        private final int senderId;

        @JsonCreator
        public ChatMessage(@JsonProperty(value="text") String text, @JsonProperty(value="priority") int priority, @JsonProperty(value="senderId") int senderId) {
            this.text = text;
            this.priority = priority;
            this.senderId = senderId;
        }

        public String getText() {
            return this.text;
        }

        public int getPriority() {
            return this.priority;
        }

        public int getSenderId() {
            return this.senderId;
        }
    }

    public static class ChatView {
        private final Instant startOfView;
        private final SortedMap<Long, TimeSeries.Event<ChatMessage>> messages = new TreeMap<Long, TimeSeries.Event<ChatMessage>>();
        private long latestSequence = -1L;

        public ChatView(Instant startOfView) {
            this.startOfView = startOfView;
        }

        public SortedMap<Long, TimeSeries.Event<ChatMessage>> getMessages() {
            return Collections.unmodifiableSortedMap(this.messages);
        }

        private synchronized void addEvent(TimeSeries.Event<JSON> event) {
            if (event.timestamp() >= this.startOfView.toEpochMilli()) {
                this.messages.put(event.originalEvent().sequence(), (TimeSeries.Event<ChatMessage>)event.withValue((Object)TimeSeriesQueryExample.jsonToChat((JSON)event.value())));
            }
            this.latestSequence = Math.max(this.latestSequence, event.sequence());
        }

        private synchronized long expectedNextSequence() {
            return this.latestSequence + 1L;
        }

        private synchronized TimeSeries.Query<JSON> missingEventQuery(TimeSeries timeSeries, long receivedSequence) {
            return timeSeries.rangeQuery().from(this.startOfView).editRange().from(this.latestSequence + 1L).to(receivedSequence - 1L).as(JSON.class);
        }
    }
}

