/*
 * Decompiled with CFR 0.152.
 */
package no.skatteetaten.fastsetting.formueinntekt.felles.task.processor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.Task;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.TaskConsumer;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.TaskCreation;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.TaskDecision;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.TaskSink;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.TaskSupplement;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskContext;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskContextCompletion;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskProcessor;

public class TaskConsumerContext<TRANSACTION, EXCEPTION extends Exception, SUPPLEMENT extends TaskSupplement>
implements TaskConsumer,
TaskContext<TRANSACTION, EXCEPTION, SUPPLEMENT> {
    private final Set<Task> tasks;
    private final TaskSink<TRANSACTION, EXCEPTION> taskSink;
    private final Function<String, Optional<TaskProcessor>> processors;
    private final BiFunction<Task, TaskCreation, Stream<TaskCreation>> postprocessor;
    private final Map<String, List<Definition>> topics = new ConcurrentHashMap<String, List<Definition>>();

    public TaskConsumerContext(Set<Task> tasks, TaskSink<TRANSACTION, EXCEPTION> taskSink, Function<String, Optional<TaskProcessor>> processors) {
        this.tasks = tasks;
        this.taskSink = taskSink;
        this.processors = processors;
        this.postprocessor = (task, creation) -> Stream.of(creation);
    }

    public TaskConsumerContext(Set<Task> tasks, TaskSink<TRANSACTION, EXCEPTION> taskSink, Function<String, Optional<TaskProcessor>> processors, BiFunction<Task, TaskCreation, Stream<TaskCreation>> postprocessor) {
        this.tasks = tasks;
        this.taskSink = taskSink;
        this.processors = processors;
        this.postprocessor = postprocessor;
    }

    public void pushByTask(Set<Task> tasks, String topic, TaskSink.Insertion insertion, Function<Task, Collection<TaskCreation>> resolver) {
        if (!this.tasks.containsAll(tasks)) {
            throw new IllegalArgumentException("Cannot write tasks for unknown tasks: " + tasks.stream().filter(task -> !this.tasks.contains(task)).collect(Collectors.toList()));
        }
        this.topics.computeIfAbsent(topic, ignored -> new CopyOnWriteArrayList()).add(new Definition(tasks, insertion, resolver));
    }

    @Override
    public CompletionStage<TaskContextCompletion<TRANSACTION, EXCEPTION>> apply(Map<Task, TaskDecision> decisions, Executor executor, SUPPLEMENT supplement) {
        CompletableFuture<TaskContextCompletion<TRANSACTION, EXCEPTION>> future = new CompletableFuture<TaskContextCompletion<TRANSACTION, EXCEPTION>>();
        if (this.topics.isEmpty()) {
            future.complete(transaction -> decisions);
        } else {
            executor.execute(() -> {
                try {
                    Set succeeded = decisions.entrySet().stream().filter(entry -> ((TaskDecision)entry.getValue()).isContinued()).map(Map.Entry::getKey).collect(Collectors.toSet());
                    Map<String, List> resolved = this.topics.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> ((List)entry.getValue()).stream().flatMap(definition -> definition.resolve(succeeded, this.postprocessor).stream()).collect(Collectors.toList())));
                    if (resolved.values().stream().mapToLong(Collection::size).sum() == 0L) {
                        future.complete(transaction -> decisions);
                    } else {
                        supplement.register(TopicsToResume.class, (Object)new TopicsToResume(resolved.keySet()));
                        future.complete(transaction -> {
                            for (Map.Entry entry : resolved.entrySet()) {
                                Iterator it = ((List)entry.getValue()).iterator();
                                Resolved current = (Resolved)it.next();
                                TaskSink.Insertion insertion = current.getInsertion();
                                ArrayList<TaskCreation> creations = new ArrayList<TaskCreation>(resolved.size());
                                creations.add(current.getCreation());
                                while (it.hasNext()) {
                                    current = (Resolved)it.next();
                                    if (current.getInsertion() != insertion) {
                                        this.taskSink.push(transaction, (String)entry.getKey(), insertion, creations);
                                        insertion = current.getInsertion();
                                        creations.clear();
                                    }
                                    creations.add(current.getCreation());
                                }
                                this.taskSink.push(transaction, (String)entry.getKey(), insertion, creations);
                            }
                            return decisions;
                        });
                    }
                }
                catch (Throwable t) {
                    future.completeExceptionally(t);
                }
            });
        }
        return future;
    }

    @Override
    public void onAfterTransaction(Executor executor, SUPPLEMENT supplement) {
        supplement.probe(TopicsToResume.class).ifPresent(resume -> executor.execute(() -> resume.getTopics().forEach(topic -> this.processors.apply((String)topic).ifPresent(TaskProcessor::resume))));
    }

    static class Definition {
        private final Set<Task> tasks;
        private final TaskSink.Insertion insertion;
        private final Function<Task, Collection<TaskCreation>> resolver;

        Definition(Set<Task> tasks, TaskSink.Insertion insertion, Function<Task, Collection<TaskCreation>> resolver) {
            this.tasks = tasks;
            this.insertion = insertion;
            this.resolver = resolver;
        }

        List<Resolved> resolve(Set<Task> succeeded, BiFunction<Task, TaskCreation, Stream<TaskCreation>> postprocessor) {
            return this.tasks.stream().filter(succeeded::contains).flatMap(task -> this.resolver.apply((Task)task).stream().flatMap(creation -> ((Stream)postprocessor.apply((Task)task, (TaskCreation)creation)).map(transformed -> new Resolved((Task)task, this.insertion, (TaskCreation)transformed)))).collect(Collectors.toList());
        }
    }

    static class TopicsToResume {
        private final Set<String> topics;

        TopicsToResume(Set<String> topics) {
            this.topics = topics;
        }

        Set<String> getTopics() {
            return this.topics;
        }
    }

    static class Resolved {
        private final Task task;
        private final TaskSink.Insertion insertion;
        private final TaskCreation creation;

        Resolved(Task task, TaskSink.Insertion insertion, TaskCreation creation) {
            this.task = task;
            this.creation = creation;
            this.insertion = insertion;
        }

        Task getTask() {
            return this.task;
        }

        TaskSink.Insertion getInsertion() {
            return this.insertion;
        }

        TaskCreation getCreation() {
            return this.creation;
        }
    }
}

