/*
 * Decompiled with CFR 0.152.
 */
package no.skatteetaten.fastsetting.formueinntekt.felles.task.processor;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.Task;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.TaskDecision;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.TaskResult;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.TaskSource;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskDispatcher;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskHandler;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskLimiter;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskListener;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskProcessor;

/**
 * A {@link TaskProcessor} that runs a polling loop for a single topic on the supplied executor.
 *
 * <p>Each iteration requests a token from the {@link TaskLimiter}, polls a batch of tasks from the
 * {@link TaskSource} through the {@link TaskDispatcher}, hands the batch to the {@link TaskHandler}
 * created for the topic, and writes the resulting decisions (or a failure decision for every task)
 * back to the source. When a poll yields no tasks the loop waits for the configured pause, or until
 * {@link #resume()} or {@link #stop(long, TimeUnit)} wakes it up.
 *
 * <p>{@link #start(long, TimeUnit)} and {@link #stop(long, TimeUnit)} are synchronized on this
 * instance; the remaining state is shared with the worker through atomics and volatile fields.
 *
 * @param <TRANSACTION> the transaction type supplied by the dispatcher
 * @param <EXCEPTION> the checked exception type of the source, dispatcher and handler
 * @param <TRACE> the value returned by {@link TaskListener#onStart} and handed back to later listener calls
 */
public class TaskManager<TRANSACTION, EXCEPTION extends Exception, TRACE> implements TaskProcessor {
    private final String topic;
    private final Executor executor;
    private final TaskSource<TRANSACTION, EXCEPTION> source;
    private final TaskLimiter limiter;
    private final TaskDispatcher<TRANSACTION, EXCEPTION> dispatcher;
    private final Function<String, ? extends TaskHandler<TRANSACTION, EXCEPTION>> handlerFactory;
    private final TaskListener<TRACE> listener;
    private final TaskSource.Order order;
    private final int size;
    private final long pause;
    private final TimeUnit timeUnit;

    /** {@code true} from the moment the worker claims the loop until its cleanup has run. */
    private final AtomicBoolean alive = new AtomicBoolean();

    /** The thread currently running the loop; cleared to {@code null} to ask the loop to end. */
    private final AtomicReference<Thread> current = new AtomicReference<>();

    /** Latch of the most recently started run; counted down when that run has finished. */
    private volatile CountDownLatch completionLatch = new CountDownLatch(0);

    /** Action that wakes the loop from its idle wait; a no-op while no loop is running. */
    private volatile Runnable continuation = () -> {};

    /**
     * Creates a manager for one topic. All arguments are stored as given; nothing is started.
     *
     * @param topic the topic passed to the source, limiter, listener and handler factory
     * @param executor runs the polling loop
     * @param source polled for tasks and told about their decisions
     * @param dispatcher supplies the transaction for every interaction with the source
     * @param limiter asked for a token before every poll
     * @param handlerFactory creates the handler for the topic each time the loop starts
     * @param listener notified about the progress of the loop
     * @param order the order passed to {@link TaskSource#poll}
     * @param size the size passed to {@link TaskSource#poll}
     * @param pause how long to wait after an empty poll
     * @param timeUnit the unit of {@code pause}
     */
    public TaskManager(String topic, Executor executor, TaskSource<TRANSACTION, EXCEPTION> source, TaskDispatcher<TRANSACTION, EXCEPTION> dispatcher, TaskLimiter limiter, Function<String, ? extends TaskHandler<TRANSACTION, EXCEPTION>> handlerFactory, TaskListener<TRACE> listener, TaskSource.Order order, int size, long pause, TimeUnit timeUnit) {
        this.topic = topic;
        this.executor = executor;
        this.source = source;
        this.limiter = limiter;
        this.dispatcher = dispatcher;
        this.handlerFactory = handlerFactory;
        this.listener = listener;
        this.order = order;
        this.size = size;
        this.pause = pause;
        this.timeUnit = timeUnit;
    }

