/*
 * Decompiled with CFR 0.152.
 */
package io.simplesource.kafka.internal.streams;

import io.simplesource.api.CommandAPI;
import io.simplesource.api.CommandError;
import io.simplesource.data.FutureResult;
import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.api.AggregateSerdes;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.KafkaCommandAPI;
import io.simplesource.kafka.internal.streams.statestore.AggregateStoreBridge;
import io.simplesource.kafka.internal.streams.statestore.CommandResponseStoreBridge;
import io.simplesource.kafka.internal.streams.topology.EventSourcedTopology;
import io.simplesource.kafka.internal.streams.topology.TopologyContext;
import io.simplesource.kafka.internal.util.NamedThreadFactory;
import io.simplesource.kafka.internal.util.RetryDelay;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.model.AggregateUpdateResult;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.model.ValueWithSequence;
import io.simplesource.kafka.spec.AggregateSpec;
import java.time.Duration;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

/**
 * In-process test harness for a single aggregate, backed by Kafka Streams'
 * {@link TopologyTestDriver}.
 *
 * <p>The constructor builds the topology via {@link EventSourcedTopology} and runs it inside a
 * {@link TopologyTestDriver}. Commands are piped straight into the command-request topic, and
 * command results are queried through a {@link KafkaCommandAPI} whose store bridge reads the
 * test driver's state stores.
 *
 * <p>Call {@link #close()} when finished: it releases the topology driver and stops the
 * scheduler thread handed to the command API.
 *
 * @param <K> aggregate key type
 * @param <C> command type
 * @param <E> event type
 * @param <A> aggregate type
 */
