/*
 * Decompiled with CFR 0.152.
 */
package io.alauda.devops.java.client.extend.workqueue;

import io.alauda.devops.java.client.extend.workqueue.DefaultWorkQueue;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultWorkQueueTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultWorkQueueTest.class);

    @Test
    public void testMultiProducerAndConsumers() throws Exception {
        DefaultWorkQueue queue = new DefaultWorkQueue();
        int producerCount = 10;
        int consumerCount = 5;
        CountDownLatch producerLatch = new CountDownLatch(10);
        int i = 0;
        while (i < 10) {
            int num = i++;
            Thread t = new Thread(() -> {
                try {
                    for (int j = 0; j < 50; ++j) {
                        queue.add((Object)String.valueOf(num));
                        Thread.sleep(10L);
                    }
                }
                catch (Exception exception) {
                }
                finally {
                    producerLatch.countDown();
                }
            });
            t.start();
        }
        CountDownLatch consumerLatch = new CountDownLatch(5);
        int i2 = 0;
        while (i2 < 5) {
            int num = i2++;
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String item = (String)queue.get();
                        Assert.assertNotEquals((String)"Got an item added after shutdown.", (Object)"added after shutdown!", (Object)item);
                        if (item == null) {
                            return;
                        }
                        LOGGER.info("Worker {}: begin processing {}", (Object)num, (Object)item);
                        Thread.sleep(50L);
                        LOGGER.info("Worker {}: done processing {}", (Object)num, (Object)item);
                        queue.done((Object)item);
                        continue;
                        break;
                    }
                }
                catch (Exception exception) {
                    return;
                }
                finally {
                    consumerLatch.countDown();
                }
            });
            t.start();
        }
        producerLatch.await();
        queue.shutDown();
        queue.add((Object)"added after shutdown!");
        consumerLatch.await();
    }

    @Test
    public void testAddWhileProcessing() throws Exception {
        Thread t;
        DefaultWorkQueue queue = new DefaultWorkQueue();
        int producerCount = 10;
        int consumerCount = 5;
        CountDownLatch producerLatch = new CountDownLatch(10);
        int i = 0;
        while (i < 10) {
            int num = i++;
            t = new Thread(() -> {
                queue.add((Object)String.valueOf(num));
                producerLatch.countDown();
            });
            t.start();
        }
        CountDownLatch consumerLatch = new CountDownLatch(5);
        for (int i2 = 0; i2 < 5; ++i2) {
            t = new Thread(() -> {
                HashMap<String, Integer> counters = new HashMap<String, Integer>();
                try {
                    while (true) {
                        String item;
                        if ((item = (String)queue.get()) == null) {
                            return;
                        }
                        counters.putIfAbsent(item, 1);
                        counters.computeIfPresent(item, (s, integer) -> (Integer)counters.get(s) + 1);
                        if ((Integer)counters.get(item) < 2) {
                            queue.add((Object)item);
                        }
                        queue.done((Object)item);
                        continue;
                        break;
                    }
                }
                catch (Exception exception) {
                    return;
                }
                finally {
                    consumerLatch.countDown();
                }
            });
            t.start();
        }
        producerLatch.await();
        queue.shutDown();
        consumerLatch.await();
    }

    @Test
    public void testLen() throws Exception {
        DefaultWorkQueue queue = new DefaultWorkQueue();
        queue.add((Object)"foo");
        Assert.assertEquals((long)1L, (long)queue.length());
        queue.add((Object)"bar");
        Assert.assertEquals((long)2L, (long)queue.length());
        queue.add((Object)"foo");
        Assert.assertEquals((long)2L, (long)queue.length());
    }

    @Test
    public void testReinsert() throws Exception {
        DefaultWorkQueue queue = new DefaultWorkQueue();
        queue.add((Object)"foo");
        String item = (String)queue.get();
        Assert.assertEquals((Object)"foo", (Object)item);
        queue.add((Object)item);
        queue.done((Object)item);
        item = (String)queue.get();
        Assert.assertEquals((Object)"foo", (Object)item);
        queue.done((Object)item);
        Assert.assertEquals((long)0L, (long)queue.length());
    }
}

