/*
 * Decompiled with CFR 0.152.
 */
package org.tinystruct.transfer;

import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.tinystruct.AbstractApplication;
import org.tinystruct.ApplicationException;
import org.tinystruct.data.component.Builder;
import org.tinystruct.system.ApplicationManager;
import org.tinystruct.system.annotation.Action;
import org.tinystruct.transfer.Maps;
import org.tinystruct.transfer.MessageQueue;
import org.tinystruct.valve.Lock;
import org.tinystruct.valve.Watcher;

public class DistributedMessageQueue
extends AbstractApplication
implements MessageQueue<String> {
    protected static final int DEFAULT_MESSAGE_POOL_SIZE = 10;
    private static final long TIMEOUT = 100L;
    protected final Map<String, BlockingQueue<Builder>> groups = Maps.GROUPS;
    protected final Map<String, Queue<Builder>> list = Maps.LIST;
    protected final Map<String, Set<String>> sessions = Maps.SESSIONS;
    private final Lock lock = Watcher.getInstance().acquire();
    private ExecutorService service;
    private static final Logger logger = Logger.getLogger(DistributedMessageQueue.class.getName());

    @Override
    public void init() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (this.service != null) {
                this.service.shutdown();
                try {
                    if (!this.service.awaitTermination(5L, TimeUnit.SECONDS)) {
                        this.service.shutdownNow();
                    }
                }
                catch (InterruptedException e) {
                    this.service.shutdownNow();
                    Thread.currentThread().interrupt();
                    logger.log(Level.SEVERE, "Service shutdown interrupted", e);
                }
            }
        }));
    }

    @Override
    @Action(value="message/put")
    public final String put(Object groupId, String sessionId, String message) {
        if (groupId == null || message == null || message.isEmpty()) {
            return "{}";
        }
        Builder builder = new Builder();
        builder.put("user", (Object)("user_" + sessionId));
        builder.put("time", (Object)System.nanoTime());
        builder.put("message", (Object)this.filter(message));
        builder.put("session_id", (Object)sessionId);
        return this.save(groupId, builder);
    }

    public final String save(Object groupId, Builder builder) {
        return this.save(groupId, builder, null);
    }

    public final String save(final Object groupId, Builder builder, final Runnable listener) {
        String group = groupId.toString();
        this.groups.computeIfAbsent(group, k -> new ArrayBlockingQueue(10));
        try {
            this.groups.get(group).put(builder);
            final BlockingQueue<Builder> messages = this.groups.get(groupId.toString());
            this.getService().execute(new Runnable(){

                @Override
                public void run() {
                    Builder message = (Builder)messages.poll();
                    if (message == null) {
                        return;
                    }
                    DistributedMessageQueue.this.copy(groupId, message);
                    if (listener != null) {
                        DistributedMessageQueue.this.getService().execute(listener);
                    }
                }
            });
            return builder.toString();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.log(Level.SEVERE, "Message save interrupted", e);
            return "{}";
        }
    }

    private ExecutorService getService() {
        return this.service != null ? this.service : (this.service = new ThreadPoolExecutor(10, 20, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(true), new ThreadPoolExecutor.CallerRunsPolicy()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Action(value="message/take")
    public final String take(String sessionId) throws ApplicationException {
        Queue<Builder> messages = this.list.get(sessionId);
        Builder message = messages.poll();
        if (message != null) {
            return message.toString();
        }
        long startTime = System.currentTimeMillis();
        try {
            this.lock.tryLock(100L, TimeUnit.MILLISECONDS);
            while ((message = messages.poll()) == null && System.currentTimeMillis() - startTime <= 100L) {
            }
        }
        finally {
            this.lock.unlock();
        }
        return message != null ? message.toString() : "{}";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void copy(Object groupId, Builder builder) {
        Set<String> groupSessions = this.sessions.get(groupId.toString());
        if (groupSessions == null || groupSessions.isEmpty()) {
            return;
        }
        for (String sessionId : groupSessions) {
            Queue<Builder> sessionQueue = this.list.get(sessionId);
            if (sessionQueue == null) continue;
            try {
                this.lock.lock();
                if (sessionQueue.offer(builder)) continue;
                logger.warning("Failed to copy message to session: " + sessionId);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    protected final void wakeup() {
    }

    protected String filter(String text) {
        return text;
    }

    @Override
    public String version() {
        return "Talk core version:1.0 stable; Released on 2023-07-24";
    }

    @Action(value="message/testing")
    public boolean testing(final int n) throws ApplicationException {
        this.sessions.put("[M001]", Set.of("{A}", "{B}"));
        this.groups.put("[M001]", new ArrayBlockingQueue(10));
        this.list.put("{A}", new ArrayDeque());
        this.list.put("{B}", new ArrayDeque());
        this.getService().execute(new Runnable(){
            int i = 0;

            @Override
            public void run() {
                while (this.i++ < n) {
                    try {
                        ApplicationManager.call("message/put/[M001]/{A}/A post " + this.i, null);
                        Thread.sleep(1L);
                    }
                    catch (ApplicationException e) {
                        logger.log(Level.SEVERE, e.getMessage(), e);
                    }
                    catch (InterruptedException e) {
                        logger.log(Level.SEVERE, e.getMessage(), e);
                    }
                }
            }
        });
        this.getService().execute(new Runnable(){
            int i = 0;

            @Override
            public void run() {
                while (this.i++ < n) {
                    try {
                        ApplicationManager.call("message/put/[M001]/{B}/B post " + this.i, null);
                        Thread.sleep(1L);
                    }
                    catch (ApplicationException e) {
                        logger.log(Level.SEVERE, e.getMessage(), e);
                    }
                    catch (InterruptedException e) {
                        logger.log(Level.SEVERE, e.getMessage(), e);
                    }
                }
            }
        });
        this.getService().execute(new Runnable(){

            @Override
            public void run() {
                System.out.println("[A] is started...");
                while (true) {
                    try {
                        while (true) {
                            System.out.println("**A**:" + String.valueOf(ApplicationManager.call("message/take/{A}", null)));
                            Thread.sleep(1L);
                        }
                    }
                    catch (ApplicationException e) {
                        logger.log(Level.SEVERE, e.getMessage(), e);
                        continue;
                    }
                    catch (InterruptedException e) {
                        logger.log(Level.SEVERE, e.getMessage(), e);
                        continue;
                    }
                    break;
                }
            }
        });
        this.getService().execute(new Runnable(){

            @Override
            public void run() {
                System.out.println("[B] is started...");
                while (true) {
                    try {
                        while (true) {
                            System.out.println("**B**:" + String.valueOf(ApplicationManager.call("message/take/{B}", null)));
                            Thread.sleep(1L);
                        }
                    }
                    catch (ApplicationException e) {
                        logger.log(Level.SEVERE, e.getMessage(), e);
                        continue;
                    }
                    catch (InterruptedException e) {
                        logger.log(Level.SEVERE, e.getMessage(), e);
                        continue;
                    }
                    break;
                }
            }
        });
        return true;
    }
}