    /**
     * Submits the polling loop to the executor and waits until the worker thread has registered itself.
     *
     * <p>If the wait fails, a worker that has already registered is interrupted. If the worker has
     * not been picked up by the executor yet there is nothing to interrupt, and the submitted job
     * may still begin later.
     *
     * @param timeout how long to wait for the worker to register
     * @param timeUnit the unit of {@code timeout}
     * @return {@code false} if the loop is already running, {@code true} once the worker has registered
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the worker does not register within the timeout
     */
    @Override
    public synchronized boolean start(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        if (this.alive.get()) {
            return false;
        }
        CountDownLatch startLatch = new CountDownLatch(1);
        // The worker gets its own reference so that it always signals the latch of its own run,
        // even if a later start() has replaced the field in the meantime.
        CountDownLatch completion = new CountDownLatch(1);
        this.completionLatch = completion;
        this.executor.execute(() -> this.process(startLatch, completion));
        try {
            if (!startLatch.await(timeout, timeUnit)) {
                throw new TimeoutException();
            }
        } catch (Throwable t) {
            // The worker may not have registered yet, in which case there is no thread to interrupt.
            Thread thread = this.current.getAndSet(null);
            if (thread != null) {
                thread.interrupt();
            }
            throw t;
        }
        return true;
    }

    /**
     * Asks the polling loop to end and waits for it to finish.
     *
     * @param timeout how long to wait for the loop to finish
     * @param timeUnit the unit of {@code timeout}
     * @return {@code false} if the loop is not running, {@code true} once it has finished
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the loop does not finish within the timeout
     */
    @Override
    public synchronized boolean stop(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        if (!this.alive.get()) {
            return false;
        }
        // Clearing the reference ends the loop at its next check; the interrupt and the
        // continuation wake it if it is currently blocked.
        Thread thread = this.current.getAndSet(null);
        if (thread != null) {
            thread.interrupt();
        }
        this.continuation.run();
        this.continuation = () -> {};
        if (!this.completionLatch.await(timeout, timeUnit)) {
            throw new TimeoutException();
        }
        return true;
    }

    /**
     * Wakes the polling loop if it is waiting after an empty poll.
     *
     * @return {@code false} if the loop is not running, {@code true} otherwise
     */
    @Override
    public boolean resume() {
        if (!this.alive.get()) {
            return false;
        }
        this.continuation.run();
        return true;
    }

    /**
     * @return {@code true} while the polling loop is running
     */
    @Override
    public boolean isActive() {
        return this.alive.get();
    }

    /**
     * @return the topic this manager polls
     */
    @Override
    public String getTopic() {
        return this.topic;
    }

