/*
 * Decompiled with CFR 0.152.
 */
package com.cloudimpl.outstack.workflow.component;

import com.cloudimpl.outstack.common.GsonCodec;
import com.cloudimpl.outstack.common.RetryUtil;
import com.cloudimpl.outstack.runtime.ResultSet;
import com.cloudimpl.outstack.runtime.domainspec.Command;
import com.cloudimpl.outstack.runtime.domainspec.Query;
import com.cloudimpl.outstack.runtime.domainspec.QueryByIdRequest;
import com.cloudimpl.outstack.runtime.domainspec.RootEntity;
import com.cloudimpl.outstack.spring.component.Cluster;
import com.cloudimpl.outstack.spring.service.iam.TenantProvider;
import com.cloudimpl.outstack.workflow.AbstractWork;
import com.cloudimpl.outstack.workflow.Work;
import com.cloudimpl.outstack.workflow.WorkContext;
import com.cloudimpl.outstack.workflow.WorkStatus;
import com.cloudimpl.outstack.workflow.Workflow;
import com.cloudimpl.outstack.workflow.WorkflowEngine;
import com.cloudimpl.outstack.workflow.WorkflowException;
import com.cloudimpl.outstack.workflow.component.CompleteWorkflow;
import com.cloudimpl.outstack.workflow.component.CreateWorkflow;
import com.cloudimpl.outstack.workflow.component.UpdateWorkflow;
import com.cloudimpl.outstack.workflow.domain.WorkflowCompleteRequest;
import com.cloudimpl.outstack.workflow.domain.WorkflowCreateRequest;
import com.cloudimpl.outstack.workflow.domain.WorkflowEntity;
import com.cloudimpl.outstack.workflow.domain.WorkflowUpdateRequest;
import com.google.gson.JsonObject;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.function.Function4;
import reactor.retry.Retry;

/**
 * Spring component that creates workflows through the cluster's
 * {@code WorkflowService}, restores pending workflows per tenant at startup and
 * keeps one {@link WorkflowEngine} per running workflow id.
 *
 * <p>State is held in three concurrent maps: parsed workflows waiting to be
 * started, running engines, and the latest known {@link WorkflowEntity} per
 * workflow id.
 */
@Component
public class WorkflowRepository {
    private static final Logger log = LoggerFactory.getLogger(WorkflowRepository.class);
    @Autowired
    private Cluster cluster;
    @Autowired
    private TenantProvider tenantProvider;
    @Value(value="${outstack.domainOwner}")
    private String domainOwner;
    @Value(value="${outstack.domainContext}")
    private String domainContext;
    /** Parsed workflows waiting to be handed to an engine; entries are removed by {@link #init(String, String)}. */
    private final Map<String, Workflow> workFlows = new ConcurrentHashMap<String, Workflow>();
    /** Running engines keyed by workflow id. */
    private final Map<String, WorkflowEngine> engines = new ConcurrentHashMap<String, WorkflowEngine>();
    /** Latest entity returned by the workflow service, keyed by workflow id. */
    private final Map<String, WorkflowEntity> entityCache = new ConcurrentHashMap<String, WorkflowEntity>();

    /**
     * Subscribes to the tenant stream and loads the pending workflows of every
     * tenant it emits.
     */
    @PostConstruct
    private void init() {
        log.info("initializing WorkflowRepository");
        this.tenantProvider.subscribeToTenants("WorkflowRepository")
                .flatMap(tenant -> this.loadTenantWorkflows((String)tenant))
                // Explicit error consumer: a failure of the tenant stream itself is logged
                // instead of surfacing as an unhandled-error callback.
                .subscribe(ignored -> { }, err -> log.error("tenant subscription for WorkflowRepository failed", err));
    }

