/*
 * Decompiled with CFR 0.152.
 */
package com.cloudimpl.outstack.spring.component;

import com.cloudimpl.outstack.common.CloudMessage;
import com.cloudimpl.outstack.common.FluxMap;
import com.cloudimpl.outstack.common.RetryUtil;
import com.cloudimpl.outstack.core.CloudService;
import com.cloudimpl.outstack.runtime.repo.RepoStreamingReq;
import com.cloudimpl.outstack.runtime.repo.StreamEvent;
import com.cloudimpl.outstack.spring.component.Cluster;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.retry.Retry;

public class StreamClient {
    private static final Logger log = LoggerFactory.getLogger(StreamClient.class);
    private final Cluster cluster;

    public StreamClient(Cluster cluster) {
        this.cluster = cluster;
    }

    public Flux<StreamEvent> subscribeToMicroService(String name, String domainOwner, String domainContext, RepoStreamingReq req) {
        return this.cluster.getServiceRegistry().flux("StreamClient:" + name).filter(e -> e.getType() == FluxMap.Event.Type.ADD).filter(e -> ((CloudService)e.getValue()).name().equals(domainOwner + "/" + domainContext + "/v1/PolicyService")).map(e -> (CloudService)e.getValue()).flatMap(service -> Mono.defer(() -> Mono.just((Object)service)).doOnNext(srv -> log.info("initializing service {}-> {}/{} subscription", new Object[]{name, domainOwner, domainContext})).flatMapMany(srv -> this.cluster.requestStream("RepositoryStreamingService", new CloudMessage((Object)req, service.nodeId())).doOnError(thr -> log.error("stream subscription error for service " + service.name() + " node id : " + service.nodeId(), thr)).retryWhen(RetryUtil.wrap((Retry)Retry.onlyIf(ctx -> this.cluster.getServiceRegistry().findService(srv.id()) != null).exponentialBackoffWithJitter(Duration.ofSeconds(1L), Duration.ofSeconds(60L)))).doOnError(thr -> log.error("service {}:{} not found .terminating the stream", (Object)srv.name(), (Object)srv.id())).doOnTerminate(() -> log.error("service {}:{} not found .closing the stream", (Object)srv.name(), (Object)srv.id())).onErrorResume(thr -> Flux.empty())).publishOn(Schedulers.parallel())).cast(StreamEvent.class);
    }
}

