/*
 * Decompiled with CFR 0.152.
 */
package de.quantummaid.eventmaid.messagebus;

import de.quantummaid.eventmaid.channel.Channel;
import de.quantummaid.eventmaid.filtering.Filter;
import de.quantummaid.eventmaid.filtering.FilterActions;
import de.quantummaid.eventmaid.identification.CorrelationId;
import de.quantummaid.eventmaid.identification.MessageId;
import de.quantummaid.eventmaid.internal.exceptions.BubbleUpWrappedException;
import de.quantummaid.eventmaid.messagebus.MessageBus;
import de.quantummaid.eventmaid.messagebus.MessageBusStatusInformation;
import de.quantummaid.eventmaid.messagebus.exception.MessageBusExceptionListener;
import de.quantummaid.eventmaid.messagebus.internal.MessageBusStatusInformationAdapter;
import de.quantummaid.eventmaid.messagebus.internal.brokering.MessageBusBrokerStrategy;
import de.quantummaid.eventmaid.messagebus.internal.correlationids.CorrelationBasedSubscriptions;
import de.quantummaid.eventmaid.messagebus.internal.exception.ExceptionListenerHandler;
import de.quantummaid.eventmaid.messagebus.internal.statistics.ChannelBasedMessageBusStatisticsCollector;
import de.quantummaid.eventmaid.processingcontext.EventType;
import de.quantummaid.eventmaid.processingcontext.ProcessingContext;
import de.quantummaid.eventmaid.subscribing.ConsumerSubscriber;
import de.quantummaid.eventmaid.subscribing.Subscriber;
import de.quantummaid.eventmaid.subscribing.SubscriptionId;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Generated;

