/*
 * Decompiled with CFR 0.152.
 */
package com.cloudimpl.outstack.workflow;

import com.cloudimpl.outstack.common.FluxProcessor;
import com.cloudimpl.outstack.common.MonoFuture;
import com.cloudimpl.outstack.common.RetryUtil;
import com.cloudimpl.outstack.workflow.ExternalTrigger;
import com.cloudimpl.outstack.workflow.Work;
import com.cloudimpl.outstack.workflow.WorkContext;
import com.cloudimpl.outstack.workflow.WorkStatus;
import com.cloudimpl.outstack.workflow.Workflow;
import com.cloudimpl.outstack.workflow.WorkflowException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.function.Function4;
import reactor.retry.Retry;

public class WorkflowEngine {
    private static final Logger log = LoggerFactory.getLogger(WorkflowEngine.class);
    private Workflow mainFlow;
    private WorkContext context;
    private Map<String, ExternalTrigger> triggers;
    private Set<String> triggerNames;
    private Function4<String, String, String, WorkStatus, Mono<WorkStatus>> updateStateHandler;
    private BiFunction<String, String, Optional<WorkStatus>> workStatusLoader;
    private String id;
    private Set<String> activeTriggers;
    private FluxProcessor<MonoFuture> dbWriter;
    private Disposable dbWriterHnd;
    private String tenantId;
    private Work.Status status;
    private Disposable workflowHnd;
    private Consumer<Object> autoWireHandler;

    public WorkflowEngine(String id) {
        this(id, null, (Function4<String, String, String, WorkStatus, Mono<WorkStatus>>)((Function4)WorkflowEngine::dummyStateUpdater), s -> {}, WorkflowEngine::dummyWorkStatusLoader);
    }

    public WorkflowEngine(String id, String tenantId, Function4<String, String, String, WorkStatus, Mono<WorkStatus>> updateStateHandler, Consumer<Object> autoWireHandler, BiFunction<String, String, Optional<WorkStatus>> workStatusLoader) {
        this.id = id;
        this.tenantId = tenantId;
        this.triggers = new ConcurrentHashMap<String, ExternalTrigger>();
        this.triggerNames = new HashSet<String>();
        this.activeTriggers = new ConcurrentSkipListSet<String>();
        this.updateStateHandler = updateStateHandler;
        this.autoWireHandler = autoWireHandler;
        this.status = Work.Status.PENDING;
        this.workStatusLoader = workStatusLoader;
    }

    public String getId() {
        return this.id;
    }

    public Mono<WorkStatus> execute(Workflow workFlow) {
        workFlow.setEngine(this);
        workFlow.setHandlers(this::stateUpdater, this.autoWireHandler, this::workStatusLoader);
        if (this.mainFlow != null || this.context != null) {
            return Mono.error(() -> new WorkflowException("workflow is already executed ", new Object[0]));
        }
        this.status = Work.Status.RUNNING;
        this.context = new WorkContext();
        this.mainFlow = workFlow;
        this.dbWriter = new FluxProcessor("workflow engine " + this.id);
        this.dbWriterHnd = this.dbWriter.flux("workflow engine " + this.id).publishOn(Schedulers.parallel()).flatMap(s -> this.writeToDB((String)s.getParam(0), (WorkStatus)s.getParam(1)).doOnNext(k -> s.submit(k)), 1).doOnTerminate(() -> this.mainFlow.log("db writer closing down on terminate", new Object[0])).doOnCancel(() -> this.mainFlow.log("db writer closing down on cancel", new Object[0])).subscribe();
        MonoFuture future = MonoFuture.create((Object[])new Object[0]);
        this.workflowHnd = this.run().doOnSuccess(s -> {
            future.submit(s);
            log.info("workflow engine {} succesfully completed", (Object)this.id);
        }).doOnCancel(() -> {
            future.submit((Object)WorkStatus.publish(Work.Status.CANCELLED));
            log.info("workflow engine {} cancelled", (Object)this.id);
        }).subscribe();
        return future.get().flatMap(s -> this.stateUpdater(this.id, (WorkStatus)s)).doOnNext(s -> this.dbWriterHnd.dispose());
    }

    public Work.Status getStatus() {
        return this.status;
    }

    public void cancel() {
        this.workflowHnd.dispose();
    }

    public <T> Mono<T> executeAsync(String name, Function<WorkContext, Mono<Object>> handler) {
        ExternalTrigger trigger = this.triggers.get(name);
        if (trigger == null) {
            return Mono.error(() -> new WorkflowException("external trigger {0} not found", name));
        }
        if (trigger.isGate()) {
            throw new WorkflowException("wait trigger {0} not found .found {1}", name, trigger.getClass().getName());
        }
        return trigger.triggerAsync(handler);
    }

    public <T> T execute(String name, Function<WorkContext, Object> handler) {
        ExternalTrigger trigger = this.triggers.get(name);
        if (trigger == null) {
            throw new WorkflowException("external trigger {0} not found", name);
        }
        if (trigger.isGate()) {
            throw new WorkflowException("wait trigger {0} not found .found {1}", name, trigger.getClass().getName());
        }
        return trigger.trigger(handler);
    }

