/*
 * Decompiled with CFR 0.152.
 */
package com.cloudimpl.outstack.runtime.common;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Fans out values pushed through {@link #add(Object)} to every sink that is
 * currently subscribed to {@link #flux()}.
 *
 * <p>Each subscription to the flux registers its {@link FluxSink} in an internal
 * {@link CopyOnWriteArrayList}; the sink is removed again when the subscription
 * is cancelled or disposed.
 *
 * @param <T> type of the elements emitted to subscribers
 */
public class StreamProcessor<T> {
    /** Sinks of the currently active subscriptions. */
    private final List<FluxSink<T>> list = new CopyOnWriteArrayList<FluxSink<T>>();

    /** Flux handed out to callers; assigned in the constructor because it captures the callback. */
    private final Flux<T> flux;

    /**
     * Creates a processor with a no-op subscription callback.
     */
    public StreamProcessor() {
        this(t -> {});
    }

    /**
     * Creates a processor that invokes {@code consumer} for every new subscription.
     *
     * @param consumer callback that receives the sink of each new subscription
     *                 before that sink is registered for {@link #add(Object)};
     *                 must not be {@code null}, otherwise subscribing fails
     */
    public StreamProcessor(Consumer<FluxSink<T>> consumer) {
        // The flux has to be built here rather than in a field initializer:
        // the lambda captures the constructor argument, which is not in scope
        // at field-initialization level.
        this.flux = Flux.<T>create(emitter -> {
            System.out.println("subscription added:" + Thread.currentThread().getName());
            // Let the callback see the sink first, then start delivering add() values to it.
            consumer.accept(emitter);
            this.list.add(emitter);
            // remove() is safe to call more than once, so both hooks may fire for one sink.
            emitter.onCancel(() -> this.remove(emitter));
            emitter.onDispose(() -> this.remove(emitter));
        });
    }

    /**
     * Emits {@code t} to every sink that is registered at the time of the call.
     *
     * @param t value to emit
     */
    public void add(T t) {
        this.list.forEach(sink -> sink.next(t));
    }

    /**
     * Returns the flux whose subscribers receive the values passed to {@link #add(Object)}.
     *
     * @return the flux backing this processor; the same instance on every call
     */
    public Flux<T> flux() {
        return this.flux;
    }

    /**
     * Unregisters {@code sink} so it no longer receives values from {@link #add(Object)}.
     *
     * @param sink the sink to drop; a sink that is not registered is ignored
     */
    private void remove(FluxSink<T> sink) {
        System.out.println("remove from :" + Thread.currentThread().getName());
        this.list.remove(sink);
    }
}

