/*
 * Decompiled with CFR 0.152.
 */
package nva.commons.core.parallel;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import nva.commons.core.attempt.Failure;
import nva.commons.core.attempt.Try;
import nva.commons.core.parallel.ParallelExecutionException;

public class ParallelMapper<I, O> {
    public static final int DEFAULT_BATCH_SIZE = 100;
    private final List<Callable<O>> actions;
    private final int batchSize;
    private final List<Future<O>> futures;
    private final Function<I, O> mappingFunction;

    public ParallelMapper(Collection<I> inputs, Function<I, O> function) {
        this((Stream)inputs.stream().parallel(), function, 100);
    }

    public ParallelMapper(Collection<I> inputs, Function<I, O> mappingFunction, int batchSize) {
        this((Stream)inputs.stream().parallel(), mappingFunction, batchSize);
    }

    public ParallelMapper(Stream<I> inputs, Function<I, O> mappingFunction) {
        this(inputs, mappingFunction, 100);
    }

    public ParallelMapper(Stream<I> inputs, Function<I, O> mappingFunction, int batchSize) {
        this.mappingFunction = mappingFunction;
        this.actions = inputs.map(this::toCallable).collect(Collectors.toList());
        this.batchSize = batchSize;
        this.futures = new ArrayList<Future<O>>();
    }

    public ParallelMapper<I, O> map() throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int index = 0; index < this.actions.size(); index += this.batchSize) {
            this.executeBatch(executor, index);
        }
        return this;
    }

    public List<O> getSuccesses() {
        return this.getCompleted().filter(Try::isSuccess).map(Try::orElseThrow).collect(Collectors.toList());
    }

    public List<ParallelExecutionException> getExceptions() {
        return this.getCompleted().filter(Try::isFailure).map(Try::getException).map(this::getExceptionWithInputObject).map(exception -> (ParallelExecutionException)exception).collect(Collectors.toList());
    }

    private void executeBatch(ExecutorService executor, int index) throws InterruptedException {
        List<Callable<O>> actionsForExecution = this.actions.subList(index, this.endIndex(index));
        List<Future<O>> executed = executor.invokeAll(actionsForExecution);
        this.futures.addAll(executed);
    }

    private int endIndex(int index) {
        return Math.min(this.actions.size(), index + this.batchSize);
    }

    private Stream<Try<O>> getCompleted() {
        return ((Stream)this.futures.stream().parallel()).filter(Future::isDone).map(Try.attempt(Future::get));
    }

    private Throwable getExceptionWithInputObject(Exception exception) {
        return exception.getCause();
    }

    private Callable<O> toCallable(I input) {
        return () -> Try.attempt(() -> this.mappingFunction.apply(input)).orElseThrow(fail -> this.captureAllExceptionsAndAddInputObject(input, (Failure<O>)fail));
    }

    private ParallelExecutionException captureAllExceptionsAndAddInputObject(I input, Failure<O> fail) {
        return new ParallelExecutionException(input, fail.getException());
    }
}

