/*
 * Decompiled with CFR 0.152.
 */
package org.skd.loadcode;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

/**
 * A small message queue persisted in a file-based H2 database.
 *
 * <p>Each topic is stored in its own table, created on demand, with the columns
 * {@code id}, {@code data} and {@code status}. New messages are stored with status
 * {@code PENDING}; {@link #read(String, int)} hands them out and marks them {@code READ}.
 *
 * <p>Topic names are spliced into SQL text (table names cannot be bound as statement
 * parameters), so every public method rejects topics that are not plain identifiers
 * with an {@link IllegalArgumentException}. Database failures ({@link SQLException})
 * are reported on standard error and otherwise swallowed, so callers see a no-op,
 * an empty list or {@code 0}.
 *
 * <p>The {@code synchronized} methods lock only the instance they are called on;
 * separate instances are not serialized against each other by this class.
 */
public class H2MessageQueue {
    private static final String DB_URL = "jdbc:h2:file:./message_db";
    private static final String USER = "sa";
    private static final String PASSWORD = "sa";

    /** Accepted topic names: a letter or underscore followed by letters, digits or underscores. */
    private static final Pattern TOPIC_PATTERN = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    /**
     * Creates a queue handle and makes sure the H2 JDBC driver class is loadable.
     *
     * @throws RuntimeException if {@code org.h2.Driver} is not on the classpath
     */
    public H2MessageQueue() {
        try {
            Class.forName("org.h2.Driver");
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("H2 Driver not found!", e);
        }
    }

    /**
     * Checks that a topic name is safe to embed in SQL as an unquoted identifier.
     *
     * <p>Package-private and static so it can be exercised without a database.
     *
     * @param topic the candidate topic name
     * @return {@code topic} unchanged when it is valid
     * @throws IllegalArgumentException if {@code topic} is {@code null}, empty, or contains
     *         anything other than ASCII letters, digits and underscores, or starts with a digit
     */
    static String requireValidTopic(String topic) {
        if (topic == null || !TOPIC_PATTERN.matcher(topic).matches()) {
            throw new IllegalArgumentException("Invalid topic name: " + topic);
        }
        return topic;
    }

    /** Opens a new connection to the queue database; the caller is responsible for closing it. */
    private Connection getConnection() throws SQLException {
        return DriverManager.getConnection(DB_URL, USER, PASSWORD);
    }

    /** Rolls back {@code conn}, attaching any rollback failure to {@code cause} instead of hiding it. */
    private static void rollbackQuietly(Connection conn, SQLException cause) {
        try {
            conn.rollback();
        }
        catch (SQLException rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        }
    }

