/*
 * Decompiled with CFR 0.152.
 */
package com.cloudimpl.outstack.spring.service;

import com.cloudimpl.outstack.common.CloudMessage;
import com.cloudimpl.outstack.common.RouterType;
import com.cloudimpl.outstack.core.CloudUtil;
import com.cloudimpl.outstack.core.annon.CloudFunction;
import com.cloudimpl.outstack.core.annon.Router;
import com.cloudimpl.outstack.runtime.EventRepositoryFactory;
import com.cloudimpl.outstack.runtime.EventRepositoy;
import com.cloudimpl.outstack.runtime.common.StreamProcessor;
import com.cloudimpl.outstack.runtime.domainspec.Entity;
import com.cloudimpl.outstack.runtime.domainspec.Query;
import com.cloudimpl.outstack.runtime.repo.RepoStreamingReq;
import com.cloudimpl.outstack.runtime.repo.StreamEvent;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

@CloudFunction(name="RepositoryStreamingService")
@Router(routerType=RouterType.NODE_ID)
public class RepositoryStreamingService
implements Function<CloudMessage, Flux> {
    private static final Logger log = LoggerFactory.getLogger(RepositoryStreamingService.class);

    @Override
    public Flux<StreamEvent> apply(CloudMessage t) {
        RepoStreamingReq req = (RepoStreamingReq)t.data();
        StreamProcessor eventStream = EventRepositoryFactory.getEventStream();
        Flux eventsFlux = eventStream.flux().filter(e -> {
            Entity entity = (Entity)e.getEvent();
            Optional listOptional = req.getResources(e.getEvent().getClass().getName());
            if (listOptional.isPresent()) {
                List list = (List)listOptional.get();
                for (RepoStreamingReq.ResourceInfo info : list) {
                    if (info.getTenantId() != null && info.getTenantId().equals("*")) {
                        if (info.getEntityId().equals("*")) {
                            return true;
                        }
                        if (!info.getEntityId().equals(info.getEntityId())) continue;
                        return true;
                    }
                    if (info.getTenantId() == null && entity.getTenantId() == null) {
                        if (info.getEntityId().equals("*")) {
                            return true;
                        }
                        if (!info.getEntityId().equals(info.getEntityId())) continue;
                        return true;
                    }
                    if (info.getTenantId() == null || entity.getTenantId() == null || !info.getTenantId().equals(entity.getTenantId())) continue;
                    if (info.getEntityId().equals("*")) {
                        return true;
                    }
                    if (!info.getEntityId().equals(info.getEntityId())) continue;
                    return true;
                }
            }
            return false;
        }).cast(StreamEvent.class);
        return Flux.merge((Publisher[])new Publisher[]{this.subscribeToRootResources(req), eventsFlux});
    }

    private Flux<StreamEvent> subscribeToRootResources(RepoStreamingReq req) {
        return Flux.fromIterable((Iterable)req.getInitialDownloadResources()).map(res -> this.getEventRepo(res.getEntityType()).getAllByRootType(CloudUtil.classForName((String)res.getEntityType()), res.getTenantId(), Query.PagingRequest.EMPTY)).flatMapIterable(rs -> rs.getItems()).map(r -> new StreamEvent(StreamEvent.Action.ADD, r));
    }

    private EventRepositoy getEventRepo(String rootType) {
        return (EventRepositoy)EventRepositoryFactory.getRepository((Class)CloudUtil.classForName((String)rootType)).orElseThrow(() -> new RuntimeException("repository " + rootType + " not found"));
    }
}