final class MessageBusImpl
implements MessageBus {
    private final Channel<Object> acceptingChannel;
    private final MessageBusBrokerStrategy brokerStrategy;
    private final CorrelationBasedSubscriptions correlationBasedSubscriptions;
    private final ExceptionListenerHandler exceptionListenerHandler;
    private final MessageBusStatusInformationAdapter statusInformationAdapter;

    MessageBusImpl(Channel<Object> acceptingChannel, MessageBusBrokerStrategy brokerStrategy, CorrelationBasedSubscriptions correlationBasedSubscriptions, ExceptionListenerHandler exceptionListenerHandler) {
        this.acceptingChannel = acceptingChannel;
        this.brokerStrategy = brokerStrategy;
        this.correlationBasedSubscriptions = correlationBasedSubscriptions;
        this.exceptionListenerHandler = exceptionListenerHandler;
        ChannelBasedMessageBusStatisticsCollector statisticsCollector = ChannelBasedMessageBusStatisticsCollector.channelBasedMessageBusStatisticsCollector(acceptingChannel);
        this.statusInformationAdapter = MessageBusStatusInformationAdapter.statusInformationAdapter(statisticsCollector, brokerStrategy, exceptionListenerHandler);
    }

    @Override
    public MessageId send(EventType eventType, Object object) {
        ProcessingContext<Object> processingContext = ProcessingContext.processingContext(eventType, object);
        return this.send(processingContext);
    }

    @Override
    public MessageId send(EventType eventType, Object object, CorrelationId correlationId) {
        ProcessingContext<Object> processingContext = ProcessingContext.processingContext(eventType, object, correlationId);
        return this.send(processingContext);
    }

    @Override
    public MessageId send(ProcessingContext<Object> processingContext) {
        try {
            return this.acceptingChannel.send(processingContext);
        }
        catch (BubbleUpWrappedException e) {
            throw (RuntimeException)e.getCause();
        }
    }

    @Override
    public SubscriptionId subscribe(EventType eventType, Consumer<Object> consumer) {
        ConsumerSubscriber<Object> subscriber = ConsumerSubscriber.consumerSubscriber(consumer);
        return this.subscribe(eventType, subscriber);
    }

    @Override
    public SubscriptionId subscribe(EventType eventType, Subscriber<Object> subscriber) {
        this.brokerStrategy.addSubscriber(eventType, subscriber);
        return subscriber.getSubscriptionId();
    }

    @Override
    public SubscriptionId subscribe(CorrelationId correlationId, Consumer<ProcessingContext<Object>> consumer) {
        ConsumerSubscriber<ProcessingContext<Object>> subscriber = ConsumerSubscriber.consumerSubscriber(consumer);
        return this.subscribe(correlationId, subscriber);
    }

    @Override
    public SubscriptionId subscribe(CorrelationId correlationId, Subscriber<ProcessingContext<Object>> subscriber) {
        return this.correlationBasedSubscriptions.addCorrelationBasedSubscriber(correlationId, subscriber);
    }

    @Override
    public SubscriptionId subscribeRaw(EventType eventType, Consumer<ProcessingContext<Object>> consumer) {
        ConsumerSubscriber<ProcessingContext<Object>> subscriber = ConsumerSubscriber.consumerSubscriber(consumer);
        return this.subscribeRaw(eventType, subscriber);
    }

    @Override
    public SubscriptionId subscribeRaw(EventType eventType, Subscriber<ProcessingContext<Object>> subscriber) {
        this.brokerStrategy.addRawSubscriber(eventType, subscriber);
        return subscriber.getSubscriptionId();
    }

    @Override
    public void unsubcribe(SubscriptionId subscriptionId) {
        this.brokerStrategy.removeSubscriber(subscriptionId);
        this.correlationBasedSubscriptions.unsubscribe(subscriptionId);
    }

    @Override
    public void add(Filter<Object> filter) {
        this.acceptingChannel.addProcessFilter(new FilterAdapter(filter));
    }

    @Override
    public void add(Filter<Object> filter, int position) {
        this.acceptingChannel.addProcessFilter(new FilterAdapter(filter), position);
    }

    @Override
    public void addRaw(Filter<ProcessingContext<Object>> filter) {
        this.acceptingChannel.addProcessFilter(filter);
    }

    @Override
    public void addRaw(Filter<ProcessingContext<Object>> filter, int position) {
        this.acceptingChannel.addProcessFilter(filter, position);
    }

    @Override
    public List<Filter<Object>> getFilter() {
        LinkedList<Filter<Object>> filters = new LinkedList<Filter<Object>>();
        List<Filter<ProcessingContext<Object>>> processFilter = this.acceptingChannel.getProcessFilter();
        for (Filter<ProcessingContext<Object>> filter : processFilter) {
            if (filter instanceof FilterAdapter) {
                Filter<Object> originalFilter = ((FilterAdapter)filter).delegate;
                filters.add(originalFilter);
                continue;
            }
            throw new IllegalStateException("Unexpected type of filter. Was the list of filter tampered with?");
        }
        return filters;
    }

    @Override
    public void remove(Filter<Object> filter) {
        List<Filter<ProcessingContext<Object>>> processFilter = this.acceptingChannel.getProcessFilter();
        for (Filter filter2 : processFilter) {
            if (!(filter2 instanceof FilterAdapter)) continue;
            FilterAdapter filterAdapter = (FilterAdapter)filter2;
            if (!filterAdapter.delegate.equals(filter)) continue;
            this.acceptingChannel.removeProcessFilter(filter2);
        }
    }

    @Override
    public SubscriptionId onException(EventType eventType, MessageBusExceptionListener exceptionListener) {
        return this.exceptionListenerHandler.register(eventType, exceptionListener);
    }

    @Override
    public SubscriptionId onException(CorrelationId correlationId, MessageBusExceptionListener exceptionListener) {
        return this.exceptionListenerHandler.register(correlationId, exceptionListener);
    }

    @Override
    public void unregisterExceptionListener(SubscriptionId subscriptionId) {
        this.exceptionListenerHandler.unregister(subscriptionId);
    }

    @Override
    public MessageBusStatusInformation getStatusInformation() {
        return this.statusInformationAdapter;
    }

    @Override
    public void close(boolean finishRemainingTasks) {
        this.acceptingChannel.close(finishRemainingTasks);
    }

    @Override
    public void close() {
        this.close(true);
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return true;
    }

    @Override
    public boolean isClosed() {
        return true;
    }

    private static final class FilterAdapter
    implements Filter<ProcessingContext<Object>> {
        private final Filter<Object> delegate;

        @Override
        public void apply(final ProcessingContext<Object> processingContext, final FilterActions<ProcessingContext<Object>> filterActions) {
            final Object originalPayload = processingContext.getPayload();
            this.delegate.apply(originalPayload, new FilterActions<Object>(){

                @Override
                public void block(Object message) {
                    if (originalPayload != message) {
                        processingContext.setPayload(message);
                    }
                    filterActions.block(processingContext);
                }

                @Override
                public void pass(Object message) {
                    if (originalPayload != message) {
                        processingContext.setPayload(message);
                    }
                    filterActions.pass(processingContext);
                }
            });
        }

        @Generated
        private FilterAdapter(Filter<Object> delegate) {
            this.delegate = delegate;
        }
    }
}