    /**
     * The body of the worker: claims the loop, polls and dispatches batches until asked to end,
     * and finally releases the loop and signals completion.
     *
     * @param startLatch counted down as soon as this thread has registered itself
     * @param completion counted down when this run has finished
     * @throws IllegalStateException if another worker already runs the loop; the state of that
     *     worker is left untouched
     */
    private void process(CountDownLatch startLatch, CountDownLatch completion) {
        if (!this.alive.compareAndSet(false, true)) {
            throw new IllegalStateException("Process for " + this.topic + " is already running");
        }
        this.current.set(Thread.currentThread());
        startLatch.countDown();
        try (TaskHandler<TRANSACTION, EXCEPTION> handler = this.handlerFactory.apply(this.topic)) {
            // Only this thread ever awaits the two-party barrier, so the wait can only end by
            // timeout, interrupt, or a reset() issued through the continuation.
            CyclicBarrier barrier = new CyclicBarrier(2);
            this.continuation = barrier::reset;
            while (this.current.get() != null && !Thread.interrupted()) {
                TaskLimiter.Token token = this.limiter.request(this.topic);
                try {
                    Set<Task> tasks;
                    try {
                        tasks = this.dispatcher.apply(transaction -> this.source.poll(transaction, this.topic, this.order, this.size));
                    } catch (Throwable pollFailure) {
                        this.listener.onFatal(this.topic, pollFailure);
                        token.release();
                        continue;
                    }
                    if (tasks.isEmpty()) {
                        this.listener.onEmpty(this.topic);
                        token.release();
                        if (!this.awaitWakeUp(barrier)) {
                            return;
                        }
                        continue;
                    }
                    TRACE trace = this.listener.onStart(this.topic, Collections.unmodifiableSet(tasks));
                    try {
                        handler.accept(Collections.unmodifiableSet(tasks), callback -> {
                            try {
                                this.listener.onCallback(this.topic, trace, Collections.unmodifiableSet(tasks));
                                Map<Task, TaskDecision> committed = this.dispatcher.apply(transaction -> {
                                    Map<Task, TaskDecision> decisions = new HashMap<>(callback.complete(transaction));
                                    if (!tasks.containsAll(decisions.keySet())) {
                                        throw new IllegalStateException("Received decisions for unknown tasks: " + decisions.keySet().stream().filter(task -> !tasks.contains(task)).sorted().collect(Collectors.toList()));
                                    }
                                    // Every polled task the handler did not decide on is marked as failed.
                                    for (Task task : tasks) {
                                        if (!decisions.containsKey(task)) {
                                            decisions.put(task, new TaskDecision(TaskResult.FAILURE, "Task was not included in task processor's result set"));
                                        }
                                    }
                                    this.source.complete(transaction, this.topic, decisions);
                                    return decisions;
                                });
                                try {
                                    try {
                                        token.release();
                                    } finally {
                                        this.listener.onComplete(this.topic, trace, Collections.unmodifiableMap(committed));
                                    }
                                } catch (Throwable ignored) {
                                    // The decisions are already written at this point; a failing release or
                                    // listener must not cause the tasks to be reported as failed.
                                }
                            } catch (Throwable callbackFailure) {
                                this.doFailAll(trace, tasks, callbackFailure, token);
                            }
                        }, handlerFailure -> this.doFailAll(trace, tasks, handlerFailure, token));
                    } catch (Throwable dispatchFailure) {
                        this.doFailAll(trace, tasks, dispatchFailure, token);
                    } finally {
                        this.listener.onDispatched(this.topic, trace, Collections.unmodifiableSet(tasks));
                    }
                } catch (Throwable failure) {
                    // Release the token before giving up; a failing release is attached to the
                    // original failure rather than replacing it.
                    try {
                        token.release();
                    } catch (Throwable suppressed) {
                        failure.addSuppressed(suppressed);
                    }
                    throw failure;
                }
            }
        } catch (Throwable fatal) {
            this.listener.onFatal(this.topic, fatal);
        } finally {
            this.current.set(null);
            this.alive.set(false);
            completion.countDown();
        }
    }

    /**
     * Waits on the barrier for the configured pause after an empty poll.
     *
     * @param barrier the barrier that the continuation resets to end the wait early
     * @return {@code true} if polling should continue, {@code false} if the thread was interrupted
     *     (the interrupt flag is restored)
     * @throws IllegalStateException if the barrier trips, which requires a second waiting party
     */
    private boolean awaitWakeUp(CyclicBarrier barrier) {
        try {
            barrier.await(this.pause, this.timeUnit);
            throw new IllegalStateException();
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
            return false;
        } catch (BrokenBarrierException ignored) {
            // The barrier was reset through the continuation: poll again right away.
            return true;
        } catch (TimeoutException ignored) {
            // A timed-out wait leaves the barrier broken; reset it so the next wait can block again.
            barrier.reset();
            return true;
        }
    }

    /**
     * Records a failure decision carrying {@code throwable} for every task of a batch, then
     * releases the token and notifies the listener.
     *
     * <p>A failure to write the decisions is reported through {@link TaskListener#onFatal}.
     * Failures of the token release or of {@link TaskListener#onError} are swallowed.
     *
     * @param trace the trace returned by the listener for this batch
     * @param tasks the tasks of the batch
     * @param throwable the failure to record for every task
     * @param token the limiter token held for this batch
     */
    private void doFailAll(TRACE trace, Set<Task> tasks, Throwable throwable, TaskLimiter.Token token) {
        try {
            this.dispatcher.accept(transaction -> this.source.complete(transaction, this.topic, tasks.stream().collect(Collectors.toMap(Function.identity(), task -> new TaskDecision(throwable)))));
        } catch (Throwable fatal) {
            // Throwable.addSuppressed rejects self-suppression, so skip it if the same instance comes back.
            if (fatal != throwable) {
                fatal.addSuppressed(throwable);
            }
            this.listener.onFatal(this.topic, fatal);
        } finally {
            try {
                try {
                    token.release();
                } finally {
                    this.listener.onError(this.topic, trace, Collections.unmodifiableSet(tasks), throwable);
                }
            } catch (Throwable ignored) {
                // Best effort: the failure has already been handled above.
            }
        }
    }
}