public final class AggregateTestDriver<K, C, E, A>
implements CommandAPI<K, C> {
    private final TopologyTestDriver driver;
    private final AggregateSpec<K, C, E, A> aggregateSpec;
    private final KafkaConfig kafkaConfig;
    private final AggregateSerdes<K, C, E, A> aggregateSerdes;
    private final TestDriverPublisher publisher;
    private final CommandAPI<K, C> commandAPI;
    /** Scheduler handed to the command API; owned by this driver and stopped in {@link #close()}. */
    private final ScheduledExecutorService scheduledExecutor;

    /**
     * Builds the aggregate topology and starts a test driver for it.
     *
     * @param aggregateSpec specification of the aggregate; supplies serdes and resource naming
     * @param kafkaConfig   configuration whose streams settings are passed to the test driver
     */
    public AggregateTestDriver(AggregateSpec<K, C, E, A> aggregateSpec, KafkaConfig kafkaConfig) {
        this.aggregateSpec = aggregateSpec;
        this.kafkaConfig = kafkaConfig;
        this.aggregateSerdes = aggregateSpec.serialization().serdes();

        StreamsBuilder builder = new StreamsBuilder();
        TopologyContext<K, C, E, A> ctx = new TopologyContext<>(aggregateSpec);
        EventSourcedTopology.addTopology(ctx, builder);

        Properties streamConfig = new Properties();
        streamConfig.putAll(kafkaConfig.streamsConfig());
        this.driver = new TopologyTestDriver(builder.build(), streamConfig, 0L);
        this.publisher = new TestDriverPublisher(this.aggregateSerdes);

        // The scheduler is created only after the topology driver exists, so a failure while
        // building the driver cannot strand a scheduler thread.
        this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("QueryAPI-scheduler"));
        // Constant retry delay regardless of elapsed time or spin count
        // (presumably milliseconds - confirm against RetryDelay).
        RetryDelay retryDelay = (startTime, timeoutMillis, spinCount) -> 15L;
        // null is passed for the fourth constructor argument, exactly as before.
        this.commandAPI = new KafkaCommandAPI<>(aggregateSpec, kafkaConfig, new TestDriverStoreBridge(), null, this.scheduledExecutor, retryDelay);
    }

    /**
     * Pipes the command into the command-request topic of the test driver.
     *
     * @param request the command request to publish
     * @return an already-completed result holding the request's command id
     */
    @Override
    public FutureResult<CommandError, UUID> publishCommand(CommandAPI.Request<K, C> request) {
        CommandRequest<K, C> commandRequest =
            new CommandRequest<>(request.key(), request.command(), request.readSequence(), request.commandId());
        this.publisher.publish(this.topicName(AggregateResources.TopicEntity.command_request), request.key(), commandRequest);
        return FutureResult.of(request.commandId());
    }

    /**
     * Delegates to the wrapped {@link KafkaCommandAPI}.
     *
     * @param commandId id of the command whose result is wanted
     * @param timeout   how long the underlying API may wait for the result
     * @return the result produced by the underlying command API
     */
    @Override
    public FutureResult<CommandError, NonEmptyList<Sequence>> queryCommandResult(UUID commandId, Duration timeout) {
        return this.commandAPI.queryCommandResult(commandId, timeout);
    }

    /**
     * Reads the next record, if any, from the aggregate topic.
     *
     * @return the record's key and value, or empty when the driver has no output for the topic
     */
    Optional<KeyValue<K, AggregateUpdate<A>>> readAggregateTopic() {
        ProducerRecord<K, AggregateUpdate<A>> maybeRecord = this.driver.readOutput(
            this.topicName(AggregateResources.TopicEntity.aggregate),
            this.aggregateSerdes.aggregateKey().deserializer(),
            this.aggregateSerdes.aggregateUpdate().deserializer());
        return Optional.ofNullable(maybeRecord).map(record -> KeyValue.pair(record.key(), record.value()));
    }

    /**
     * Reads the next record, if any, from the event topic.
     *
     * @return the record's key and value, or empty when the driver has no output for the topic
     */
    Optional<KeyValue<K, ValueWithSequence<E>>> readEventTopic() {
        ProducerRecord<K, ValueWithSequence<E>> maybeRecord = this.driver.readOutput(
            this.topicName(AggregateResources.TopicEntity.event),
            this.aggregateSerdes.aggregateKey().deserializer(),
            this.aggregateSerdes.valueWithSequence().deserializer());
        return Optional.ofNullable(maybeRecord).map(record -> KeyValue.pair(record.key(), record.value()));
    }

    /**
     * Looks up the entry with the greatest window timestamp stored for {@code key} in the
     * command-response window store, searching from time 0 up to the current wall-clock time.
     *
     * @param key key to look up in the command-response store
     * @return the value of the entry with the highest timestamp, or empty if none was found
     */
    Optional<AggregateUpdateResult<A>> fetchAggregateUpdateResult(UUID key) {
        WindowStore<UUID, AggregateUpdateResult<A>> windowStore =
            this.driver.getWindowStore(this.storeName(AggregateResources.StateStoreEntity.command_response));
        // Window store iterators must be closed to release their resources; max() is a terminal
        // operation, so the iterator is fully consumed before the try block exits.
        try (WindowStoreIterator<AggregateUpdateResult<A>> iterator = windowStore.fetch(key, 0L, System.currentTimeMillis())) {
            Iterable<KeyValue<Long, AggregateUpdateResult<A>>> iterable = () -> iterator;
            return StreamSupport.stream(iterable.spliterator(), false)
                .max(Comparator.comparingLong(kv -> kv.key))
                .map(kv -> kv.value);
        }
    }

    /**
     * Closes the topology test driver and stops the scheduler created by the constructor.
     *
     * <p>The scheduler is stopped with {@code shutdownNow()}, so any tasks still queued on it
     * are discarded rather than run against a closed driver.
     */
    public void close() {
        try {
            this.driver.close();
        } finally {
            // Always release the scheduler thread, even if closing the driver fails.
            this.scheduledExecutor.shutdownNow();
        }
    }

    /** Resolves the topic name for this aggregate using the spec's resource naming strategy. */
    private String topicName(AggregateResources.TopicEntity topic) {
        return this.aggregateSpec.serialization().resourceNamingStrategy().topicName(this.aggregateSpec.aggregateName(), topic.name());
    }

    /** Resolves the state store name for this aggregate using the spec's resource naming strategy. */
    private String storeName(AggregateResources.StateStoreEntity store) {
        return this.aggregateSpec.serialization().resourceNamingStrategy().storeName(this.aggregateSpec.aggregateName(), store.name());
    }

    /**
     * Store bridge that serves state stores directly from the enclosing test driver and reports
     * the configured current host for every key.
     */
    private class TestDriverStoreBridge
    implements AggregateStoreBridge<K, A>,
    CommandResponseStoreBridge<A> {
        private TestDriverStoreBridge() {
        }

        /** Returns the test driver's aggregate-update key-value store. */
        public ReadOnlyKeyValueStore<K, AggregateUpdate<A>> getAggregateStateStore() {
            return AggregateTestDriver.this.driver.getKeyValueStore(AggregateTestDriver.this.storeName(AggregateResources.StateStoreEntity.aggregate_update));
        }

        /** Returns the test driver's command-response window store. */
        public ReadOnlyWindowStore<UUID, AggregateUpdateResult<A>> getCommandResponseStore() {
            return AggregateTestDriver.this.driver.getWindowStore(AggregateTestDriver.this.storeName(AggregateResources.StateStoreEntity.command_response));
        }

        /** Always answers with the current host from the Kafka config; {@code key} is not used. */
        public Optional<HostInfo> hostInfoForAggregateStoreKey(K key) {
            return Optional.of(AggregateTestDriver.this.kafkaConfig.currentHostInfo());
        }

        /** Always answers with the current host from the Kafka config; {@code key} is not used. */
        public Optional<HostInfo> hostInfoForCommandResponseStoreKey(UUID key) {
            return Optional.of(AggregateTestDriver.this.kafkaConfig.currentHostInfo());
        }
    }

    /** Serializes command requests and pipes them into the enclosing test driver. */
    private class TestDriverPublisher {
        private final ConsumerRecordFactory<K, CommandRequest<K, C>> factory;

        /**
         * @param aggregateSerdes serdes supplying the key and command-request serializers
         */
        TestDriverPublisher(AggregateSerdes<K, C, E, A> aggregateSerdes) {
            this.factory = new ConsumerRecordFactory<>(aggregateSerdes.aggregateKey().serializer(), aggregateSerdes.commandRequest().serializer());
        }

        /** Creates a consumer record for {@code topic} and feeds it to the test driver. */
        void publish(String topic, K key, CommandRequest<K, C> value) {
            AggregateTestDriver.this.driver.pipeInput(this.factory.create(topic, key, value));
        }
    }
}

