/*
 * Decompiled with CFR 0.152.
 */
package com.cloudimpl.outstack.workflow.example;

import java.time.Duration;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

public class FluxTest {
    static Set<String> set = new ConcurrentSkipListSet<String>();

    public static void main(String[] args) throws InterruptedException {
        Mono m1 = Mono.delay((Duration)Duration.ofSeconds(1L)).map(l -> "m1").doOnNext(System.out::println).doOnCancel(() -> System.out.println("m1 cancel"));
        Mono m2 = Mono.delay((Duration)Duration.ofSeconds(2L)).map(l -> "m2").doOnNext(System.out::println).doOnCancel(() -> System.out.println("m2 cancel"));
        Mono m3 = Mono.delay((Duration)Duration.ofSeconds(3L)).map(l -> "m3").doOnNext(System.out::println).doOnCancel(() -> System.out.println("m3 cancel"));
        Mono m4 = Mono.delay((Duration)Duration.ofSeconds(4L)).map(l -> "m4").doOnNext(System.out::println).doOnCancel(() -> System.out.println("m4 cancel"));
        Mono all = m1.flatMap(m -> m2).flatMap(m -> m3).flatMap(m -> m4).doOnCancel(() -> System.out.println("final cancel"));
        Disposable hnd = all.doOnNext(System.out::println).subscribe();
        Thread.sleep(5000L);
        hnd.dispose();
        Thread.sleep(100000000L);
    }

    private static Mono<String> retryWrap(String s) {
        return Mono.defer(() -> FluxTest.work(s)).doOnError(err -> System.out.println("errr: " + err.getMessage())).retry(10L);
    }

    public static Mono<String> work(String s) {
        if (set.add(s)) {
            return Mono.delay((Duration)Duration.ofSeconds(1L)).flatMap(l -> Mono.error(() -> new RuntimeException("work " + s + " error")));
        }
        return Mono.delay((Duration)Duration.ofSeconds(new Random(System.currentTimeMillis()).nextInt(10))).map(l -> s + "done");
    }
}

