/*
 * Decompiled with CFR 0.152.
 */
package com.cloudimpl.outstack.runtime;

import com.cloudimpl.outstack.runtime.util.Util;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.retry.Retry;
import reactor.retry.RetryContext;

/**
 * Shares one upstream subscription per key and hands the values it emits to callers.
 *
 * <p>Entries live in a Caffeine cache bounded by a maximum size and expired after a period
 * without access. Each entry ({@link CacheItem}) holds the most recent value emitted by the
 * upstream stream, the handle of the running subscription, and the futures of callers that
 * are still waiting for a value. When an entry is evicted its subscription is disposed and
 * its waiting callers are failed.
 */
public class ReactiveRemoteResourceCache {
    private static final Logger LOGGER = Logger.getLogger(ReactiveRemoteResourceCache.class.getName());

    private final Cache<String, CacheItem<?>> map;
    @SuppressWarnings("rawtypes")
    private final BiFunction<String, Object, Flux> requestStream;
    private final String serviceName;
    // Not referenced anywhere inside this class; kept because it is a public member.
    @SuppressWarnings("rawtypes")
    public static FluxSink emitter;

    /**
     * Creates a cache.
     *
     * @param maxSize maximum number of entries handed to Caffeine's {@code maximumSize}
     * @param evictionDuration idle time after which an entry expires ({@code expireAfterAccess})
     * @param serviceName first argument passed to {@code requestStream} on every subscription
     * @param requestStream factory that opens the upstream stream for a service name and request
     */
    @SuppressWarnings("rawtypes")
    public ReactiveRemoteResourceCache(int maxSize, Duration evictionDuration, String serviceName, BiFunction<String, Object, Flux> requestStream) {
        this.map = Caffeine.newBuilder()
                .maximumSize(maxSize)
                .expireAfterAccess(evictionDuration)
                .evictionListener(this::onRemove)
                .build();
        this.requestStream = requestStream;
        this.serviceName = serviceName;
    }

    /**
     * Returns the value associated with {@code key}.
     *
     * <p>If the entry already holds a value it is returned immediately. Otherwise the caller is
     * queued on the entry and, when no subscription is running for it, a new upstream
     * subscription is started. The returned {@link Mono} completes with the next emitted value,
     * or fails when the stream ends, is cancelled, or the entry is closed.
     *
     * @param key cache key identifying the shared entry
     * @param req request object forwarded to the stream factory when a subscription is started
     * @param retryOn predicate deciding whether a failed upstream is retried; retries use
     *     exponential backoff with jitter configured with 1 second and 20 second durations
     * @param <T> type of the values emitted by the upstream stream
     * @return a {@link Mono} producing the cached or next emitted value
     */
    public <T> Mono<T> request(String key, Object req, Predicate<? super RetryContext<T>> retryOn) {
        @SuppressWarnings("unchecked")
        CacheItem<T> item = (CacheItem<T>) this.map.get(key, k -> new CacheItem<>());
        // Read the volatile field once: a second read could observe a concurrent reset to null
        // and Mono.just(null) would throw.
        T cached = item.item;
        if (cached != null) {
            return Mono.just(cached);
        }
        synchronized (item) {
            // Re-check under the lock: a value may have arrived while waiting for the monitor.
            cached = item.item;
            if (cached != null) {
                return Mono.just(cached);
            }
            CompletableFuture<T> future = new CompletableFuture<>();
            item.addFuture(future);
            if (item.hnd == null) {
                item.hnd = this.subscribe(item, req, retryOn);
            }
            return Mono.fromFuture(future);
        }
    }