    /**
     * Persists the given workflow through {@code WorkflowService} and starts an
     * engine for it once the service replies.
     *
     * @param tenantId tenant the workflow belongs to
     * @param workflow workflow definition to persist and run
     * @return the entity id of the created workflow
     */
    public Mono<String> startWorkFlow(String tenantId, Workflow workflow) {
        JsonObject json = workflow.toJson();
        Object request = WorkflowCreateRequest.builder()
                .withContent(json.toString())
                .withCommandName(CreateWorkflow.class.getSimpleName())
                .withTenantId(tenantId)
                .withVersion("v1")
                .build();
        return this.cluster.requestReply(null, this.serviceAddress("WorkflowService"), request)
                .cast(WorkflowEntity.class)
                .doOnNext(entity -> {
                    this.entityCache.put(entity.entityId(), entity);
                    // The workflow is rebuilt from its JSON form rather than reusing the caller's instance.
                    this.workFlows.put(entity.entityId(), AbstractWork.fromJson(json).asWorkflow());
                    this.init(entity.entityId(), tenantId);
                })
                .map(entity -> entity.entityId());
    }

    /**
     * Takes the parsed workflow registered under {@code id}, creates an engine
     * for it and starts execution. If an engine is already registered for the id
     * the call is ignored with a warning.
     *
     * @throws WorkflowException if no parsed workflow is registered under {@code id}
     */
    private void init(String id, String tenantId) {
        Workflow workflow = this.workFlows.remove(id);
        if (workflow == null) {
            throw new WorkflowException("workflow {0} not found", id);
        }
        Function4<String, String, String, WorkStatus, Mono<WorkStatus>> stateUpdater = this::updateState;
        WorkflowEngine engine = new WorkflowEngine(id, tenantId, stateUpdater, Cluster.autoWireInstance(), this::loadWorkStatus);
        if (this.engines.putIfAbsent(id, engine) != null) {
            log.warn("workflow {} already exist , ignored", id);
            return;
        }
        log.info("workflow {} started", id);
        engine.execute(workflow)
                .doOnTerminate(() -> this.removeEngine(id, false))
                .doOnCancel(() -> this.removeEngine(id, true))
                // Errors are logged by the subscriber's error consumer so they are handled exactly once.
                .subscribe(ignored -> { }, e -> log.error("workflow error for id :{}", id, e));
    }

    /**
     * Queries {@code WorkflowQueryService} for the tenant's workflows (retrying
     * the request with exponential backoff) and restarts every workflow whose
     * status is {@code PENDING}.
     *
     * <p>Failures are logged and the returned Mono then completes empty, so that
     * one failing tenant does not terminate the tenant subscription in
     * {@link #init()}.
     */
    private Mono<Void> loadTenantWorkflows(String tenantId) {
        log.info("loading workflow for tenantId {}", tenantId);
        Object query = QueryByIdRequest.builder()
                .withQueryName("ListWorkflowEntity")
                .withPagingReq(Query.PagingRequest.EMPTY)
                .withTenantId(tenantId)
                .withVersion("v1")
                .build();
        return this.cluster.requestReply(null, this.serviceAddress("WorkflowQueryService"), query)
                .retryWhen(RetryUtil.wrap((Retry)Retry.any().exponentialBackoffWithJitter(Duration.ofSeconds(3L), Duration.ofSeconds(60L))))
                .cast(ResultSet.class)
                .flatMapIterable(rs -> rs.getItems(WorkflowEntity.class))
                // ResultSet is used as a raw type, so items are cast explicitly at each use.
                .filter(item -> WorkflowEntity.class.cast(item).getStatus() == Work.Status.PENDING)
                .doOnNext(item -> this.restoreWorkflow(WorkflowEntity.class.cast(item)))
                .then()
                .onErrorResume(err -> {
                    log.error("Error loading workflows", err);
                    return Mono.empty();
                });
    }

    /** Logs, caches and restarts a single pending workflow entity loaded at startup. */
    private void restoreWorkflow(WorkflowEntity entity) {
        log.info("loading workflow {}", entity.entityId());
        this.entityCache.put(entity.entityId(), entity);
        this.autoStartWorkFlow(entity);
    }

    /** Parses the entity's stored JSON content into a workflow and starts an engine for it. */
    private void autoStartWorkFlow(WorkflowEntity e) {
        Workflow workFlow = AbstractWork.fromJson(GsonCodec.toJsonObject((String)e.getContent())).asWorkflow();
        this.workFlows.put(e.entityId(), workFlow);
        this.init(e.entityId(), e.getTenantId());
    }

