/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.converter.GenericMessageConverter;
import com.github.sonus21.rqueue.converter.RqueueRedisSerializer;
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.models.enums.PubSubType;
import com.github.sonus21.rqueue.models.event.RqueuePubSubEvent;
import com.github.sonus21.rqueue.models.request.PauseUnpauseQueueRequest;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.SerializationUtils;
import com.github.sonus21.rqueue.utils.StringUtils;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.Topic;

/**
 * Publishes and consumes events on the Redis pub/sub channel whose name is returned by
 * {@link RqueueConfig#getInternalCommChannelName()}.
 *
 * <p>Two event types are emitted and handled here: {@link PubSubType#PAUSE_QUEUE}, which is
 * forwarded to the {@link RqueueMessageListenerContainer}, and {@link PubSubType#QUEUE_CRUD},
 * which clears the cached system configuration for the named queue.
 */
public class RqueueInternalPubSubChannel implements InitializingBean {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueInternalPubSubChannel.class);
    private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
    private final RqueueMessageListenerContainer rqueueMessageListenerContainer;
    private final RqueueConfig rqueueConfig;
    private final RqueueRedisTemplate<String> stringRqueueRedisTemplate;
    private final RqueueRedisSerializer rqueueRedisSerializer;
    private final RqueueBeanProvider rqueueBeanProvider;
    // Assigned in afterPropertiesSet(), before the listener that reads it is registered.
    private GenericMessageConverter.SmartMessageSerDes smartMessageSerDes;

    /**
     * Stores the collaborators and creates the serializer used for outgoing payloads.
     *
     * @param rqueueRedisListenerContainerFactory factory the internal listener is registered with
     * @param rqueueMessageListenerContainer container that receives pause/unpause requests
     * @param rqueueConfig source of the channel name and the lock key configuration
     * @param stringRqueueRedisTemplate template used to publish events
     * @param rqueueBeanProvider provider of the lock manager and the system config DAO
     */
    public RqueueInternalPubSubChannel(RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, RqueueMessageListenerContainer rqueueMessageListenerContainer, RqueueConfig rqueueConfig, RqueueRedisTemplate<String> stringRqueueRedisTemplate, RqueueBeanProvider rqueueBeanProvider) {
        this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory;
        this.rqueueMessageListenerContainer = rqueueMessageListenerContainer;
        this.rqueueConfig = rqueueConfig;
        this.stringRqueueRedisTemplate = stringRqueueRedisTemplate;
        this.rqueueBeanProvider = rqueueBeanProvider;
        this.rqueueRedisSerializer = new RqueueRedisSerializer();
    }

    /**
     * Creates the message (de)serializer and subscribes an {@link InternalMessageListener} to the
     * internal communication channel.
     *
     * @throws Exception declared by {@link InitializingBean#afterPropertiesSet()}
     */
    public void afterPropertiesSet() throws Exception {
        // Build the deserializer first: once the listener is registered a message may be
        // dispatched to it, and processEvent() dereferences smartMessageSerDes.
        this.smartMessageSerDes = new GenericMessageConverter.SmartMessageSerDes(SerializationUtils.createObjectMapper());
        String channel = this.rqueueConfig.getInternalCommChannelName();
        InternalMessageListener messageListener = new InternalMessageListener();
        this.rqueueRedisListenerContainerFactory.addMessageListener(messageListener, new ChannelTopic(channel));
    }

    /**
     * Publishes a {@link PubSubType#PAUSE_QUEUE} event carrying the given request.
     *
     * @param pauseUnpauseQueueRequest request to broadcast
     */
    public void emitPauseUnpauseQueueEvent(PauseUnpauseQueueRequest pauseUnpauseQueueRequest) {
        this.publish(PubSubType.PAUSE_QUEUE, pauseUnpauseQueueRequest);
    }

    /**
     * Serializes {@code message}, wraps it in a {@link RqueuePubSubEvent} tagged with this
     * broker's id, and sends it on the internal communication channel.
     *
     * @param type event type to publish
     * @param message payload to serialize into the event
     */
    private void publish(PubSubType type, Object message) {
        byte[] data = this.rqueueRedisSerializer.serialize(message);
        // Decode with a fixed charset rather than the platform default so the payload text does
        // not vary between JVMs. NOTE(review): assumes the serializer emits UTF-8 — confirm.
        String payload = new String(data, StandardCharsets.UTF_8);
        RqueuePubSubEvent event = new RqueuePubSubEvent(type, RqueueConfig.getBrokerId(), payload);
        this.stringRqueueRedisTemplate.getRedisTemplate().convertAndSend(this.rqueueConfig.getInternalCommChannelName(), event);
    }

    /**
     * Publishes a {@link PubSubType#QUEUE_CRUD} event carrying the name from the given request.
     *
     * @param request request whose name identifies the queue
     */
    public void emitQueueConfigUpdateEvent(PauseUnpauseQueueRequest request) {
        this.publish(PubSubType.QUEUE_CRUD, request.getName());
    }

    /**
     * Decodes bytes for logging without throwing on {@code null}.
     *
     * @param bytes raw bytes, possibly {@code null}
     * @return the UTF-8 decoded text, or the literal {@code "null"} when {@code bytes} is null
     */
    private static String asText(byte[] bytes) {
        return bytes == null ? "null" : new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Listener for the internal communication channel; dispatches each received event by type.
     */
    class InternalMessageListener
    implements MessageListener {
        InternalMessageListener() {
        }

        /**
         * Handles one message from the channel. Empty bodies are logged and dropped.
         *
         * @param message received message
         * @param pattern pattern that matched the channel; may be {@code null}
         */
        public void onMessage(Message message, byte[] pattern) {
            byte[] body = message.getBody();
            if (SerializationUtils.isEmpty(body)) {
                // pattern is nullable, so decode through the null-safe helper.
                log.error("Empty message received on channel: {}, pattern: {}", asText(message.getChannel()), asText(pattern));
                return;
            }
            this.processEvent(body);
        }

        /**
         * Deserializes the body into a {@link RqueuePubSubEvent} and dispatches it by type.
         *
         * @param body non-empty message body
         */
        private void processEvent(byte[] body) {
            if (log.isDebugEnabled()) {
                // Guarded so the body is only decoded when the line will actually be logged.
                log.debug("Message on internal channel {}", asText(body));
            }
            RqueuePubSubEvent rqueuePubSubEvent = RqueueInternalPubSubChannel.this.smartMessageSerDes.deserialize(body, RqueuePubSubEvent.class);
            if (rqueuePubSubEvent == null) {
                log.error("Invalid message on pub-sub channel {}", asText(body));
                return;
            }
            PubSubType type = rqueuePubSubEvent.getType();
            if (type == null) {
                // Switching on a null enum would throw NullPointerException.
                log.error("Unknown event type {}", rqueuePubSubEvent);
                return;
            }
            switch (type) {
                case PAUSE_QUEUE: {
                    PauseUnpauseQueueRequest request = rqueuePubSubEvent.messageAs(RqueueInternalPubSubChannel.this.smartMessageSerDes, PauseUnpauseQueueRequest.class);
                    this.handlePauseEvent(request);
                    break;
                }
                case QUEUE_CRUD: {
                    String queue = rqueuePubSubEvent.messageAs(RqueueInternalPubSubChannel.this.smartMessageSerDes, String.class);
                    RqueueInternalPubSubChannel.this.rqueueBeanProvider.getRqueueSystemConfigDao().clearCacheByName(queue);
                    break;
                }
                default: {
                    log.error("Unknown event type {}", rqueuePubSubEvent);
                }
            }
        }

        /**
         * Applies a pause/unpause request to the listener container while holding the queue's
         * CRUD lock. Requests that are null or have an empty name are logged and ignored.
         *
         * @param request decoded pause/unpause request, possibly {@code null}
         */
        private void handlePauseEvent(PauseUnpauseQueueRequest request) {
            if (request == null || StringUtils.isEmpty(request.getName())) {
                log.error("Invalid message payload {}", request);
                return;
            }
            String lockKey = Constants.getQueueCrudLockKey(RqueueInternalPubSubChannel.this.rqueueConfig, request.getName());
            // A fresh random value identifies this particular acquisition attempt.
            String lockValue = UUID.randomUUID().toString();
            try {
                boolean acquired = RqueueInternalPubSubChannel.this.rqueueBeanProvider.getRqueueLockManager().acquireLock(lockKey, lockValue, Duration.ofMillis(100L));
                if (acquired) {
                    RqueueInternalPubSubChannel.this.rqueueMessageListenerContainer.pauseUnpauseQueue(request.getName(), request.isPause());
                }
            }
            finally {
                // NOTE(review): release is attempted even when the lock was not acquired;
                // presumably releaseLock only removes the key when lockValue matches — confirm.
                RqueueInternalPubSubChannel.this.rqueueBeanProvider.getRqueueLockManager().releaseLock(lockKey, lockValue);
            }
        }
    }
}

