/*
 * Decompiled with CFR 0.152.
 */
package com.selectdb.load.async;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.selectdb.exceptions.ParseDataException;
import com.selectdb.load.LoadOptions;
import com.selectdb.load.async.DefaultThreadFactory;
import com.selectdb.load.async.SelectdbAsyncClient;
import com.selectdb.load.async.SelectdbStageLoadAsync;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectdbAsyncClientInternal
implements SelectdbAsyncClient,
Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbAsyncClientInternal.class);
    private static final long serialVersionUID = 1L;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final LoadOptions loadOptions;
    private final SelectdbStageLoadAsync stageLoadAsync;
    private final String lineDelimiter;
    private final long flushIntervalMs;
    private transient ScheduledExecutorService scheduledExecutorService;
    private volatile transient Exception flushException = null;
    private AtomicBoolean started;

    public SelectdbAsyncClientInternal(LoadOptions loadOptions) {
        this.loadOptions = loadOptions;
        this.stageLoadAsync = new SelectdbStageLoadAsync(loadOptions);
        this.lineDelimiter = loadOptions.getPropertyMap().getOrDefault("file.line_delimiter", "\n");
        this.flushIntervalMs = loadOptions.getBufferFlushIntervalMs();
        this.started = new AtomicBoolean(false);
        this.open();
    }

    private void intervalFlush() {
        try {
            LOG.info("interval flush triggered.");
            this.stageLoadAsync.flush(false);
        }
        catch (Exception e) {
            this.flushException = e;
        }
    }

    private void open() {
        if (this.started.compareAndSet(false, true)) {
            this.stageLoadAsync.open();
            this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory("stage-load-flush-interval"));
            this.scheduledExecutorService.scheduleWithFixedDelay(this::intervalFlush, this.flushIntervalMs, this.flushIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void writeRow(byte[] data) {
        this.checkFlushException();
        try {
            this.stageLoadAsync.writeRecord(data);
        }
        catch (Exception e) {
            LOG.error("Failed to write data, ", (Throwable)e);
            this.close();
            throw new RuntimeException("write data failed");
        }
    }

    @Override
    public <T> void writeRows(List<T> dataList) {
        this.checkFlushException();
        if (dataList == null || dataList.isEmpty()) {
            return;
        }
        try {
            for (int index = 0; index < dataList.size(); ++index) {
                T item = dataList.get(index);
                String data = OBJECT_MAPPER.writeValueAsString(item);
                this.writeRow(data.getBytes(StandardCharsets.UTF_8));
                dataList.set(index, null);
            }
        }
        catch (JsonProcessingException e) {
            throw new ParseDataException("Parsed error, data converted to json string failed.");
        }
    }

    @Override
    public void flush() {
        this.checkFlushException();
        try {
            this.stageLoadAsync.flush(true);
        }
        catch (Exception e) {
            LOG.error("Failed to flush data, ", (Throwable)e);
            this.close();
            throw new RuntimeException("flush data failed");
        }
    }

    @Override
    public void close() {
        if (this.started.compareAndSet(true, false)) {
            LOG.info("Closing SelectdbAsyncClient.");
            this.flushException = null;
            this.scheduledExecutorService.shutdownNow();
            this.stageLoadAsync.close();
        }
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            this.close();
            throw new RuntimeException("Writing records to stage load failed.", this.flushException);
        }
    }
}

