/*
 * Decompiled with CFR 0.152.
 */
package org.tinystruct.transfer;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.tinystruct.AbstractApplication;
import org.tinystruct.ApplicationException;
import org.tinystruct.data.component.Builder;
import org.tinystruct.system.ApplicationManager;
import org.tinystruct.transfer.MessageQueue;

/**
 * In-memory message queue application.
 *
 * <p>Messages are first placed on a bounded per-group queue ({@link #groups}) and then
 * fanned out asynchronously to the per-session queues held in {@link #list}. Consumers
 * call {@link #take(String)} to long-poll their own session queue.
 *
 * <p>All reads and writes of the per-session queues performed by this class happen while
 * holding {@code lock}, so plain (non-thread-safe) {@link Queue} implementations such as
 * {@link ArrayDeque} may be registered in {@link #list}.
 */
public class DistributedMessageQueue extends AbstractApplication implements MessageQueue<String> {
    /** Capacity of each per-group staging queue. */
    protected static final int DEFAULT_MESSAGE_POOL_SIZE = 10;
    /** Long-poll timeout and worker keep-alive time, in milliseconds. */
    private static final long TIMEOUT = 10000L;
    /** Upper bound on the number of worker threads of the shared executor. */
    private static final int MAX_WORKERS = 10;
    /** Response returned when there is nothing to report. */
    private static final String EMPTY = "{}";

    /** Staging queues keyed by group id (always the {@code toString()} form of the id). */
    protected final Map<String, BlockingQueue<Builder>> groups = new ConcurrentHashMap<String, BlockingQueue<Builder>>();
    /** Delivery queues keyed by session id. */
    protected final Map<String, Queue<Builder>> list = new ConcurrentHashMap<String, Queue<Builder>>();
    private final Lock lock = new ReentrantLock();
    /** Signalled whenever a message has been added to a session queue. */
    private final Condition consumer = this.lock.newCondition();
    /** Guards lazy creation of {@link #service}. */
    private final Object serviceGuard = new Object();
    private volatile ExecutorService service;

    /**
     * Runs the built-in smoke test with 100 messages per producer.
     *
     * @param args ignored
     * @throws ApplicationException if the test cannot be started
     */
    public static void main(String[] args) throws ApplicationException {
        new DistributedMessageQueue().testing(100);
    }

    /**
     * Registers the actions of this application and, if the executor already exists,
     * a JVM shutdown hook that shuts it down and waits for its tasks to finish.
     *
     * <p>NOTE(review): the executor is created lazily on first use, so the hook is only
     * installed when {@code init()} runs after the executor has been created.
     */
    @Override
    public void init() {
        this.setAction("message/update", "take");
        this.setAction("message/save", "put");
        this.setAction("message/version", "version");
        this.setAction("message/testing", "testing");

        final ExecutorService executor = this.service;
        if (executor == null) {
            return;
        }
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            @Override
            public void run() {
                executor.shutdown();
                boolean terminated = false;
                // Keep waiting in 5 second slices; an interrupt is reported and the wait resumes.
                while (!terminated) {
                    try {
                        System.out.println("Waiting for the service to terminate...");
                        terminated = executor.awaitTermination(5L, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("Service will be terminated soon.");
            }
        }));
    }

    /**
     * Wraps {@code message} in a {@link Builder} and saves it for the given group.
     *
     * @param groupId   group the message belongs to
     * @param sessionId session of the sender
     * @param message   message text; passed through {@link #filter(String)} before storing
     * @return the string form of the stored message, or <code>"{}"</code> when
     *         {@code groupId} is null or {@code message} is null or empty
     */
    @Override
    public String put(Object groupId, String sessionId, String message) {
        if (groupId == null || message == null || message.isEmpty()) {
            return EMPTY;
        }
        Builder builder = new Builder();
        builder.put("user", "user_" + sessionId);
        // NOTE(review): System.nanoTime() has an arbitrary origin, so this value is only
        // useful for ordering within one JVM, not as a wall-clock timestamp.
        builder.put("time", System.nanoTime());
        builder.put("message", this.filter(message));
        builder.put("session_id", sessionId);
        return this.save(groupId, builder);
    }

    /**
     * Stages {@code builder} on the group's queue and schedules its delivery to the
     * session queues.
     *
     * <p>Blocks while the group's staging queue is full.
     *
     * @param groupId group id; its {@code toString()} form is used as the key
     * @param builder message to store
     * @return {@code builder.toString()}, or <code>"{}"</code> when an argument is null or
     *         the calling thread is interrupted while waiting for queue space
     */
    public final String save(final Object groupId, Builder builder) {
        if (groupId == null || builder == null) {
            return EMPTY;
        }
        final BlockingQueue<Builder> messages = this.queueFor(groupId.toString());
        try {
            messages.put(builder);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return EMPTY;
        }
        this.getService().execute(new Runnable() {

            @Override
            public void run() {
                Builder message = messages.poll();
                if (message != null) {
                    DistributedMessageQueue.this.copy(groupId, message);
                }
            }
        });
        return builder.toString();
    }

