/*
 * Decompiled with CFR 0.152.
 */
package io.github.opensabe.common.buffer;

import com.google.common.collect.Lists;
import com.google.errorprone.annotations.ForOverride;
import io.github.opensabe.common.buffer.BufferedElement;
import io.github.opensabe.common.buffer.observation.BatchBufferQueueBatchManipulateObservationDocumentation;
import io.github.opensabe.common.executor.ThreadPoolFactory;
import io.github.opensabe.common.executor.ThreadPoolFactoryGracefulShutDownHandler;
import io.github.opensabe.common.observation.UnifiedObservationFactory;
import io.micrometer.observation.Observation;
import io.micrometer.tracing.TraceContext;
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class BatchBufferedQueue<E extends BufferedElement> {
    @Generated
    private static final Logger log = LogManager.getLogger(BatchBufferedQueue.class);
    protected static final int DEFAULT_QUEUE_COUNT = 1;
    protected static final int DEFAULT_QUEUE_SIZE = 0x100000;
    protected static final int DEFAULT_BATCH_SIZE = 2048;
    protected static final long DEFAULT_POLL_WAIT_TIME_IN_MILLIS = 1000L;
    protected static final long DEFAULT_MAX_WAIT_TIME_IN_MILLIS = 1000L;
    private final AtomicInteger counter = new AtomicInteger(0);
    private MpscBlockingConsumerArrayQueue<E>[] mpscBlockingConsumerArrayQueues;
    private ExecutorService[] executorServices;
    @Autowired
    private ThreadPoolFactory threadPoolFactory;
    @Autowired
    private UnifiedObservationFactory unifiedObservationFactory;
    @Autowired
    private ThreadPoolFactoryGracefulShutDownHandler threadPoolFactoryGracefulShutDownHandler;

    private static int getNearest2Power(int n) {
        if ((n & n - 1) == 0) {
            return n;
        }
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return ++n;
    }

    @ForOverride
    protected int queueCount() {
        return 1;
    }

    @ForOverride
    protected int queueSize() {
        return 0x100000;
    }

    @ForOverride
    protected int batchSize() {
        return 2048;
    }

    @ForOverride
    protected long pollWaitTimeInMillis() {
        return 1000L;
    }

    @ForOverride
    protected long maxWaitTimeInMillis() {
        return 1000L;
    }

    protected abstract Comparator<E> comparator();

    protected abstract void batchManipulate(List<E> var1);

    protected void beforeExecute(List<E> batch) {
    }

    protected void afterBatchFinish(List<E> batch) {
    }

    protected void afterBatchError(List<E> batch, Throwable throwable) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void manipulate(List<E> batch) {
        Observation observation = BatchBufferQueueBatchManipulateObservationDocumentation.DEFAULT_OBSERVATION_DOCUMENTATION.observation(this.unifiedObservationFactory.getObservationRegistry());
        if (CollectionUtils.isEmpty(batch)) {
            log.debug("BatchBufferedQueue: empty queue");
            return;
        }
        log.info("BatchBufferedQueue: current batch size: {}", (Object)batch.size());
        try {
            observation.start();
            TraceContext traceContext = UnifiedObservationFactory.getTraceContext((Observation)observation);
            batch.forEach(e -> e.beforeElementManipulate(traceContext.spanId()));
            batch = batch.stream().sorted(this.comparator()).collect(Collectors.toList());
            this.beforeExecute(batch);
            this.batchManipulate(batch);
            batch.forEach(BufferedElement::afterElementManipulate);
            this.afterBatchFinish(batch);
        }
        catch (Throwable e2) {
            batch.forEach(element -> element.afterElementManipulateError(e2));
            this.afterBatchError(batch, e2);
            observation.error(e2);
        }
        finally {
            observation.stop();
        }
    }

    @PostConstruct
    public void init() {
        int queueCount = BatchBufferedQueue.getNearest2Power(this.queueCount());
        int queueSize = BatchBufferedQueue.getNearest2Power(this.queueSize());
        int batchSize = BatchBufferedQueue.getNearest2Power(this.batchSize());
        long pollWaitTimeInMillis = this.pollWaitTimeInMillis();
        long maxWaitTimeInMillis = this.maxWaitTimeInMillis();
        String name = this.getClass().getSimpleName();
        this.mpscBlockingConsumerArrayQueues = new MpscBlockingConsumerArrayQueue[queueCount];
        this.executorServices = new ExecutorService[queueCount];
        for (int i = 0; i < queueCount; ++i) {
            this.mpscBlockingConsumerArrayQueues[i] = new MpscBlockingConsumerArrayQueue(queueSize);
            this.executorServices[i] = this.threadPoolFactory.createSingleThreadPoolExecutor(name + "-" + i);
            int finalI = i;
            this.executorServices[i].submit(() -> {
                BufferedElement queueElement;
                ArrayList batch;
                Thread thread = Thread.currentThread();
                while (!thread.isInterrupted() && !this.threadPoolFactoryGracefulShutDownHandler.isShuttingDown()) {
                    try {
                        BufferedElement queueElement2;
                        batch = Lists.newArrayList();
                        long start = System.currentTimeMillis();
                        while (batch.size() < batchSize && (queueElement2 = (BufferedElement)this.mpscBlockingConsumerArrayQueues[finalI].poll(pollWaitTimeInMillis, TimeUnit.MILLISECONDS)) != null) {
                            batch.add(queueElement2);
                            log.info("BatchBufferedQueue: {} origin traceId: {} spanId: {} add to batch", (Object)name, (Object)queueElement2.traceId(), (Object)queueElement2.spanId());
                            if (System.currentTimeMillis() - start <= maxWaitTimeInMillis) continue;
                            break;
                        }
                        this.manipulate(batch);
                    }
                    catch (Throwable e) {
                        log.fatal("BatchBufferedQueue: {} error: {}", (Object)name, (Object)e.getMessage(), (Object)e);
                    }
                }
                log.info("BatchBufferedQueue: {} before shutting down, drain all remaining: {}", (Object)name, (Object)this.mpscBlockingConsumerArrayQueues[finalI].size());
                batch = Lists.newArrayList();
                while ((queueElement = (BufferedElement)this.mpscBlockingConsumerArrayQueues[finalI].relaxedPoll()) != null) {
                    batch.add(queueElement);
                    log.info("BatchBufferedQueue: {} origin traceId: {} spanId: {} add to batch", (Object)name, (Object)queueElement.traceId(), (Object)queueElement.spanId());
                    if (batch.size() < batchSize) continue;
                    this.manipulate(batch);
                    batch.clear();
                }
                if (CollectionUtils.isNotEmpty((Collection)batch)) {
                    this.manipulate(batch);
                }
                log.info("BatchBufferedQueue {} shutdown", (Object)name);
            });
        }
    }

    public void submit(E e) {
        Observation observation = this.unifiedObservationFactory.getCurrentOrCreateEmptyObservation();
        TraceContext traceContext = UnifiedObservationFactory.getTraceContext((Observation)observation);
        ((BufferedElement)e).setSubmitInfo(traceContext == null ? null : traceContext.traceId(), traceContext == null ? null : traceContext.spanId());
        log.info("BatchBufferedQueue-sumbit: {} -> {}", (Object)this.getClass().getSimpleName(), (Object)((BufferedElement)e).hashKey());
        String hashKey = ((BufferedElement)e).hashKey();
        int length = this.mpscBlockingConsumerArrayQueues.length;
        int idx = StringUtils.isNotBlank((CharSequence)hashKey) ? hashKey.hashCode() & length - 1 : Math.abs(this.counter.incrementAndGet() & length - 1);
        this.mpscBlockingConsumerArrayQueues[idx].offer(e);
    }
}

