/*
 * Decompiled with CFR 0.152.
 */
package com.stackone.stackone_client_java.utils.pagination;

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ReadContext;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import com.jayway.jsonpath.spi.mapper.MappingProvider;
import com.stackone.stackone_client_java.utils.Blob;
import com.stackone.stackone_client_java.utils.ResponseWithBody;
import com.stackone.stackone_client_java.utils.SpeakeasyLogger;
import com.stackone.stackone_client_java.utils.pagination.ProgressTrackerStrategy;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * {@link Flow.Publisher} that fetches the pages of a paginated endpoint one at a time and
 * emits each page to the subscriber as an {@link HttpResponse} whose body is a {@link Blob}.
 *
 * <p>The first page is fetched with the initial request as-is. For every later page the
 * request is rebuilt by applying {@code requestModifier} to the initial request and the
 * position currently reported by the {@link ProgressTrackerStrategy}. After each page the
 * body is parsed as JSON and handed to {@code progressTracker.advance(...)}, whose boolean
 * result decides whether another page is fetched or the stream is completed.
 *
 * <p>NOTE(review): the single {@code progressTracker} instance is used by every subscription
 * created from this publisher, so subscribing more than once presumably makes the
 * subscriptions interfere with each other's position — confirm whether that is supported.
 *
 * @param <ReqT> type of the request object handed to the data fetcher
 * @param <ProgressParamT> type of the position value passed to {@code requestModifier}
 */