    /**
     * Delegates to {@code WorkflowEngine.executeAsync(name, handler)} on the engine of workflow {@code id}.
     *
     * @throws WorkflowException if no engine is registered for {@code id}
     */
    public <T> Mono<T> executeAsync(String id, String name, Function<WorkContext, Mono<Object>> handler) {
        return this.requireEngine(id).executeAsync(name, handler);
    }

    /**
     * Delegates to {@code WorkflowEngine.execute(name, handler)} on the engine of workflow {@code id}.
     *
     * @throws WorkflowException if no engine is registered for {@code id}
     */
    public <T> T execute(String id, String name, Function<WorkContext, Object> handler) {
        return this.requireEngine(id).execute(name, handler);
    }

    /**
     * Delegates to {@code WorkflowEngine.executeAsyncNext(handler)} on the engine of workflow {@code id}.
     *
     * @throws WorkflowException if no engine is registered for {@code id}
     */
    public <T> Mono<T> executeAsyncNext(String id, Function<WorkContext, Mono<Object>> handler) {
        return this.requireEngine(id).executeAsyncNext(handler);
    }

    /**
     * Delegates to {@code WorkflowEngine.executeNext(handler)} on the engine of workflow {@code id}.
     *
     * @throws WorkflowException if no engine is registered for {@code id}
     */
    public <T> T executeNext(String id, Function<WorkContext, Object> handler) {
        return this.requireEngine(id).executeNext(handler);
    }

    /**
     * Returns the running engine for the given workflow id.
     *
     * @throws WorkflowException if no engine is registered for {@code id}
     */
    private WorkflowEngine requireEngine(String id) {
        WorkflowEngine engine = this.engines.get(id);
        if (engine == null) {
            throw new WorkflowException("workflow engine {0} not found", id);
        }
        return engine;
    }

    /** Builds the cluster address of the named workflow service from the configured owner, context and entity version. */
    private String serviceAddress(String serviceName) {
        return this.domainOwner + "/" + this.domainContext + "/" + RootEntity.getVersion(WorkflowEntity.class) + "/" + serviceName;
    }

    /** Logs the end of a workflow and drops its engine and cached entity. */
    private void removeEngine(String id, boolean cancel) {
        if (cancel) {
            log.info("workflow {} terminated . (canceled)", id);
        } else {
            log.info("workflow {} terminated .", id);
        }
        this.engines.remove(id);
        this.entityCache.remove(id);
    }

    /**
     * Sends a state change to {@code WorkflowService} and refreshes the cached
     * entity from the reply. When {@code id} equals {@code workflowId} a
     * complete request is sent, otherwise an update request for work {@code id}.
     *
     * @return the {@code status} argument, emitted once the service has replied
     */
    private Mono<WorkStatus> updateState(String workflowId, String id, String tenantId, WorkStatus status) {
        Command req;
        if (id.equals(workflowId)) {
            req = WorkflowCompleteRequest.builder()
                    .withWorkflowId(workflowId)
                    .withStatus(status.getStatus())
                    .withTenantId(tenantId)
                    .withVersion("v1")
                    .withCommandName(CompleteWorkflow.class.getSimpleName())
                    .withRootId(workflowId)
                    .withId(workflowId)
                    .build();
        } else {
            req = WorkflowUpdateRequest.builder()
                    .withWorkContext((WorkContext)status.getData())
                    .withWorkflowId(workflowId)
                    .withWorkId(id)
                    .withTenantId(tenantId)
                    .withVersion("v1")
                    .withCommandName(UpdateWorkflow.class.getSimpleName())
                    .withRootId(workflowId)
                    .withId(workflowId)
                    .build();
        }
        return this.cluster.requestReply(this.serviceAddress("WorkflowService"), (Object)req)
                .cast(WorkflowEntity.class)
                .doOnNext(entity -> this.entityCache.put(entity.entityId(), entity))
                .map(entity -> status);
    }

    /**
     * Looks up the status of one work item from the cached workflow entity.
     *
     * @return the status reported by the cached entity, or an empty Optional
     *         when no entity is cached for {@code workflowId}
     */
    private Optional<WorkStatus> loadWorkStatus(String workflowId, String workId) {
        WorkflowEntity entity = this.entityCache.get(workflowId);
        if (entity == null) {
            // The entry is removed when the engine terminates; a late lookup must not NPE.
            return Optional.empty();
        }
        return entity.getStatus(workId);
    }
}