    /**
     * Returns the staging queue for {@code key}, creating it atomically if absent.
     */
    private BlockingQueue<Builder> queueFor(String key) {
        BlockingQueue<Builder> existing = this.groups.get(key);
        if (existing != null) {
            return existing;
        }
        BlockingQueue<Builder> created = new ArrayBlockingQueue<Builder>(DEFAULT_MESSAGE_POOL_SIZE);
        BlockingQueue<Builder> raced = this.groups.putIfAbsent(key, created);
        return raced != null ? raced : created;
    }

    /**
     * Returns the shared executor, creating it on first use.
     *
     * <p>The pool hands tasks directly to threads (no backlog); when all workers are busy
     * the submitting thread runs the task itself rather than having it rejected.
     */
    private ExecutorService getService() {
        ExecutorService current = this.service;
        if (current == null) {
            synchronized (this.serviceGuard) {
                current = this.service;
                if (current == null) {
                    current = new ThreadPoolExecutor(0, MAX_WORKERS, TIMEOUT, TimeUnit.MILLISECONDS,
                            new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
                    this.service = current;
                }
            }
        }
        return current;
    }

    /**
     * Returns the next message for a session, waiting up to {@code TIMEOUT} milliseconds
     * in total for one to arrive.
     *
     * @param sessionId session whose queue is read
     * @return the string form of the next message, or <code>"{}"</code> when
     *         {@code sessionId} is null, no queue is registered for it, or the wait
     *         times out
     * @throws ApplicationException if the calling thread is interrupted while waiting;
     *         the thread's interrupt status is restored before throwing
     */
    @Override
    public final String take(String sessionId) throws ApplicationException {
        if (sessionId == null) {
            return EMPTY;
        }
        Queue<Builder> messages = this.list.get(sessionId);
        if (messages == null) {
            return EMPTY;
        }

        // Poll and wait under the same lock copy() uses for delivery: a message added
        // between the poll and the wait can then never be missed.
        this.lock.lock();
        try {
            long remaining = TimeUnit.MILLISECONDS.toNanos(TIMEOUT);
            Builder message;
            while ((message = messages.poll()) == null && remaining > 0L) {
                remaining = this.consumer.awaitNanos(remaining);
            }
            return message != null ? message.toString() : EMPTY;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ApplicationException(e.getMessage(), e);
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Appends {@code builder} to every session queue in {@link #list} and wakes all
     * waiting consumers.
     *
     * <p>NOTE(review): {@code meetingCode} is not used, so a message is delivered to all
     * registered sessions regardless of group — confirm this is intended.
     */
    private void copy(Object meetingCode, Builder builder) {
        this.lock.lock();
        try {
            for (Queue<Builder> queue : this.list.values()) {
                queue.add(builder);
            }
            this.consumer.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Wakes every consumer currently waiting in {@link #take(String)}.
     */
    protected final void wakeup() {
        this.lock.lock();
        try {
            this.consumer.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Hook for transforming message text before it is stored; returns it unchanged here.
     *
     * @param text raw message text
     * @return the text to store
     */
    protected String filter(String text) {
        return text;
    }

    /**
     * @return the version string of this component
     */
    @Override
    public String version() {
        return "Talk core version:1.0 stable; Released on 2017-07-24";
    }

    /**
     * Starts a smoke test: two producers post {@code n} messages each to group
     * {@code [M001]} and two consumers print whatever they receive for sessions
     * <code>{A}</code> and <code>{B}</code>. The consumers run until interrupted.
     *
     * <p>NOTE(review): the paths invoked here ({@code custom.application.talk/...}) differ
     * from the action names registered in {@link #init()}; confirm the routing.
     *
     * @param n number of messages each producer posts
     * @return always {@code true}
     * @throws ApplicationException declared for callers; not thrown by this method itself
     */
    public boolean testing(final int n) throws ApplicationException {
        this.groups.put("[M001]", new ArrayBlockingQueue<Builder>(DEFAULT_MESSAGE_POOL_SIZE));
        this.list.put("{A}", new ArrayDeque<Builder>());
        this.list.put("{B}", new ArrayDeque<Builder>());

        ExecutorService executor = this.getService();
        executor.execute(this.producer("A", n));
        executor.execute(this.producer("B", n));
        executor.execute(this.consumer("A"));
        executor.execute(this.consumer("B"));
        return true;
    }

    /**
     * Builds a task that posts messages 1..n on behalf of session <code>{name}</code>,
     * stopping early if its thread is interrupted.
     */
    private Runnable producer(final String name, final int n) {
        return new Runnable() {

            @Override
            public void run() {
                for (int i = 1; i <= n; i++) {
                    try {
                        ApplicationManager.call("custom.application.talk/save/[M001]/{" + name + "}/" + name + " post " + i, null);
                        Thread.sleep(1L);
                    }
                    catch (ApplicationException e) {
                        e.printStackTrace();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        };
    }

    /**
     * Builds a task that repeatedly fetches and prints updates for session
     * <code>{name}</code> until its thread is interrupted.
     */
    private Runnable consumer(final String name) {
        return new Runnable() {

            @Override
            public void run() {
                System.out.println("[" + name + "] is started...");
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        System.out.println("**" + name + "**:" + ApplicationManager.call("custom.application.talk/update/{" + name + "}", null));
                        Thread.sleep(1L);
                    }
                    catch (ApplicationException e) {
                        e.printStackTrace();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
    }
}

