/*
 * Decompiled with CFR 0.152.
 */
package com.github.ddth.queue.impl;

import com.github.ddth.kafka.KafkaClient;
import com.github.ddth.kafka.KafkaMessage;
import com.github.ddth.queue.IQueue;
import com.github.ddth.queue.IQueueMessage;
import java.util.Collection;
import java.util.Date;

public abstract class KafkaQueue
implements IQueue {
    private KafkaClient kafkaClient;
    private boolean myOwnKafkaClient = true;
    private String zkConnString = "localhost:2181/kafka";
    private String topicName = "ddth-queue";
    private KafkaClient.ProducerType producerType = KafkaClient.ProducerType.SYNC_LEADER_ACK;

    public KafkaClient.ProducerType getProducerType() {
        return this.producerType;
    }

    public KafkaQueue setProducerType(KafkaClient.ProducerType producerType) {
        this.producerType = producerType;
        return this;
    }

    public String getZkConnString() {
        return this.zkConnString;
    }

    public KafkaQueue setZkConnString(String zkConnString) {
        this.zkConnString = zkConnString;
        return this;
    }

    public String getTopicName() {
        return this.topicName;
    }

    public KafkaQueue setTopicName(String topicName) {
        this.topicName = topicName;
        return this;
    }

    protected KafkaClient getKafkaClient() {
        return this.kafkaClient;
    }

    public KafkaQueue setKafkaClient(KafkaClient kafkaClient) {
        this.kafkaClient = kafkaClient;
        this.myOwnKafkaClient = false;
        return this;
    }

    public KafkaQueue init() throws Exception {
        if (this.kafkaClient == null) {
            this.kafkaClient = new KafkaClient(this.zkConnString);
            this.kafkaClient.init();
            this.myOwnKafkaClient = true;
        }
        return this;
    }

    public void destroy() {
        if (this.kafkaClient != null && this.myOwnKafkaClient) {
            this.kafkaClient.destroy();
            this.kafkaClient = null;
        }
    }

    protected abstract byte[] serialize(IQueueMessage var1);

    protected abstract IQueueMessage deserialize(byte[] var1);

    protected boolean putToQueue(IQueueMessage msg) {
        byte[] msgData = this.serialize(msg);
        KafkaMessage kmsg = new KafkaMessage(this.topicName, msg.qId().toString(), msgData);
        this.kafkaClient.sendMessage(this.producerType, kmsg);
        return true;
    }

    @Override
    public boolean queue(IQueueMessage msg) {
        Date now = new Date();
        msg.qNumRequeues(0).qOriginalTimestamp(now).qTimestamp(now);
        return this.putToQueue(msg);
    }

    @Override
    public boolean requeue(IQueueMessage msg) {
        Date now = new Date();
        msg.qIncNumRequeues().qTimestamp(now);
        return this.putToQueue(msg);
    }

    @Override
    public boolean requeueSilent(IQueueMessage msg) {
        return this.putToQueue(msg);
    }

    @Override
    public void finish(IQueueMessage msg) {
    }

    @Override
    public IQueueMessage take() {
        throw new UnsupportedOperationException("Method [take] has not implemented yet!");
    }

    @Override
    public Collection<IQueueMessage> getOrphanMessages(long thresholdTimestampMs) {
        throw new UnsupportedOperationException("Method [getOrphanMessages] has not implemented yet!");
    }

    @Override
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage msg) {
        throw new UnsupportedOperationException("Method [moveFromEphemeralToQueueStorage] has not implemented yet!");
    }

    @Override
    public int queueSize() {
        return -1;
    }

    @Override
    public int ephemeralSize() {
        return -1;
    }
}

