/*
 * Decompiled with CFR 0.152.
 */
package org.cometd.benchmark.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicStampedReference;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import org.cometd.bayeux.ChannelId;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.benchmark.Atomics;
import org.cometd.benchmark.BenchmarkHelper;
import org.cometd.benchmark.MonitoringQueuedThreadPool;
import org.cometd.benchmark.SystemTimer;
import org.cometd.client.BayeuxClient;
import org.cometd.client.ext.AckExtension;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.cometd.common.Jackson1JSONContextClient;
import org.cometd.websocket.client.JettyWebSocketTransport;
import org.cometd.websocket.client.WebSocketTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.client.masks.ZeroMasker;

public class BayeuxLoadClient {
    private static final String ID_FIELD = "ID";
    private static final String START_FIELD = "start";
    private final SystemTimer systemTimer = SystemTimer.detect();
    private final Random random = new Random();
    private final BenchmarkHelper helper = new BenchmarkHelper();
    private final AtomicLong ids = new AtomicLong();
    private final List<LoadBayeuxClient> bayeuxClients = Collections.synchronizedList(new ArrayList());
    private final ConcurrentMap<String, ChannelId> channelIds = new ConcurrentHashMap<String, ChannelId>();
    private final ConcurrentMap<Integer, AtomicInteger> rooms = new ConcurrentHashMap<Integer, AtomicInteger>();
    private final AtomicLong start = new AtomicLong();
    private final AtomicLong end = new AtomicLong();
    private final AtomicLong responses = new AtomicLong();
    private final AtomicLong messages = new AtomicLong();
    private final AtomicLong minWallLatency = new AtomicLong();
    private final AtomicLong maxWallLatency = new AtomicLong();
    private final AtomicLong totWallLatency = new AtomicLong();
    private final AtomicLong minLatency = new AtomicLong();
    private final AtomicLong maxLatency = new AtomicLong();
    private final AtomicLong totLatency = new AtomicLong();
    private final AtomicStampedReference<String> maxTime = new AtomicStampedReference<Object>(null, 0);
    private final ConcurrentMap<Long, AtomicLong> wallLatencies = new ConcurrentHashMap<Long, AtomicLong>();
    private final Map<String, AtomicStampedReference<Long>> sendTimes = new ConcurrentHashMap<String, AtomicStampedReference<Long>>();
    private final Map<String, AtomicStampedReference<List<Long>>> arrivalTimes = new ConcurrentHashMap<String, AtomicStampedReference<List<Long>>>();
    private ScheduledExecutorService scheduler;
    private MonitoringQueuedThreadPool threadPool;
    private HttpClient httpClient;
    private WebSocketContainer webSocketContainer;
    private WebSocketClient webSocketClient;

    public static void main(String[] args) throws Exception {
        BayeuxLoadClient client = new BayeuxLoadClient();
        client.run();
    }

    public long getResponses() {
        return this.responses.get();
    }

    public long getMessages() {
        return this.messages.get();
    }