    /**
     * Validates the topic and creates its table and status index if they do not exist yet.
     *
     * <p>This is the single choke point for topic validation: every public method calls it
     * before building any SQL that contains the topic name.
     *
     * @throws IllegalArgumentException if {@code topic} is not a valid identifier
     */
    private void ensureTopicTableExists(String topic) {
        String table = requireValidTopic(topic);
        String sql = "CREATE TABLE IF NOT EXISTS " + table + " (id IDENTITY PRIMARY KEY, data VARCHAR(255) NOT NULL, status VARCHAR(20) DEFAULT 'PENDING')";
        // The index name includes the topic: a single shared name would make
        // "IF NOT EXISTS" skip index creation for every topic after the first one,
        // because index names must be unique within a schema.
        String indexSql = "CREATE INDEX IF NOT EXISTS idx_" + table + "_status ON " + table + " (status)";
        try (Connection conn = this.getConnection();
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(sql);
            stmt.executeUpdate(indexSql);
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Appends one message to a topic with status {@code PENDING}.
     *
     * @param topic topic (table) name
     * @param data  message payload
     * @throws IllegalArgumentException if {@code topic} is not a valid identifier
     */
    public void insert(String topic, String data) {
        this.ensureTopicTableExists(topic);
        String sql = "INSERT INTO " + topic + " (data, status) VALUES (?, 'PENDING')";
        try (Connection conn = this.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, data);
            stmt.executeUpdate();
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Claims up to {@code batchSize} pending messages, oldest id first, and marks them {@code READ}.
     *
     * <p>Selection and status update run in one transaction; on a database error the
     * transaction is rolled back and an empty list is returned, so no payload is handed
     * out for a row whose status change was not committed.
     *
     * @param topic     topic (table) name
     * @param batchSize maximum number of messages to claim; values {@code <= 0} yield an empty list
     * @return the payloads of the claimed messages, possibly empty, never {@code null}
     * @throws IllegalArgumentException if {@code topic} is not a valid identifier
     */
    public synchronized List<String> read(String topic, int batchSize) {
        this.ensureTopicTableExists(topic);
        List<String> messages = new ArrayList<>();
        if (batchSize <= 0) {
            return messages;
        }
        String sqlSelect = "SELECT id, data FROM " + topic + " WHERE status = 'PENDING' ORDER BY id LIMIT ? FOR UPDATE SKIP LOCKED";
        String sqlUpdate = "UPDATE " + topic + " SET status = 'READ' WHERE id = ?";
        try (Connection conn = this.getConnection()) {
            conn.setAutoCommit(false);
            try (PreparedStatement stmtSelect = conn.prepareStatement(sqlSelect);
                 PreparedStatement stmtUpdate = conn.prepareStatement(sqlUpdate)) {
                stmtSelect.setInt(1, batchSize);
                try (ResultSet rs = stmtSelect.executeQuery()) {
                    while (rs.next()) {
                        // IDENTITY columns are 64-bit, so the id is handled as a long.
                        long id = rs.getLong("id");
                        messages.add(rs.getString("data"));
                        stmtUpdate.setLong(1, id);
                        stmtUpdate.executeUpdate();
                    }
                }
                conn.commit();
            }
            catch (SQLException e) {
                rollbackQuietly(conn, e);
                e.printStackTrace();
                // The status updates were not committed, so do not hand out these payloads.
                messages.clear();
            }
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
        return messages;
    }

    /**
     * Deletes every message in the topic whose payload equals {@code data}, whatever its status.
     *
     * @param topic topic (table) name
     * @param data  payload to match
     * @throws IllegalArgumentException if {@code topic} is not a valid identifier
     */
    public synchronized void delete(String topic, String data) {
        this.ensureTopicTableExists(topic);
        String sql = "DELETE FROM " + topic + " WHERE data = ?";
        try (Connection conn = this.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, data);
            stmt.executeUpdate();
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sets the status of every message whose payload equals {@code data} back to {@code PENDING}.
     *
     * @param topic topic (table) name
     * @param data  payload to match
     * @throws IllegalArgumentException if {@code topic} is not a valid identifier
     */
    public void reset(String topic, String data) {
        this.ensureTopicTableExists(topic);
        String sql = "UPDATE " + topic + " SET status = 'PENDING' WHERE data = ?";
        try (Connection conn = this.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, data);
            stmt.executeUpdate();
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Inserts all given messages as {@code PENDING} in a single transaction.
     *
     * <p>If any insert fails the whole batch is rolled back.
     *
     * @param topic    topic (table) name
     * @param messages payloads to insert; {@code null} or empty inserts nothing
     * @throws IllegalArgumentException if {@code topic} is not a valid identifier
     */
    public synchronized void bulkInsert(String topic, List<String> messages) {
        this.ensureTopicTableExists(topic);
        if (messages == null || messages.isEmpty()) {
            return;
        }
        String sql = "INSERT INTO " + topic + " (data, status) VALUES (?, 'PENDING')";
        try (Connection conn = this.getConnection()) {
            conn.setAutoCommit(false);
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                for (String data : messages) {
                    stmt.setString(1, data);
                    stmt.addBatch();
                }
                stmt.executeBatch();
                conn.commit();
            }
            catch (SQLException e) {
                rollbackQuietly(conn, e);
                e.printStackTrace();
            }
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Counts the messages in the topic that still have status {@code PENDING}.
     *
     * @param topic topic (table) name
     * @return the number of pending messages, or {@code 0} if the count could not be read
     * @throws IllegalArgumentException if {@code topic} is not a valid identifier
     */
    public int getAvailableRecords(String topic) {
        this.ensureTopicTableExists(topic);
        String sql = "SELECT COUNT(*) FROM " + topic + " WHERE status = 'PENDING'";
        try (Connection conn = this.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql);
             ResultSet rs = stmt.executeQuery()) {
            return rs.next() ? rs.getInt(1) : 0;
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
        return 0;
    }

    /**
     * Demo / stress driver: starts 100 workers, each with its own queue instance, that
     * poll the {@code forupdate} topic one message at a time forever and print every
     * non-empty batch. The workers never finish, so the process runs until it is killed.
     */
    public static void main(String[] args) {
        final int workers = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            H2MessageQueue h2MessageQueue = new H2MessageQueue();
            executorService.submit(() -> {
                while (true) {
                    List<String> batch = h2MessageQueue.read("forupdate", 1);
                    if (!batch.isEmpty()) {
                        System.out.println(batch);
                    }
                }
            });
        }
    }
}

