/*
 * Decompiled with CFR 0.152.
 */
package io.github.devlibx.easy.testing.kafka;

import ch.qos.logback.classic.Level;
import io.gitbub.devlibx.easy.helper.LoggingHelper;
import io.gitbub.devlibx.easy.helper.Safe;
import io.github.devlibx.easy.testing.kafka.IKafkaExtensionControl;
import io.github.devlibx.easy.testing.kafka.TestingKafkaConfig;
import java.util.Collection;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
import org.junit.jupiter.api.extension.ExecutionCondition;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.shaded.com.github.dockerjava.core.command.AbstrDockerCmd;
import org.testcontainers.utility.DockerImageName;

public class KafkaExtension
implements ParameterResolver,
BeforeAllCallback,
AfterAllCallback,
IKafkaExtensionControl,
BeforeEachCallback,
ExecutionCondition {
    private static final Logger log = LoggerFactory.getLogger(KafkaExtension.class);
    public static final String DISABLE_IF_KAFKA_NOT_RUNNING = "DISABLE_IF_KAFKA_NOT_RUNNING";
    private KafkaContainer kafkaContainer;
    private AdminClient client;

    public synchronized void beforeAll(ExtensionContext context) throws Exception {
        LoggingHelper.setupLogging();
        LoggingHelper.getLogger(AbstrDockerCmd.class).setLevel(Level.OFF);
        ExtensionContext.Store store = context.getRoot().getStore(ExtensionContext.Namespace.GLOBAL);
        LocalKafkaClientHolder kafkaHolder = (LocalKafkaClientHolder)store.getOrComputeIfAbsent(LocalKafkaClientHolder.class);
        if (kafkaHolder.isRunning()) {
            this.client = kafkaHolder.localKafkaClient;
        } else {
            DockerContainerKafkaHolder kafkaContainerHolder = (DockerContainerKafkaHolder)store.getOrComputeIfAbsent(DockerContainerKafkaHolder.class);
            if (kafkaContainerHolder.isRunning()) {
                this.kafkaContainer = kafkaContainerHolder.kafkaContainer;
            } else {
                kafkaContainerHolder.start();
                if (kafkaContainerHolder.isRunning()) {
                    this.kafkaContainer = kafkaContainerHolder.kafkaContainer;
                }
            }
        }
    }

    public synchronized void afterAll(ExtensionContext context) throws Exception {
    }

    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        return parameterContext.getParameter().getType() == TestingKafkaConfig.class || parameterContext.getParameter().getType() == Producer.class || parameterContext.getParameter().getType() == Consumer.class || parameterContext.getParameter().getType() == IKafkaExtensionControl.class;
    }

    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        if (parameterContext.getParameter().getType() == TestingKafkaConfig.class) {
            TestingKafkaConfig kafkaConfig = new TestingKafkaConfig();
            kafkaConfig.setRunning(false);
            if (this.isKafkaRunning()) {
                kafkaConfig.setBrokers(this.getKafkaUrl());
                kafkaConfig.setRunning(true);
            }
            return kafkaConfig;
        }
        if (parameterContext.getParameter().getType() == Producer.class) {
            if (this.isKafkaRunning()) {
                return this.createProducer();
            }
            return null;
        }
        if (parameterContext.getParameter().getType() == Consumer.class) {
            if (this.isKafkaRunning()) {
                return this.createConsumer();
            }
            return null;
        }
        if (parameterContext.getParameter().getType() == IKafkaExtensionControl.class) {
            return this;
        }
        return parameterContext.getParameter();
    }

    private Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.getKafkaUrl());
        properties.put("retries", (Object)1);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "all");
        return new KafkaProducer(properties);
    }

    private KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.getKafkaUrl());
        properties.put("group.id", UUID.randomUUID().toString());
        properties.put("auto.offset.reset", "earliest");
        properties.put("enable.auto.commit", "true");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("max.poll.records", "10");
        return new KafkaConsumer(properties);
    }

    @Override
    public synchronized void stopIfRunning() {
        if (this.kafkaContainer != null) {
            Safe.safe(() -> this.kafkaContainer.stop());
            this.kafkaContainer = null;
        }
        if (this.client != null) {
            Safe.safe(() -> this.client.close());
            this.client = null;
        }
    }

    @Override
    public synchronized boolean isKafkaRunning() {
        if (this.kafkaContainer != null) {
            return this.kafkaContainer.isRunning();
        }
        if (this.client != null) {
            try {
                this.client.listTopics();
                return true;
            }
            catch (Exception e) {
                return false;
            }
        }
        return false;
    }

    private String getKafkaUrl() {
        if (this.kafkaContainer != null) {
            return this.kafkaContainer.getBootstrapServers();
        }
        if (this.client != null) {
            try {
                return "localhost:9092";
            }
            catch (Exception e) {
                return null;
            }
        }
        return null;
    }

    public void beforeEach(ExtensionContext context) throws Exception {
    }

    public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
        if (context.getTags() != null && context.getTags().contains(DISABLE_IF_KAFKA_NOT_RUNNING) && !this.isKafkaRunning()) {
            return ConditionEvaluationResult.disabled((String)"Test expected kafka to be running - we did not find local or docker kafka running");
        }
        return ConditionEvaluationResult.enabled((String)"");
    }

    public static class LocalKafkaClientHolder {
        private AdminClient localKafkaClient;

        public LocalKafkaClientHolder() {
            try {
                log.info("Try to create a client for local kafka - to see if we can use local kafka");
                HashMap<String, String> conf = new HashMap<String, String>();
                conf.put("bootstrap.servers", "localhost:9092");
                conf.put("request.timeout.ms", "5000");
                this.localKafkaClient = AdminClient.create(conf);
                DescribeClusterResult result = this.localKafkaClient.describeCluster();
                this.localKafkaClient.listTopics();
                log.info("local kafka available: {} {}", (Object)result, (Object)((Collection)result.nodes().get()).size());
            }
            catch (Exception e) {
                log.error("Kafka not running in localhost: we will try to use kafka on docker: e={}", (Object)e.getMessage());
                if (this.localKafkaClient != null) {
                    Safe.safe(() -> this.localKafkaClient.close());
                }
                this.localKafkaClient = null;
            }
        }

        public boolean isRunning() {
            if (this.localKafkaClient != null) {
                try {
                    this.localKafkaClient.listTopics();
                    return true;
                }
                catch (Exception e) {
                    return false;
                }
            }
            return false;
        }
    }

    public static class DockerContainerKafkaHolder {
        private KafkaContainer kafkaContainer;

        public DockerContainerKafkaHolder() {
            this.start();
        }

        public void start() {
            if (this.kafkaContainer != null) {
                Safe.safe(() -> this.kafkaContainer.stop());
            }
            try {
                log.info("Try to create a client for docker kafka - to see if we can use local kafka");
                this.kafkaContainer = (KafkaContainer)new KafkaContainer(DockerImageName.parse((String)"confluentinc/cp-kafka:5.4.3")).withReuse(true);
                this.kafkaContainer.start();
                log.info("docker kafka available");
            }
            catch (Exception e) {
                log.error("failed to start kafka in docker: error={}", (Object)e.getMessage());
                this.kafkaContainer = null;
            }
        }

        public boolean isRunning() {
            try {
                if (this.kafkaContainer != null) {
                    log.info("Docker kafka running = {}", (Object)this.kafkaContainer.isRunning());
                }
                return this.kafkaContainer != null && this.kafkaContainer.isRunning();
            }
            catch (Exception e) {
                return false;
            }
        }
    }
}

