/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx.aws;

import com.amazonaws.AmazonWebServiceClient;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.github.davidmoten.rx.aws.SqsMessage;
import com.github.davidmoten.util.Preconditions;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Observer;
import rx.functions.Func0;
import rx.observables.SyncOnSubscribe;

public final class Sqs {
    // Private so the class cannot be instantiated from outside; its entry points are static.
    private Sqs() {
    }

    /**
     * Entry point of the fluent API: starts a builder for the given queue.
     *
     * @param queueName name of the queue, handed unchanged to the {@link SqsBuilder} constructor
     * @return a freshly created {@link SqsBuilder} for {@code queueName}
     */
    public static SqsBuilder queueName(String queueName) {
        final SqsBuilder builder = new SqsBuilder(queueName);
        return builder;
    }

    /**
     * Builds the message observable, tying the lifetime of the SQS client to it through
     * {@link Observable#using}: the client comes from {@code sqsClientFactory}, is handed to
     * {@code createObservable}, and has {@code shutdown()} called on it as the dispose action.
     *
     * <p>All four arguments are passed through {@code Preconditions.checkNotNull} first.
     *
     * @param sqsClientFactory supplies the SQS client used as the managed resource
     * @param s3ClientFactory  optional supplier of an S3 client, forwarded to {@code createObservable}
     * @param queueName        name of the queue, forwarded to {@code createObservable}
     * @param bucketName       optional bucket name, forwarded to {@code createObservable}
     * @return the observable of {@link SqsMessage}s
     */
    static Observable<SqsMessage> messages(Func0<AmazonSQSClient> sqsClientFactory, Optional<Func0<AmazonS3Client>> s3ClientFactory, String queueName, Optional<String> bucketName) {
        // Validate everything up front, before any client is created.
        Preconditions.checkNotNull(sqsClientFactory);
        Preconditions.checkNotNull(s3ClientFactory);
        Preconditions.checkNotNull(queueName);
        Preconditions.checkNotNull(bucketName);
        return Observable.using(
                sqsClientFactory,
                client -> Sqs.createObservable(client, s3ClientFactory, sqsClientFactory, queueName, bucketName),
                AmazonSQSClient::shutdown);
    }

    /**
     * Wraps the optional S3 client as a second {@link Observable#using} resource around the
     * polling observable.
     *
     * <p>The resource is {@code s3ClientFactory} mapped through {@code call()}, so it is empty
     * when no factory was supplied. The dispose action shuts the client down only if one is
     * present.
     *
     * @param sqs              SQS client, forwarded to the six-argument overload
     * @param s3ClientFactory  optional supplier of the S3 client
     * @param sqsClientFactory SQS client supplier, forwarded to the six-argument overload
     * @param queueName        name of the queue
     * @param bucketName       optional bucket name
     * @return the observable of {@link SqsMessage}s
     */
    private static Observable<SqsMessage> createObservable(AmazonSQSClient sqs, Optional<Func0<AmazonS3Client>> s3ClientFactory, Func0<AmazonSQSClient> sqsClientFactory, String queueName, Optional<String> bucketName) {
        final Func0<Optional<AmazonS3Client>> openS3 = () -> s3ClientFactory.map(factory -> factory.call());
        return Observable.using(
                openS3,
                maybeS3 -> Sqs.createObservable(sqs, s3ClientFactory, sqsClientFactory, queueName, bucketName, maybeS3),
                maybeS3 -> maybeS3.ifPresent(client -> client.shutdown()));
    }

    /**
     * Creates the observable that polls the queue and emits exactly one {@link SqsMessage} per
     * {@code next} step of a {@link SyncOnSubscribe}.
     *
     * <p>Without a bucket name the message body itself (as UTF-8 bytes) is the payload. With a
     * bucket name the body is treated as the id of an S3 object holding the payload. If that
     * object does not exist the SQS message is deleted and skipped.
     *
     * @param sqs              client used to resolve the queue URL, receive and delete messages
     * @param s3ClientFactory  passed to the {@code SqsMessage.Service} constructor
     * @param sqsClientFactory passed to the {@code SqsMessage.Service} constructor
     * @param queueName        name of the queue to poll
     * @param bucketName       when present, the bucket that holds the payload objects
     * @param s3               S3 client; {@code get()} is called on it whenever {@code bucketName} is present
     * @return the observable of {@link SqsMessage}s
     */
    private static Observable<SqsMessage> createObservable(final AmazonSQSClient sqs, Optional<Func0<AmazonS3Client>> s3ClientFactory, Func0<AmazonSQSClient> sqsClientFactory, final String queueName, final Optional<String> bucketName, final Optional<AmazonS3Client> s3) {
        final SqsMessage.Service service = new SqsMessage.Service(s3ClientFactory, sqsClientFactory, s3, sqs, queueName, bucketName);
        final SyncOnSubscribe<State, SqsMessage> poller = new SyncOnSubscribe<State, SqsMessage>() {
            private String queueUrl;
            private ReceiveMessageRequest request;

            /** Resolves the queue URL, prepares the reusable receive request and starts with an empty buffer. */
            protected State generateState() {
                this.queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
                // Wait up to 20 seconds per receive call and ask for at most 10 messages at a time.
                this.request = new ReceiveMessageRequest(this.queueUrl)
                        .withWaitTimeSeconds(20)
                        .withMaxNumberOfMessages(10);
                return new State(new LinkedList<Message>());
            }

            /** Emits the next deliverable message, refilling the buffer from SQS whenever it runs dry. */
            protected State next(State state, Observer<? super SqsMessage> observer) {
                final Queue<Message> pending = state.queue;
                Optional<SqsMessage> emission;
                do {
                    // Keep polling until at least one raw message is buffered.
                    while (pending.isEmpty()) {
                        pending.addAll(sqs.receiveMessage(this.request).getMessages());
                    }
                    emission = this.toSqsMessage(pending.poll());
                    // An empty result means the message was discarded; try the next one.
                } while (!emission.isPresent());
                observer.onNext(emission.get());
                return state;
            }

            /**
             * Converts one raw SQS message. Returns empty when the message pointed at an S3
             * object that does not exist, in which case the SQS message has been deleted.
             */
            private Optional<SqsMessage> toSqsMessage(Message message) {
                if (!bucketName.isPresent()) {
                    final SqsMessage direct = new SqsMessage(message.getReceiptHandle(), message.getBody().getBytes(StandardCharsets.UTF_8), System.currentTimeMillis(), Optional.empty(), service);
                    return Optional.of(direct);
                }
                final String s3Id = message.getBody();
                final AmazonS3Client s3Client = s3.get();
                final String bucket = bucketName.get();
                if (!s3Client.doesObjectExist(bucket, s3Id)) {
                    sqs.deleteMessage(this.queueUrl, message.getReceiptHandle());
                    return Optional.empty();
                }
                final S3Object object = s3.get().getObject(bucketName.get(), s3Id);
                final byte[] content = Sqs.readAndClose(object.getObjectContent());
                final long timestamp = object.getObjectMetadata().getLastModified().getTime();
                final SqsMessage viaS3 = new SqsMessage(message.getReceiptHandle(), content, timestamp, Optional.of(s3Id), service);
                return Optional.of(viaS3);
            }
        };
        return Observable.create(poller);
    }

