/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client;

import com.xxdb.streaming.client.IMessage;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class TopicPoller {
    BlockingQueue<List<IMessage>> queue;
    List<IMessage> cache = new ArrayList<IMessage>();

    public TopicPoller(BlockingQueue<List<IMessage>> queue) {
        this.queue = queue;
    }

    public void setQueue(BlockingQueue<List<IMessage>> queue) {
        this.queue = queue;
    }

    public ArrayList<IMessage> poll(long timeout) {
        return this.poll(timeout, 1);
    }

    public ArrayList<IMessage> poll(long timeout, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("Size must be greater than zero");
        }
        if (timeout < 0L) {
            throw new IllegalArgumentException("timeout must be greater than or equal to zero");
        }
        ArrayList<IMessage> list = new ArrayList<IMessage>(this.cache);
        this.cache.clear();
        if (timeout == 0L) {
            List tmp;
            while (list.size() < size && (tmp = (List)this.queue.poll()) != null) {
                list.addAll(tmp);
            }
        } else {
            LocalTime end = LocalTime.now().plusNanos(timeout * 1000000L);
            while (list.size() < size && LocalTime.now().isBefore(end)) {
                try {
                    long milliSeconds = ChronoUnit.MILLIS.between(LocalTime.now(), end);
                    if (milliSeconds > 0L) {
                        List<IMessage> tmp = this.queue.poll(milliSeconds, TimeUnit.MILLISECONDS);
                        if (tmp == null) continue;
                        list.addAll(tmp);
                        continue;
                    }
                    break;
                }
                catch (InterruptedException e) {
                    return list;
                }
            }
        }
        return list;
    }

    public IMessage take() {
        while (true) {
            if (!this.cache.isEmpty()) {
                IMessage message = this.cache.get(0);
                this.cache.remove(0);
                return message;
            }
            try {
                List<IMessage> tmp = this.queue.take();
                if (tmp == null) continue;
                this.cache.addAll(tmp);
            }
            catch (InterruptedException e) {
                return null;
            }
        }
    }
}

