/*
 * Decompiled with CFR 0.152.
 */
package io.github.kbridge.listener;

import io.github.kbridge.props.ChannelNames;
import io.github.kbridge.transform.KafkaPayloadTransformer;
import io.github.kbridge.util.TransformerUtil;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(name={"spring.kafka.consumer.value-deserializer"}, havingValue="org.apache.kafka.common.serialization.StringDeserializer")
public class CommonStringKafkaListener {
    private static final Logger log = LoggerFactory.getLogger(CommonStringKafkaListener.class);
    @Autowired
    private TransformerUtil transformerUtil;
    @Autowired
    private MessagingTemplate messagingTemplate;

    @KafkaListener(topics={"${app.kafka.topics}"}, groupId="ami-topic-id", autoStartup="${app.kafka.enabled:false}", concurrency="${app.kafka.threds:3}")
    public void fromKafka(ConsumerRecord<String, String> consumerRecord) {
        String topic = consumerRecord.topic();
        List<KafkaPayloadTransformer> transformers = this.transformerUtil.findKafkaTransformer(topic);
        for (KafkaPayloadTransformer transformer : transformers) {
            Object tranformToMq = transformer.tranformStringToMq(consumerRecord);
            if (Objects.isNull(tranformToMq)) {
                throw new RuntimeException("KafkaPayloadTransformer.transferToMq or KafkaPayloadTransformer.transformStringToMq must be ovveride properly");
            }
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("kafkatopic", topic);
            headers.put("mqTopic", transformer.mqTopic());
            GenericMessage genericMessage = new GenericMessage(tranformToMq, headers);
            this.messagingTemplate.send(ChannelNames.TO_MQ.bean(), (Message)genericMessage);
            log.debug("sent to channel: {} ", (Object)ChannelNames.TO_MQ.bean());
        }
    }
}