    public void run() throws Exception {
        System.err.println("detecting timer resolution...");
        System.err.printf("native timer resolution: %d \u00b5s%n", this.systemTimer.getNativeResolution());
        System.err.printf("emulated timer resolution: %d \u00b5s%n", this.systemTimer.getEmulatedResolution());
        System.err.println();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        String host = System.getProperty("cometd.server", "localhost");
        System.err.printf("server [%s]: ", host);
        String value = console.readLine().trim();
        if (value.length() == 0) {
            value = host;
        }
        host = value;
        int port = Integer.parseInt(System.getProperty("cometd.port", "8080"));
        System.err.printf("port [%d]: ", port);
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = String.valueOf(port);
        }
        port = Integer.parseInt(value);
        ClientTransportType clientTransportType = ClientTransportType.LONG_POLLING;
        System.err.printf("transports:%n", new Object[0]);
        for (ClientTransportType type : ClientTransportType.values()) {
            System.err.printf("  %d - %s%n", type.ordinal(), type.getName());
        }
        System.err.printf("transport [%d]: ", clientTransportType.ordinal());
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = String.valueOf(clientTransportType.ordinal());
        }
        clientTransportType = ClientTransportType.values()[Integer.parseInt(value)];
        boolean ssl = false;
        System.err.printf("use ssl [%b]: ", ssl);
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = String.valueOf(ssl);
        }
        ssl = Boolean.parseBoolean(value);
        int maxThreads = Integer.parseInt(System.getProperty("cometd.threads", "256"));
        System.err.printf("max threads [%d]: ", maxThreads);
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = String.valueOf(maxThreads);
        }
        maxThreads = Integer.parseInt(value);
        String contextPath = "/cometd";
        System.err.printf("context [%s]: ", contextPath);
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = contextPath;
        }
        String uri = value + "/cometd";
        String url = (ssl ? "https" : "http") + "://" + host + ":" + port + uri;
        String channel = System.getProperty("cometd.channel", "/chat/demo");
        System.err.printf("channel [%s]: ", channel);
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = channel;
        }
        channel = value;
        int rooms = Integer.parseInt(System.getProperty("cometd.rooms", "100"));
        System.err.printf("rooms [%d]: ", rooms);
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = String.valueOf(rooms);
        }
        rooms = Integer.parseInt(value);
        int roomsPerClient = 10;
        System.err.printf("rooms per client [%d]: ", roomsPerClient);
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = String.valueOf(roomsPerClient);
        }
        roomsPerClient = Integer.parseInt(value);
        boolean recordLatencyDetails = true;
        System.err.printf("record latency details [%b]: ", recordLatencyDetails);
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = String.valueOf(recordLatencyDetails);
        }
        recordLatencyDetails = Boolean.parseBoolean(value);
        boolean enableAckExtension = false;
        System.err.printf("enable ack extension [%b]: ", enableAckExtension);
        value = console.readLine().trim();
        if (value.length() == 0) {
            value = String.valueOf(enableAckExtension);
        }
        enableAckExtension = Boolean.parseBoolean(value);
        this.scheduler = Executors.newScheduledThreadPool(8);
        MBeanContainer mbeanContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer());
        mbeanContainer.beanAdded(null, (Object)this);
        this.threadPool = new MonitoringQueuedThreadPool(maxThreads);
        this.threadPool.setDaemon(true);
        this.threadPool.start();
        mbeanContainer.beanAdded(null, (Object)this.threadPool);
        this.httpClient = new HttpClient();
        this.httpClient.addBean((Object)mbeanContainer);
        this.httpClient.setMaxConnectionsPerDestination(60000);
        this.httpClient.setMaxRequestsQueuedPerDestination(10000);
        this.httpClient.setExecutor((Executor)this.threadPool);
        this.httpClient.setIdleTimeout(10000L);
        this.httpClient.start();
        mbeanContainer.beanAdded(null, (Object)this.httpClient);
        this.webSocketContainer = ContainerProvider.getWebSocketContainer();
        this.httpClient.addBean((Object)this.webSocketContainer, true);
        mbeanContainer.beanAdded(null, (Object)this.webSocketContainer);
        this.webSocketClient = new WebSocketClient();
        this.webSocketClient.setExecutor((Executor)this.threadPool);
        this.webSocketClient.setMasker((Masker)new ZeroMasker());
        this.webSocketClient.getPolicy().setInputBufferSize(8192);
        this.webSocketClient.addBean((Object)mbeanContainer);
        this.webSocketClient.start();
        mbeanContainer.beanAdded(null, (Object)this.webSocketClient);
        HandshakeListener handshakeListener = new HandshakeListener(channel, rooms, roomsPerClient);
        DisconnectListener disconnectListener = new DisconnectListener();
        LatencyListener latencyListener = new LatencyListener(recordLatencyDetails);
        LoadBayeuxClient statsClient = new LoadBayeuxClient(url, this.scheduler, this.newClientTransport(clientTransportType), null, false);
        statsClient.handshake();
        int clients = 100;
        int batchCount = 1000;
        int batchSize = 10;
        long batchPause = 10000L;
        int messageSize = 50;
        boolean randomize = false;
        while (true) {
            int maxRetries;
            int i;
            System.err.println("-----");
            System.err.printf("clients [%d]: ", clients);
            value = console.readLine();
            if (value == null) break;
            if ((value = value.trim()).length() == 0) {
                value = String.valueOf(clients);
            }
            clients = Integer.parseInt(value);
            System.err.println("Waiting for clients to be ready...");
            int currentClients = this.bayeuxClients.size();
            if (currentClients < clients) {
                for (i = 0; i < clients - currentClients; ++i) {
                    LoadBayeuxClient client = new LoadBayeuxClient(url, this.scheduler, this.newClientTransport(clientTransportType), latencyListener, enableAckExtension);
                    client.getChannel("/meta/handshake").addListener((ClientSessionChannel.ClientSessionChannelListener)handshakeListener);
                    client.getChannel("/meta/disconnect").addListener((ClientSessionChannel.ClientSessionChannelListener)disconnectListener);
                    client.handshake();
                    if (i % 10 != 0) continue;
                    Thread.sleep(50L);
                }
            } else if (currentClients > clients) {
                for (i = 0; i < currentClients - clients; ++i) {
                    LoadBayeuxClient client = this.bayeuxClients.get(currentClients - i - 1);
                    client.disconnect();
                }
            }
            int retries = maxRetries = 50;
            int lastSize = 0;
            int currentSize = this.bayeuxClients.size();
            while (currentSize != clients) {
                Thread.sleep(250L);
                System.err.printf("Waiting for clients %d/%d%n", currentSize, clients);
                if (lastSize == currentSize) {
                    if (--retries == 0) {
                        break;
                    }
                } else {
                    lastSize = currentSize;
                    retries = maxRetries;
                }
                currentSize = this.bayeuxClients.size();
            }
            if (currentSize != clients) {
                System.err.printf("Clients not ready, only %d/%d%n", currentSize, clients);
                break;
            }
            if (currentSize == 0) {
                System.err.println("All clients disconnected, exiting");
                break;
            }
            System.err.printf("Clients ready: %d%n", clients);
            this.reset();
            System.err.printf("batch count [%d]: ", batchCount);
            value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(batchCount);
            }
            batchCount = Integer.parseInt(value);
            System.err.printf("batch size [%d]: ", batchSize);
            value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(batchSize);
            }
            batchSize = Integer.parseInt(value);
            System.err.printf("batch pause (\u00b5s) [%d]: ", batchPause);
            value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(batchPause);
            }
            batchPause = Long.parseLong(value);
            System.err.printf("message size [%d]: ", messageSize);
            value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(messageSize);
            }
            messageSize = Integer.parseInt(value);
            String chat = "";
            for (int i2 = 0; i2 < messageSize; ++i2) {
                chat = chat + "x";
            }
            System.err.printf("randomize sends [%b]: ", randomize);
            value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(randomize);
            }
            randomize = Boolean.parseBoolean(value);
            statsClient.begin();
            this.helper.startStatistics();
            System.err.printf("Testing %d clients in %d rooms, %d rooms/client%n", this.bayeuxClients.size(), rooms, roomsPerClient);
            System.err.printf("Sending %d batches of %dx%d bytes messages every %d \u00b5s%n", batchCount, batchSize, messageSize, batchPause);
            long start = System.nanoTime();
            long expected = this.runBatches(batchCount, batchSize, batchPause, chat, randomize, channel);
            long end = System.nanoTime();
            this.helper.stopStatistics();
            long elapsedNanos = end - start;
            if (elapsedNanos > 0L) {
                System.err.printf("Outgoing: Elapsed = %d ms | Rate = %d messages/s - %d requests/s - ~%.3f Mib/s%n", TimeUnit.NANOSECONDS.toMillis(elapsedNanos), (long)(batchCount * batchSize) * 1000L * 1000L * 1000L / elapsedNanos, (long)batchCount * 1000L * 1000L * 1000L / elapsedNanos, Float.valueOf((float)(batchCount * batchSize * messageSize) * 8.0f * 1000.0f * 1000.0f * 1000.0f / (float)elapsedNanos / 1024.0f / 1024.0f));
            }
            this.waitForMessages(expected);
            statsClient.end();
            this.printReport(expected, messageSize);
            this.reset();
        }
        statsClient.disconnect(1000L);
        this.webSocketClient.stop();
        this.httpClient.stop();
        this.threadPool.stop();
        this.scheduler.shutdown();
        this.scheduler.awaitTermination(1000L, TimeUnit.MILLISECONDS);
    }

    private long runBatches(int batchCount, int batchSize, long batchPause, String chat, boolean randomize, String channel) {
        int clientIndex = -1;
        long expected = 0L;
        for (int i = 0; i < batchCount; ++i) {
            if (randomize) {
                clientIndex = this.nextRandom(this.bayeuxClients.size());
            } else if (++clientIndex >= this.bayeuxClients.size()) {
                clientIndex = 0;
            }
            LoadBayeuxClient client = this.bayeuxClients.get(clientIndex);
            expected += this.sendBatches(batchSize, batchPause, chat, channel, client);
        }
        return expected;
    }

    private long sendBatches(int batchSize, long batchPause, String chat, String channel, LoadBayeuxClient client) {
        long expected = 0L;
        client.startBatch();
        for (int b = 0; b < batchSize; ++b) {
            int room = -1;
            AtomicInteger clientsPerRoom = null;
            while (clientsPerRoom == null || clientsPerRoom.get() == 0) {
                room = this.nextRandom(this.rooms.size());
                clientsPerRoom = (AtomicInteger)this.rooms.get(room);
            }
            HashMap<String, Object> message = new HashMap<String, Object>(5);
            message.put("room", room);
            message.put("user", ((Object)((Object)client)).hashCode());
            message.put("chat", chat);
            message.put(START_FIELD, System.nanoTime());
            message.put(ID_FIELD, String.valueOf(this.ids.incrementAndGet()));
            ClientSessionChannel clientChannel = client.getChannel(this.getChannelId(channel + "/" + room));
            clientChannel.publish(message);
            clientChannel.release();
            expected += (long)clientsPerRoom.get();
        }
        client.endBatch();
        if (batchPause > 0L) {
            this.systemTimer.sleep(batchPause);
        }
        return expected;
    }

    private ClientTransport newClientTransport(ClientTransportType clientTransportType) {
        switch (clientTransportType) {
            case LONG_POLLING: {
                HashMap<String, Object> options = new HashMap<String, Object>();
                options.put("jsonContext", new Jackson1JSONContextClient());
                options.put("maxNetworkDelay", 5000L);
                return new LongPollingTransport(options, this.httpClient);
            }
            case JSR_WEBSOCKET: {
                HashMap<String, Object> options = new HashMap<String, Object>();
                options.put("jsonContext", new Jackson1JSONContextClient());
                options.put("maxNetworkDelay", 5000L);
                options.put("idleTimeout", 20000L + this.httpClient.getIdleTimeout());
                return new WebSocketTransport(options, this.scheduler, this.webSocketContainer);
            }
            case JETTY_WEBSOCKET: {
                HashMap<String, Object> options = new HashMap<String, Object>();
                options.put("jsonContext", new Jackson1JSONContextClient());
                options.put("maxNetworkDelay", 5000L);
                options.put("idleTimeout", 20000L + this.httpClient.getIdleTimeout());
                return new JettyWebSocketTransport(options, this.scheduler, this.webSocketClient);
            }
        }
        throw new IllegalArgumentException();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int nextRandom(int limit) {
        BayeuxLoadClient bayeuxLoadClient = this;
        synchronized (bayeuxLoadClient) {
            return this.random.nextInt(limit);
        }
    }

    private void updateLatencies(long startTime, long sendTime, long arrivalTime, long endTime, boolean recordDetails) {
        long wallLatency = TimeUnit.MICROSECONDS.toNanos(TimeUnit.NANOSECONDS.toMicros(endTime - startTime));
        long latency = TimeUnit.MICROSECONDS.toNanos(TimeUnit.NANOSECONDS.toMicros(arrivalTime - sendTime));
        Atomics.updateMin((AtomicLong)this.minWallLatency, (long)wallLatency);
        Atomics.updateMax((AtomicLong)this.maxWallLatency, (long)wallLatency);
        this.totWallLatency.addAndGet(wallLatency);
        Atomics.updateMin((AtomicLong)this.minLatency, (long)latency);
        Atomics.updateMax((AtomicLong)this.maxLatency, (long)latency);
        this.totLatency.addAndGet(latency);
        if (recordDetails) {
            AtomicLong count = (AtomicLong)this.wallLatencies.get(wallLatency);
            if (count == null) {
                count = new AtomicLong();
                AtomicLong existing = this.wallLatencies.putIfAbsent(wallLatency, count);
                if (existing != null) {
                    count = existing;
                }
            }
            count.incrementAndGet();
        }
    }

    private boolean waitForMessages(long expected) throws InterruptedException {
        int maxRetries;
        long arrived = this.messages.get();
        long lastArrived = 0L;
        int retries = maxRetries = 20;
        while (arrived < expected) {
            System.err.printf("Waiting for messages to arrive %d/%d%n", arrived, expected);
            Thread.sleep(500L);
            if (lastArrived == arrived) {
                if (--retries == 0) {
                    break;
                }
            } else {
                lastArrived = arrived;
                retries = maxRetries;
            }
            arrived = this.messages.get();
        }
        if (arrived < expected) {
            System.err.printf("Interrupting wait for messages %d/%d%n", arrived, expected);
            return false;
        }
        System.err.printf("All messages arrived %d/%d%n", arrived, expected);
        return true;
    }

    public void printReport(long expectedCount, int messageSize) {
        long messageCount = this.messages.get();
        System.err.printf("Messages - Success/Expected = %d/%d%n", messageCount, expectedCount);
        long elapsedNanos = this.end.get() - this.start.get();
        if (elapsedNanos > 0L) {
            System.err.printf("Incoming - Elapsed = %d ms | Rate = %d messages/s - %d responses/s(%.2f%%) - ~%.3f Mib/s%n", TimeUnit.NANOSECONDS.toMillis(elapsedNanos), messageCount * 1000L * 1000L * 1000L / elapsedNanos, this.responses.get() * 1000L * 1000L * 1000L / elapsedNanos, Float.valueOf(100.0f * (float)this.responses.get() / (float)messageCount), Float.valueOf((float)(messageCount * (long)messageSize) * 8.0f * 1000.0f * 1000.0f * 1000.0f / (float)elapsedNanos / 1024.0f / 1024.0f));
        }
        if (this.wallLatencies.size() > 1) {
            TreeMap<Long, AtomicLong> sortedWallLatencies = new TreeMap<Long, AtomicLong>(this.wallLatencies);
            this.wallLatencies.clear();
            long messages = 0L;
            long maxLatencyBucketFrequency = 0L;
            long previousLatency = 0L;
            long latencyAt50thPercentile = 0L;
            long latencyAt99thPercentile = 0L;
            long[] latencyBucketFrequencies = new long[20];
            long minWallLatency = this.minWallLatency.get();
            long latencyRange = this.maxWallLatency.get() - minWallLatency;
            Iterator entries = sortedWallLatencies.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry entry = entries.next();
                long latency = (Long)entry.getKey();
                Long bucketIndex = latencyRange == 0L ? 0L : (latency - minWallLatency) * (long)latencyBucketFrequencies.length / latencyRange;
                int index = bucketIndex.intValue() == latencyBucketFrequencies.length ? latencyBucketFrequencies.length - 1 : bucketIndex.intValue();
                long value = ((AtomicLong)entry.getValue()).get();
                messages += value;
                int n = index;
                latencyBucketFrequencies[n] = latencyBucketFrequencies[n] + value;
                if (latencyBucketFrequencies[index] > maxLatencyBucketFrequency) {
                    maxLatencyBucketFrequency = latencyBucketFrequencies[index];
                }
                if (latencyAt50thPercentile == 0L && messages > messageCount / 2L) {
                    latencyAt50thPercentile = (previousLatency + latency) / 2L;
                }
                if (latencyAt99thPercentile == 0L && messages > messageCount - messageCount / 100L) {
                    latencyAt99thPercentile = (previousLatency + latency) / 2L;
                }
                previousLatency = latency;
                entries.remove();
            }
            if (messages != messageCount) {
                System.err.printf("Counted messages (%d) != Latency messages sum (%d)%n", messageCount, messages);
            }
            System.err.println("Messages - Wall Latency Distribution Curve (X axis: Frequency, Y axis: Latency):");
            double percentile = 0.0;
            for (int i = 0; i < latencyBucketFrequencies.length; ++i) {
                int j;
                int value;
                long latencyBucketFrequency = latencyBucketFrequencies[i];
                int n = value = maxLatencyBucketFrequency == 0L ? 0 : Math.round((float)latencyBucketFrequency * (float)latencyBucketFrequencies.length / (float)maxLatencyBucketFrequency);
                if (value == latencyBucketFrequencies.length) {
                    --value;
                }
                for (j = 0; j < value; ++j) {
                    System.err.print(" ");
                }
                System.err.print("@");
                for (j = value + 1; j < latencyBucketFrequencies.length; ++j) {
                    System.err.print(" ");
                }
                System.err.print("  _  ");
                double percentage = 100.0 * (double)latencyBucketFrequency / (double)messages;
                System.err.print(TimeUnit.NANOSECONDS.toMillis(latencyRange * (long)(i + 1) / (long)latencyBucketFrequencies.length + minWallLatency));
                System.err.printf(" ms (%d, %.2f%%)", latencyBucketFrequency, percentage);
                double last = percentile;
                percentile += percentage;
                if (last < 50.0 && percentile >= 50.0) {
                    System.err.print(" ^50%");
                }
                if (last < 85.0 && percentile >= 85.0) {
                    System.err.print(" ^85%");
                }
                if (last < 95.0 && percentile >= 95.0) {
                    System.err.print(" ^95%");
                }
                if (last < 99.0 && percentile >= 99.0) {
                    System.err.print(" ^99%");
                }
                if (last < 99.9 && percentile >= 99.9) {
                    System.err.print(" ^99.9%");
                }
                System.err.println();
            }
            System.err.printf("Slowest Message ID = %s time = %d ms%n", this.maxTime.getReference(), this.maxTime.getStamp());
            System.err.printf("Messages - Wall Latency 50th%%/99th%% = %d/%d ms%n", TimeUnit.NANOSECONDS.toMillis(latencyAt50thPercentile), TimeUnit.NANOSECONDS.toMillis(latencyAt99thPercentile));
        }
        System.err.printf("Messages - Wall Latency Min/Ave/Max = %d/%d/%d ms%n", TimeUnit.NANOSECONDS.toMillis(this.minWallLatency.get()), messageCount == 0L ? -1L : TimeUnit.NANOSECONDS.toMillis(this.totWallLatency.get() / messageCount), TimeUnit.NANOSECONDS.toMillis(this.maxWallLatency.get()));
        System.err.printf("Messages - Network Latency Min/Ave/Max = %d/%d/%d ms%n", TimeUnit.NANOSECONDS.toMillis(this.minLatency.get()), messageCount == 0L ? -1L : TimeUnit.NANOSECONDS.toMillis(this.totLatency.get() / messageCount), TimeUnit.NANOSECONDS.toMillis(this.maxLatency.get()));
        System.err.printf("Thread Pool - Tasks = %d | Concurrent Threads max = %d | Queue Size max = %d | Queue Latency avg/max = %d/%d ms | Task Latency avg/max = %d/%d ms%n", this.threadPool.getTasks(), this.threadPool.getMaxActiveThreads(), this.threadPool.getMaxQueueSize(), TimeUnit.NANOSECONDS.toMillis(this.threadPool.getAverageQueueLatency()), TimeUnit.NANOSECONDS.toMillis(this.threadPool.getMaxQueueLatency()), TimeUnit.NANOSECONDS.toMillis(this.threadPool.getAverageTaskLatency()), TimeUnit.NANOSECONDS.toMillis(this.threadPool.getMaxTaskLatency()));
    }

    private void reset() {
        this.threadPool.reset();
        this.start.set(0L);
        this.end.set(0L);
        this.responses.set(0L);
        this.messages.set(0L);
        this.minWallLatency.set(Long.MAX_VALUE);
        this.maxWallLatency.set(0L);
        this.totWallLatency.set(0L);
        this.minLatency.set(Long.MAX_VALUE);
        this.maxLatency.set(0L);
        this.totLatency.set(0L);
        this.maxTime.set(null, 0);
        this.wallLatencies.clear();
        this.sendTimes.clear();
        this.arrivalTimes.clear();
    }

    private ChannelId getChannelId(String channelName) {
        ChannelId existing;
        ChannelId result = (ChannelId)this.channelIds.get(channelName);
        if (result == null && (existing = this.channelIds.putIfAbsent(channelName, result = new ChannelId(channelName))) != null) {
            result = existing;
        }
        return result;
    }

    private static enum ClientTransportType {
        LONG_POLLING("long-polling"),
        JSR_WEBSOCKET("jsr-websocket"),
        JETTY_WEBSOCKET("jetty-websocket");

        private final String name;

        private ClientTransportType(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }
    }

    private class LoadBayeuxClient
    extends BayeuxClient {
        private final List<Integer> subscriptions;
        private final ClientSessionChannel.MessageListener latencyListener;

        private LoadBayeuxClient(String url, ScheduledExecutorService scheduler, ClientTransport transport, ClientSessionChannel.MessageListener listener, boolean enableAckExtension) {
            super(url, scheduler, transport, new ClientTransport[0]);
            this.subscriptions = new ArrayList<Integer>();
            this.latencyListener = listener;
            if (enableAckExtension) {
                this.addExtension((ClientSession.Extension)new AckExtension());
            }
        }

        public void init(String channel, int room) {
            AtomicInteger clientsPerRoom;
            if (this.latencyListener != null) {
                this.getChannel(BayeuxLoadClient.this.getChannelId(channel + "/" + room)).subscribe(this.latencyListener);
            }
            if ((clientsPerRoom = (AtomicInteger)BayeuxLoadClient.this.rooms.get(room)) == null) {
                clientsPerRoom = new AtomicInteger();
                AtomicInteger existing = BayeuxLoadClient.this.rooms.putIfAbsent(room, clientsPerRoom);
                if (existing != null) {
                    clientsPerRoom = existing;
                }
            }
            clientsPerRoom.incrementAndGet();
            this.subscriptions.add(room);
        }

        public void destroy() {
            for (Integer room : this.subscriptions) {
                AtomicInteger clientsPerRoom = (AtomicInteger)BayeuxLoadClient.this.rooms.get(room);
                clientsPerRoom.decrementAndGet();
            }
            this.subscriptions.clear();
        }

        public void begin() throws InterruptedException {
            this.notifyServer("/service/statistics/start");
        }

        public void end() throws InterruptedException {
            this.notifyServer("/service/statistics/stop");
        }

        private void notifyServer(String channelName) throws InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            ClientSessionChannel channel = this.getChannel(channelName);
            channel.publish(new HashMap(1), new ClientSessionChannel.MessageListener(){

                public void onMessage(ClientSessionChannel channel, Message message) {
                    latch.countDown();
                }
            });
            latch.await();
        }

        public void onSending(List<? extends Message> messages) {
            long now = System.nanoTime();
            for (Message message : messages) {
                Map data = message.getDataAsMap();
                if (data == null || !message.getChannelId().isBroadcast()) continue;
                int room = (Integer)data.get("room");
                int clientsInRoom = ((AtomicInteger)BayeuxLoadClient.this.rooms.get(room)).get();
                String id = (String)data.get(BayeuxLoadClient.ID_FIELD);
                BayeuxLoadClient.this.sendTimes.put(id, new AtomicStampedReference<Long>(now, clientsInRoom));
                BayeuxLoadClient.this.arrivalTimes.put(id, new AtomicStampedReference(Collections.synchronizedList(new LinkedList()), clientsInRoom));
            }
        }

        public void onMessages(List<Message.Mutable> messages) {
            long now = System.nanoTime();
            boolean response = false;
            for (Message message : messages) {
                Map data = message.getDataAsMap();
                if (data == null) continue;
                response = true;
                String id = (String)data.get(BayeuxLoadClient.ID_FIELD);
                ((List)((AtomicStampedReference)BayeuxLoadClient.this.arrivalTimes.get(id)).getReference()).add(now);
            }
            if (response) {
                BayeuxLoadClient.this.responses.incrementAndGet();
            }
        }
    }

    private class LatencyListener
    implements ClientSessionChannel.MessageListener {
        private final boolean recordDetails;

        public LatencyListener(boolean recordDetails) {
            this.recordDetails = recordDetails;
        }

        public void onMessage(ClientSessionChannel channel, Message message) {
            long arrivalTime;
            long sendTime;
            String id;
            long endTime;
            long startTime;
            Map data = message.getDataAsMap();
            if (data != null) {
                startTime = ((Number)data.get(BayeuxLoadClient.START_FIELD)).longValue();
                endTime = System.nanoTime();
                if (BayeuxLoadClient.this.start.get() == 0L) {
                    BayeuxLoadClient.this.start.set(endTime);
                }
                BayeuxLoadClient.this.end.set(endTime);
                BayeuxLoadClient.this.messages.incrementAndGet();
                id = (String)data.get(BayeuxLoadClient.ID_FIELD);
                AtomicStampedReference sendTimeRef = (AtomicStampedReference)BayeuxLoadClient.this.sendTimes.get(id);
                sendTime = (Long)sendTimeRef.getReference();
                if (Atomics.decrement((AtomicStampedReference)sendTimeRef) == 0) {
                    BayeuxLoadClient.this.sendTimes.remove(id);
                }
                AtomicStampedReference arrivalTimeRef = (AtomicStampedReference)BayeuxLoadClient.this.arrivalTimes.get(id);
                arrivalTime = (Long)((List)arrivalTimeRef.getReference()).remove(0);
                if (Atomics.decrement((AtomicStampedReference)arrivalTimeRef) == 0) {
                    BayeuxLoadClient.this.arrivalTimes.remove(id);
                }
            } else {
                throw new IllegalStateException("No 'data' field in message " + message);
            }
            long delayMs = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
            Atomics.updateMax((AtomicStampedReference)BayeuxLoadClient.this.maxTime, (Object)id, (int)((int)delayMs));
            BayeuxLoadClient.this.updateLatencies(startTime, sendTime, arrivalTime, endTime, this.recordDetails);
        }
    }

    private class DisconnectListener
    implements ClientSessionChannel.MessageListener {
        private DisconnectListener() {
        }

        public void onMessage(ClientSessionChannel channel, Message message) {
            if (message.isSuccessful()) {
                LoadBayeuxClient client = (LoadBayeuxClient)channel.getSession();
                BayeuxLoadClient.this.bayeuxClients.remove((Object)client);
                client.destroy();
            }
        }
    }

    private class HandshakeListener
    implements ClientSessionChannel.MessageListener {
        private static final String SESSION_ID_ATTRIBUTE = "handshook";
        private final String channel;
        private final int rooms;
        private final int roomsPerClient;

        private HandshakeListener(String channel, int rooms, int roomsPerClient) {
            this.channel = channel;
            this.rooms = rooms;
            this.roomsPerClient = roomsPerClient;
        }

        public void onMessage(ClientSessionChannel channel, Message message) {
            if (message.isSuccessful()) {
                final LoadBayeuxClient client = (LoadBayeuxClient)channel.getSession();
                String sessionId = (String)client.getAttribute(SESSION_ID_ATTRIBUTE);
                if (sessionId == null) {
                    client.setAttribute(SESSION_ID_ATTRIBUTE, client.getId());
                    BayeuxLoadClient.this.bayeuxClients.add(client);
                    client.batch(new Runnable(){

                        @Override
                        public void run() {
                            ArrayList<Integer> roomsSubscribedTo = new ArrayList<Integer>();
                            for (int j = 0; j < HandshakeListener.this.roomsPerClient; ++j) {
                                int room = BayeuxLoadClient.this.nextRandom(HandshakeListener.this.rooms);
                                while (roomsSubscribedTo.contains(room)) {
                                    room = BayeuxLoadClient.this.nextRandom(HandshakeListener.this.rooms);
                                }
                                roomsSubscribedTo.add(room);
                                client.init(HandshakeListener.this.channel, room);
                            }
                        }
                    });
                } else {
                    System.err.printf("Second handshake for client %s: old session %s, new session %s%n", this, sessionId, client.getId());
                }
            }
        }
    }
}

