/*
 * Decompiled with CFR 0.152.
 */
package com.selectdb.load.async;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.selectdb.exceptions.SelectdbRuntimeException;
import com.selectdb.load.CopyExecutor;
import com.selectdb.load.LoadOptions;
import com.selectdb.load.StageLoad;
import com.selectdb.load.async.DefaultThreadFactory;
import com.selectdb.load.async.RecordBuffer;
import com.selectdb.model.BaseResponse;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectdbStageLoadAsync
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbStageLoadAsync.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final byte[] lineDelimiter;
    private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
    private String uploadUrl;
    private String hostPort;
    private final String username;
    private final String password;
    private final String db;
    private final String table;
    private final Map<String, String> loadProps;
    private List<String> fileList = new CopyOnWriteArrayList<String>();
    private RecordBuffer buffer;
    private LoadOptions loadOptions;
    private ExecutorService loadExecutorService;
    private StageLoadAsyncExecutor loadAsyncExecutor;
    private BlockingQueue<RecordBuffer> writeQueue;
    private BlockingQueue<RecordBuffer> readQueue;
    private AtomicBoolean started;
    private AtomicReference<Throwable> exception = new AtomicReference<Object>(null);

    public SelectdbStageLoadAsync(LoadOptions loadOptions) {
        this.hostPort = String.format("%s:%s", loadOptions.getHost(), loadOptions.getHttpPort());
        this.db = loadOptions.getDatabase();
        this.table = loadOptions.getTable();
        this.username = loadOptions.getUsername();
        this.password = loadOptions.getPassword();
        this.uploadUrl = String.format(UPLOAD_URL_PATTERN, this.hostPort);
        this.loadProps = loadOptions.getPropertyMap();
        this.lineDelimiter = this.loadProps.getOrDefault("file.line_delimiter", "\n").getBytes(StandardCharsets.UTF_8);
        this.loadOptions = loadOptions;
        this.writeQueue = new ArrayBlockingQueue<RecordBuffer>(loadOptions.getFlushQueueSize());
        LOG.info("init RecordBuffer capacity {}, count {}", (Object)loadOptions.getBufferFlushMaxBytes(), (Object)loadOptions.getFlushQueueSize());
        for (int index = 0; index < loadOptions.getFlushQueueSize(); ++index) {
            this.writeQueue.add(new RecordBuffer(this.lineDelimiter, loadOptions.getBufferFlushMaxBytes()));
        }
        this.readQueue = new LinkedBlockingDeque<RecordBuffer>();
        this.loadAsyncExecutor = new StageLoadAsyncExecutor(loadOptions);
        this.started = new AtomicBoolean(false);
        this.open();
    }

    public synchronized void open() {
        if (this.started.compareAndSet(false, true)) {
            this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1), new DefaultThreadFactory("upload-executor"), new ThreadPoolExecutor.AbortPolicy());
            this.loadExecutorService.execute(this.loadAsyncExecutor);
        }
    }

    public String getHostPort() {
        return this.hostPort;
    }

    public synchronized void writeRecord(byte[] record) throws InterruptedException {
        this.checkFlushException();
        if (this.buffer == null) {
            this.buffer = this.takeRecordFromWriteQueue();
        }
        this.buffer.insert(record);
        if ((double)this.buffer.getBufferSizeBytes() >= (double)this.loadOptions.getBufferFlushMaxBytes() * 0.8 || this.loadOptions.getBufferFlushMaxRows() != 0 && this.buffer.getNumOfRecords() >= this.loadOptions.getBufferFlushMaxRows()) {
            this.flush(false);
        }
    }

    public synchronized void flush(boolean waitUtilDone) throws InterruptedException {
        this.checkFlushException();
        if (this.buffer != null && !this.buffer.isEmpty()) {
            String fileName = UUID.randomUUID().toString();
            this.buffer.setFileName(fileName);
            RecordBuffer tmpBuff = this.buffer;
            this.readQueue.put(tmpBuff);
            this.buffer = null;
        }
        if (waitUtilDone) {
            this.waitAsyncLoadFinish();
        }
    }

    private void putRecordToWriteQueue(RecordBuffer buffer) {
        try {
            this.writeQueue.put(buffer);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Failed to recycle a buffer to queue");
        }
    }

    private RecordBuffer takeRecordFromWriteQueue() {
        this.checkFlushException();
        try {
            return this.writeQueue.take();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Failed to take a buffer from queue");
        }
    }

    private void checkFlushException() {
        if (this.exception.get() != null) {
            this.close();
            throw new SelectdbRuntimeException(this.exception.get());
        }
    }

    private void waitAsyncLoadFinish() throws InterruptedException {
        for (int i = 0; i < this.loadOptions.getFlushQueueSize() + 1; ++i) {
            RecordBuffer empty = this.takeRecordFromWriteQueue();
            this.readQueue.put(empty);
        }
    }

    public void close() {
        LOG.info("Closing StageLoadAsyncExecutor");
        this.loadExecutorService.shutdown();
        this.started.set(false);
        this.exception.set(null);
        this.writeQueue.clear();
        this.readQueue.clear();
    }

    class StageLoadAsyncExecutor
    implements Runnable {
        private final StageLoad stageLoad;
        private final CopyExecutor copyExecutor;

        public StageLoadAsyncExecutor(LoadOptions loadOptions) {
            this.stageLoad = new StageLoad(loadOptions);
            this.copyExecutor = new CopyExecutor(loadOptions);
        }

        @Override
        public void run() {
            LOG.info("StageLoadAsyncExecutor start");
            while (SelectdbStageLoadAsync.this.started.get()) {
                RecordBuffer buffer = null;
                try {
                    buffer = (RecordBuffer)SelectdbStageLoadAsync.this.readQueue.poll(2000L, TimeUnit.MILLISECONDS);
                    if (buffer == null || buffer.getFileName() == null) continue;
                    this.stageLoad.uploadToStorage(buffer.getFileName(), buffer);
                    BaseResponse copyResp = this.copyExecutor.execute(buffer.getFileName());
                    if (BaseResponse.isSuccess(copyResp)) continue;
                    throw new SelectdbRuntimeException("Failed to stage load, with response: " + copyResp);
                }
                catch (Exception e) {
                    LOG.error("worker running error", (Throwable)e);
                    SelectdbStageLoadAsync.this.exception.set(e);
                    break;
                }
                finally {
                    if (buffer == null) continue;
                    buffer.clear();
                    SelectdbStageLoadAsync.this.putRecordToWriteQueue(buffer);
                }
            }
            LOG.info("StageLoadAsyncExecutor stop");
        }
    }
}