    /**
     * Reads all remaining bytes of {@code is} into memory and then closes the stream.
     *
     * <p>The stream is closed on every exit path, whether the read completes or fails, so the
     * caller never has to close it.
     *
     * @param is the stream to drain; must not be {@code null}
     * @return the bytes read, in order; an empty array if the stream was already exhausted
     * @throws NullPointerException if {@code is} is {@code null}
     * @throws RuntimeException wrapping the {@link IOException} if reading or closing fails
     */
    static byte[] readAndClose(InputStream is) {
        Objects.requireNonNull(is, "is");
        // try-with-resources closes the stream even when read() throws.
        try (InputStream in = is) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                bos.write(buffer, 0, n);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * State object threaded through the {@link SyncOnSubscribe} built in {@code createObservable}.
     * It carries the buffer of messages received from SQS that have not been emitted yet.
     */
    private static final class State {
        /** Buffer that {@code next} refills from SQS and polls from. */
        final Queue<Message> queue;

        public State(Queue<Message> buffer) {
            this.queue = buffer;
        }
    }

    /**
     * Builder stage returned by {@link SqsBuilder#bucketName(String)}. It records the S3 client
     * factory on the underlying {@link SqsBuilder} and hands that builder back.
     */
    public static final class ViaS3Builder {
        private final SqsBuilder sqsBuilder;

        /**
         * @param sqsBuilder the builder that will receive the S3 client factory
         */
        public ViaS3Builder(SqsBuilder sqsBuilder) {
            this.sqsBuilder = sqsBuilder;
        }

        /**
         * Stores the S3 client factory on the wrapped builder.
         *
         * @param s3Factory supplier of the S3 client; wrapped with {@link Optional#of}, so {@code null} is rejected
         * @return the wrapped {@link SqsBuilder}, for further chaining
         */
        public SqsBuilder s3Factory(Func0<AmazonS3Client> s3Factory) {
            final SqsBuilder parent = this.sqsBuilder;
            parent.s3 = Optional.of(s3Factory);
            return parent;
        }
    }

    /**
     * Fluent builder that collects the queue name, the SQS client factory and, optionally, a
     * bucket name plus S3 client factory, then delegates to the static {@code Sqs.messages}.
     */
    public static final class SqsBuilder {
        private final String queueName;
        // Stays null until sqsFactory(...) is called; Sqs.messages(...) null-checks it.
        private Func0<AmazonSQSClient> sqs;
        private Optional<Func0<AmazonS3Client>> s3 = Optional.empty();
        private Optional<String> bucketName = Optional.empty();

        /**
         * @param queueName name of the queue; checked with {@code Preconditions.checkNotNull}
         */
        SqsBuilder(String queueName) {
            Preconditions.checkNotNull(queueName);
            this.queueName = queueName;
        }

        /**
         * Records the bucket name and switches to the stage that asks for the S3 client factory.
         *
         * @param bucketName bucket name; wrapped with {@link Optional#of}, so {@code null} is rejected
         * @return a {@link ViaS3Builder} wrapping this builder
         */
        public ViaS3Builder bucketName(String bucketName) {
            this.bucketName = Optional.of(bucketName);
            final ViaS3Builder s3Stage = new ViaS3Builder(this);
            return s3Stage;
        }

        /**
         * Records the SQS client factory.
         *
         * @param sqsFactory supplier of the SQS client
         * @return this builder
         */
        public SqsBuilder sqsFactory(Func0<AmazonSQSClient> sqsFactory) {
            this.sqs = sqsFactory;
            return this;
        }

        /**
         * Builds the observable from the values collected so far.
         *
         * @return the result of {@code Sqs.messages} for this builder's settings
         */
        public Observable<SqsMessage> messages() {
            return Sqs.messages(sqs, s3, queueName, bucketName);
        }
    }
}