public class AsyncPaginator<ReqT, ProgressParamT>
implements Flow.Publisher<HttpResponse<Blob>> {
    private static final SpeakeasyLogger logger = SpeakeasyLogger.getLogger(AsyncPaginator.class);

    /** JsonPath configuration used to parse page bodies: Jackson providers plus {@code SUPPRESS_EXCEPTIONS}. */
    private static final Configuration JSON_PATH_CONFIG = Configuration.defaultConfiguration()
            .jsonProvider(new JacksonJsonProvider())
            .mappingProvider(new JacksonMappingProvider())
            .addOptions(Option.SUPPRESS_EXCEPTIONS);

    private final ReqT initialRequest;
    private final ProgressTrackerStrategy<ProgressParamT> progressTracker;
    private final BiFunction<ReqT, ProgressParamT, ReqT> requestModifier;
    private final Function<ReqT, CompletableFuture<HttpResponse<Blob>>> asyncDataFetcher;

    /**
     * Creates a paginator.
     *
     * @param initialRequest request used verbatim for the first page and as the base for later pages
     * @param progressTracker strategy that reports the current position and is advanced with each page body
     * @param requestModifier builds the request for a follow-up page from the initial request and the position
     * @param asyncDataFetcher performs the actual asynchronous call for a request
     */
    public AsyncPaginator(ReqT initialRequest, ProgressTrackerStrategy<ProgressParamT> progressTracker, BiFunction<ReqT, ProgressParamT, ReqT> requestModifier, Function<ReqT, CompletableFuture<HttpResponse<Blob>>> asyncDataFetcher) {
        this.initialRequest = initialRequest;
        this.progressTracker = progressTracker;
        this.requestModifier = requestModifier;
        this.asyncDataFetcher = asyncDataFetcher;
    }

    /**
     * Hands the subscriber a fresh subscription. Nothing is fetched until the subscriber
     * calls {@link Flow.Subscription#request(long)}.
     *
     * @param subscriber receiver of the pages
     */
    @Override
    public void subscribe(Flow.Subscriber<? super HttpResponse<Blob>> subscriber) {
        subscriber.onSubscribe(new AsyncPaginationSubscription(subscriber));
    }

    /** Where a subscription currently is in the page sequence. */
    private enum PaginationState {
        /** No page has been processed yet; the next fetch uses the initial request unchanged. */
        INITIAL,
        /** The last processed page reported that more pages follow. */
        HAS_MORE_PAGES,
        /** The last processed page reported that no pages follow. */
        EXHAUSTED;
    }

    /**
     * Subscription that drives the fetch / parse / emit cycle for one subscriber.
     *
     * <p>At most one page fetch is outstanding at any time, and once a terminal signal
     * ({@code onComplete} or {@code onError}) has been sent, or {@link #cancel()} has been
     * called, no further fetches are started and no further signals are emitted.
     */
    private class AsyncPaginationSubscription
    implements Flow.Subscription {
        private final Flow.Subscriber<? super HttpResponse<Blob>> subscriber;
        private final AtomicReference<PaginationState> state = new AtomicReference<>(PaginationState.INITIAL);
        /** Set by cancel() and by the first terminal signal; guards against signalling twice. */
        private final AtomicBoolean terminated = new AtomicBoolean(false);
        /** Outstanding demand; saturates at Long.MAX_VALUE instead of overflowing. */
        private final AtomicLong demand = new AtomicLong(0L);
        /** True from the moment a page fetch is started until its page has been delivered. */
        private final AtomicBoolean fetchInFlight = new AtomicBoolean(false);

        public AsyncPaginationSubscription(Flow.Subscriber<? super HttpResponse<Blob>> subscriber) {
            this.subscriber = subscriber;
        }

        /**
         * Adds {@code n} to the outstanding demand and starts a fetch if none is running.
         * A non-positive {@code n} terminates the subscription with an
         * {@link IllegalArgumentException} delivered through {@code onError}.
         *
         * @param n number of additional pages the subscriber is willing to receive
         */
        @Override
        public void request(long n) {
            if (n <= 0L) {
                this.handleError(new IllegalArgumentException("Request count must be positive"));
                return;
            }
            if (this.terminated.get()) {
                return;
            }
            // Saturating add done atomically, so no other thread can observe a negative value.
            this.demand.accumulateAndGet(n, (current, requested) -> {
                long total = current + requested;
                return total < 0L ? Long.MAX_VALUE : total;
            });
            this.fetchNextIfNeeded();
        }

        /** Stops the subscription; a response that arrives afterwards is dropped. */
        @Override
        public void cancel() {
            this.terminated.set(true);
        }

        /**
         * Starts the next page fetch when the subscription is live, demand is positive,
         * pages remain, and no other fetch is outstanding.
         */
        private void fetchNextIfNeeded() {
            if (this.terminated.get()
                    || this.demand.get() <= 0L
                    || this.state.get() == PaginationState.EXHAUSTED) {
                return;
            }
            // Losing this race is fine: the fetch that holds the flag re-checks demand
            // after releasing it, so demand added meanwhile is not lost.
            if (!this.fetchInFlight.compareAndSet(false, true)) {
                return;
            }
            try {
                this.fetchNextPage()
                        .thenAccept(this::handleResponse)
                        .exceptionally(this::handleError);
            }
            catch (Exception e) {
                // Building or dispatching the request failed synchronously; report it
                // through onError instead of throwing out of request().
                this.handleError(e);
            }
        }

        /**
         * Builds the request for the next page and dispatches it.
         *
         * @return future of the raw page response
         */
        private CompletableFuture<HttpResponse<Blob>> fetchNextPage() {
            boolean firstPage = this.state.get() == PaginationState.INITIAL;
            // Assumes getPosition() returns ProgressParamT (the decompiler had erased it to Object).
            ProgressParamT position = AsyncPaginator.this.progressTracker.getPosition();
            ReqT request;
            if (firstPage) {
                logger.debug("Async fetching initial page");
                request = AsyncPaginator.this.initialRequest;
            } else {
                logger.debug("Async fetching next page with position: {}", position);
                request = AsyncPaginator.this.requestModifier.apply(AsyncPaginator.this.initialRequest, position);
            }
            return AsyncPaginator.this.asyncDataFetcher.apply(request);
        }

        /**
         * Reads the page body, advances the progress tracker with the parsed JSON, emits the
         * page, and then either completes the stream or tries to fetch the next page.
         *
         * @param response raw response for the page that was just fetched
         */
        private void handleResponse(HttpResponse<Blob> response) {
            if (this.terminated.get()) {
                return;
            }
            try {
                response.body().toByteArray().thenAccept(data -> {
                    try {
                        DocumentContext pageJson = JsonPath.using(JSON_PATH_CONFIG).parseUtf8(data);
                        boolean hasMorePages = AsyncPaginator.this.progressTracker.advance(pageJson);
                        this.state.set(hasMorePages ? PaginationState.HAS_MORE_PAGES : PaginationState.EXHAUSTED);
                        if (logger.isTraceEnabled()) {
                            logger.trace("Async page fetched - status: {}, hasMorePages: {}", response.statusCode(), hasMorePages);
                        }
                        // The original body has been read above, so the emitted response
                        // gets a new Blob built from the same bytes.
                        Blob newBody = Blob.from(data);
                        ResponseWithBody newResponse = new ResponseWithBody(response, body -> newBody);
                        this.demand.decrementAndGet();
                        this.subscriber.onNext(newResponse);
                        if (!hasMorePages) {
                            if (this.terminated.compareAndSet(false, true)) {
                                logger.debug("Async pagination exhausted");
                                this.subscriber.onComplete();
                            }
                            return;
                        }
                        // Release the slot only after onNext has returned so pages are
                        // delivered one at a time, then re-check demand.
                        this.fetchInFlight.set(false);
                        this.fetchNextIfNeeded();
                    }
                    catch (Exception e) {
                        this.handleError(e);
                    }
                }).exceptionally(this::handleError);
            }
            catch (Exception e) {
                this.handleError(e);
            }
        }

        /**
         * Terminates the subscription with {@code onError}, unless it was already cancelled
         * or terminated, in which case the error is dropped.
         *
         * @param error failure to report
         * @return always {@code null}; the return type lets this serve as an {@code exceptionally} handler
         */
        private Void handleError(Throwable error) {
            if (this.terminated.compareAndSet(false, true)) {
                logger.debug("Async pagination error: {}", error.getMessage());
                this.subscriber.onError(error);
            }
            return null;
        }
    }
}