    /**
     * Opens the upstream stream for {@code item} and wires its signals to the entry.
     * Must be called while holding the monitor of {@code item}.
     */
    private <T> Disposable subscribe(CacheItem<T> item, Object req, Predicate<? super RetryContext<T>> retryOn) {
        LOGGER.fine("subscription started");
        // The factory is declared with a raw Flux, so the element type has to be asserted here.
        @SuppressWarnings("unchecked")
        Flux<T> source = Flux.defer(() -> (Publisher<T>) this.requestStream.apply(this.serviceName, req));
        return source
                .doOnError(err -> LOGGER.log(Level.WARNING, "remote resource stream failed", err))
                .retryWhen(Util.wrap(Retry.onlyIf(retryOn).exponentialBackoffWithJitter(Duration.ofSeconds(1L), Duration.ofSeconds(20L))))
                // Signals are delivered on the parallel scheduler, so the callbacks below do not
                // run inside the synchronized block of the request() call that subscribed.
                .publishOn(Schedulers.parallel())
                .doOnNext(value -> {
                    LOGGER.fine(() -> "out: " + value);
                    item.completeFutures(value);
                })
                .doOnTerminate(() -> item.onStreamEnded(new RuntimeException("connection closed")))
                .doOnCancel(() -> item.completeFuturesExceptionally(new RuntimeException("connection cancelled")))
                .subscribe();
    }

    /** Eviction callback registered with Caffeine: releases the evicted entry. */
    private void onRemove(String key, CacheItem<?> item, RemovalCause cause) {
        LOGGER.fine(() -> "evicted:" + cause);
        if (item != null) {
            item.close();
        }
    }

    /**
     * Manual demo: requests key {@code "1"} in bursts against a one-second interval stream that
     * throws on every tenth tick, printing each received value. Runs until the thread is
     * interrupted.
     */
    public static void main(String[] args) throws InterruptedException {
        ReactiveRemoteResourceCache cache = new ReactiveRemoteResourceCache(10000, Duration.ofSeconds(5L), "test", (s, o) -> Flux.interval(Duration.ofSeconds(1L)).doOnNext(tick -> {
            if (tick > 0L && tick % 10L == 0L) {
                throw new RuntimeException("error");
            }
        }));
        int i = 0;
        while (!Thread.currentThread().isInterrupted()) {
            cache.request("1", "asf", c -> true).subscribe(k -> System.out.println("k:" + k));
            if (i == 0) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ex) {
                    LOGGER.log(Level.SEVERE, null, ex);
                    // Restore the flag so the loop condition sees the interrupt and stops.
                    Thread.currentThread().interrupt();
                }
            }
            // Pause once every eleven requests: i cycles through 0..10.
            i = i >= 10 ? 0 : i + 1;
        }
    }

    /**
     * State of one cache entry: the latest value, the running subscription and the callers
     * still waiting for a value. Mutations are guarded by the entry's own monitor.
     *
     * @param <T> type of the cached value
     */
    public static final class CacheItem<T> {
        private volatile T item;
        private Disposable hnd = null;
        private final Queue<CompletableFuture<? super T>> futures = new ConcurrentLinkedQueue<>();

        /**
         * Disposes the subscription (if one was started), drops the cached value and fails all
         * waiting callers.
         */
        public void close() {
            synchronized (this) {
                // An entry can be closed before any subscription was started.
                if (this.hnd != null) {
                    this.hnd.dispose();
                }
                this.hnd = null;
                this.item = null;
                this.completeFuturesExceptionally(new RuntimeException("item closed"));
            }
        }

        /** Registers a caller that is waiting for the next value. */
        public void addFuture(CompletableFuture<? super T> future) {
            this.futures.add(future);
        }

        /**
         * Stores {@code item} as the latest value and completes every waiting caller with it.
         * The value stays cached so that later requests are served without waiting.
         */
        public void completeFutures(T item) {
            synchronized (this) {
                this.item = item;
                CompletableFuture<? super T> future;
                while ((future = this.futures.poll()) != null) {
                    future.complete(item);
                }
            }
        }

        /** Fails every waiting caller with {@code thr} and drops the cached value. */
        public void completeFuturesExceptionally(Throwable thr) {
            synchronized (this) {
                CompletableFuture<? super T> future;
                while ((future = this.futures.poll()) != null) {
                    future.completeExceptionally(thr);
                }
                this.item = null;
            }
        }

        /**
         * Handles termination of the upstream stream: forgets the finished subscription so the
         * next request starts a new one, then fails the waiting callers.
         */
        private void onStreamEnded(Throwable thr) {
            synchronized (this) {
                this.hnd = null;
                this.completeFuturesExceptionally(thr);
            }
        }
    }
}