    public void openGate(String name, Consumer<WorkContext> consumer) {
        ExternalTrigger trigger = this.triggers.get(name);
        if (trigger == null) {
            throw new WorkflowException("gate trigger {0} not found", name);
        }
        if (!trigger.isGate()) {
            throw new WorkflowException("gate trigger {0} not found .found {1}", name, trigger.getClass().getName());
        }
        Function<WorkContext, Object> fun = c -> {
            consumer.accept((WorkContext)c);
            return "";
        };
        trigger.trigger(fun);
    }

    public Mono<Void> openGateAsync(String name, Consumer<WorkContext> consumer) {
        ExternalTrigger trigger = this.triggers.get(name);
        if (trigger == null) {
            return Mono.error(() -> new WorkflowException("get trigger {0} not found", name));
        }
        if (!trigger.isGate()) {
            return Mono.error(() -> new WorkflowException("gate trigger {0} not found .found {1}", name, trigger.getClass().getName()));
        }
        Function<WorkContext, Mono<Object>> fun = c -> {
            consumer.accept((WorkContext)c);
            return Mono.just((Object)true);
        };
        return trigger.triggerAsync(fun).then();
    }

    public <T> Mono<T> executeAsyncNext(Function<WorkContext, Mono<Object>> handler) {
        List active = this.activeTriggers.stream().collect(Collectors.toList());
        if (active.isEmpty()) {
            return Mono.error(() -> new WorkflowException("no active external trigger found for workflow id {0}", this.id));
        }
        if (active.size() > 1) {
            return Mono.error(() -> new WorkflowException("more than 1 active external trigger found for workflow id {0} - [{1}]", this.id, active.stream().collect(Collectors.joining(","))));
        }
        ExternalTrigger trigger = this.triggers.get(active.get(0));
        if (trigger == null) {
            return Mono.error(() -> new WorkflowException("external trigger {0} not found", active.get(0)));
        }
        if (trigger.isGate()) {
            return Mono.error(() -> new WorkflowException("gate trigger {0} not support anonymous execution", active.get(0)));
        }
        return trigger.triggerAsync(handler);
    }

    public <T> T executeNext(Function<WorkContext, Object> handler) {
        List active = this.activeTriggers.stream().collect(Collectors.toList());
        if (active.isEmpty()) {
            throw new WorkflowException("no active external trigger found for workflow id {0}", this.id);
        }
        if (active.size() > 1) {
            throw new WorkflowException("more than 1 active external trigger found for workflow id {0} - [{1}]", this.id, active.stream().collect(Collectors.joining(",")));
        }
        ExternalTrigger trigger = this.triggers.get(active.get(0));
        if (trigger == null) {
            throw new WorkflowException("external trigger {0} not found", active.get(0));
        }
        if (trigger.isGate()) {
            throw new WorkflowException("gate trigger {0} not support anonymous execution", active.get(0));
        }
        return trigger.trigger(handler);
    }

    protected ExternalTrigger getExternalTrigger(String name) {
        return this.triggers.get(name);
    }

    private Mono<WorkStatus> run() {
        return this.mainFlow.execute(this.context);
    }

    protected void registerExternalTrigger(String name, ExternalTrigger trigger) {
        this.triggers.put(name, trigger);
        this.activeTriggers.add(name);
    }

    protected void checkTriggerDuplicate(String name) {
        if (!this.triggerNames.add(name)) {
            throw new WorkflowException("duplicate name {0} found in external trigger in workflow {1} ", name, this.id);
        }
    }

    public static Mono<WorkStatus> dummyStateUpdater(String workflowId, String workId, String tenantId, WorkStatus result) {
        System.out.println("db write : " + workflowId + " workId : " + workId + "result : " + result.getStatus() + " context : " + result.getData());
        return Mono.just((Object)result);
    }

    public static Optional<WorkStatus> dummyWorkStatusLoader(String workFlowId, String workId) {
        return Optional.empty();
    }

    public Optional<WorkStatus> workStatusLoader(String workId) {
        return this.workStatusLoader.apply(this.id, workId);
    }

    private Mono<WorkStatus> stateUpdater(String id, WorkStatus result) {
        if (id.equals(this.id)) {
            this.mainFlow.log("workflow engine completing , writing final state", new Object[0]);
        }
        MonoFuture future = MonoFuture.create((Object[])new Object[]{id, result});
        this.dbWriter.add((Object)future);
        return future.get();
    }

    private Mono<WorkStatus> writeToDB(String id, WorkStatus result) {
        return Mono.defer(() -> (Mono)this.updateStateHandler.apply((Object)this.id, (Object)id, (Object)this.tenantId, (Object)result)).doOnError(err -> log.error("Error writing to DB for workflow " + this.id, err)).retryWhen(RetryUtil.wrap((Retry)Retry.any().exponentialBackoffWithJitter(Duration.ofSeconds(3L), Duration.ofSeconds(60L))));
    }

    protected void removeActiveTrigger(String name) {
        this.activeTriggers.remove(name);
        this.triggers.remove(name);
    }
}

