/*
 * Decompiled with CFR 0.152.
 */
package com.webull.openapi.trade.grpc;

import com.webull.openapi.core.retry.RetriedFailedException;
import com.webull.openapi.core.retry.Retryable;
import com.webull.openapi.core.utils.ExceptionUtils;
import com.webull.openapi.trade.grpc.exception.UserCancelledException;
import com.webull.openapi.trade.grpc.lifecycle.SubStreamObserver;
import com.webull.openapi.trade.grpc.retry.GrpcRetryContext;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class StatefulResponseObserver<ReqT, RespT>
implements ClientResponseObserver<ReqT, RespT> {
    private ClientCallStreamObserver<ReqT> clientCallStreamObserver;
    private final CompletableFuture<Void> observingFuture = new CompletableFuture();
    private Retryable<?> retryable;
    private GrpcRetryContext retryContext;
    private final List<SubStreamObserver<RespT>> subObservers;

    public StatefulResponseObserver(List<SubStreamObserver<RespT>> subObservers) {
        this.subObservers = subObservers;
    }

    public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
        this.clientCallStreamObserver = requestStream;
        this.clientCallStreamObserver.setOnReadyHandler(() -> this.subObservers.forEach(SubStreamObserver::onReady));
    }

    public void onNext(RespT value) {
        this.subObservers.forEach(sub -> sub.onNext(value));
    }

    public void onError(Throwable cause) {
        Throwable rootCause;
        Throwable finalCause = cause;
        this.subObservers.forEach(sub -> sub.onError(finalCause));
        int attempts = this.retryContext != null ? this.retryContext.getRetriesAttempted() + 1 : 1;
        this.retryContext = new GrpcRetryContext(attempts, cause);
        if (!this.observingFuture.isDone() && this.retryable != null) {
            try {
                this.retryable.retry(this.retryContext);
                return;
            }
            catch (RetriedFailedException ex) {
                cause = ex.getCause();
            }
        }
        if ((rootCause = ExceptionUtils.getRootCause(cause)) instanceof UserCancelledException) {
            if (!this.observingFuture.isDone()) {
                this.observingFuture.cancel(false);
            }
        } else {
            this.observingFuture.completeExceptionally(cause);
        }
    }

    public void onCompleted() {
        this.subObservers.forEach(SubStreamObserver::onCompleted);
        this.observingFuture.complete(null);
    }

    public void setRetryable(Retryable<?> retryable) {
        this.retryable = retryable;
    }

    public CompletableFuture<Void> observingFuture() {
        return this.observingFuture;
    }

    public void cancel() {
        if (this.clientCallStreamObserver != null) {
            this.clientCallStreamObserver.cancel(null, (Throwable)new UserCancelledException());
        }
        if (!this.observingFuture.isDone()) {
            this.observingFuture.cancel(false);
        }
    }
}

