/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.examples.runnable;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.fasterxml.jackson.dataformat.cbor.CBORParser;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.datatype.json.JSON;
import com.pushtechnology.diffusion.examples.runnable.AbstractClient;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ConsumingJson
extends AbstractClient {
    private static final CBORFactory CBOR_FACTORY = new CBORFactory();
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper((JsonFactory)CBOR_FACTORY);
    private static final Logger LOG = LoggerFactory.getLogger(ConsumingJson.class);
    private static final TypeReference<Map<String, BigInteger>> INT_MAP_TYPE = new TypeReference<Map<String, BigInteger>>(){};

    public ConsumingJson(String url, String principal) {
        super(url, principal);
    }

    @Override
    public void onStarted(Session session) {
        Topics topics = (Topics)session.feature(Topics.class);
        topics.addStream(">json/random", JSON.class, (Topics.ValueStream)new Topics.ValueStream.Default<JSON>(){

            public void onValue(String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) {
                try {
                    CBORParser parser = CBOR_FACTORY.createParser(newValue.asInputStream());
                    Map map = (Map)OBJECT_MAPPER.readValue((JsonParser)parser, INT_MAP_TYPE);
                    parser.close();
                    LOG.info("New timestamp {}", map.get("timestamp"));
                }
                catch (IOException e) {
                    LOG.warn("Failed to transform value '{}'", (Object)newValue, (Object)e);
                }
            }
        });
        topics.subscribe("json/random").whenComplete((voidResult, exception) -> {
            if (exception != null) {
                LOG.info("subscription failed", exception);
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ConsumingJson client = new ConsumingJson("ws://diffusion.example.com:80", "auth");
        client.start("auth_secret");
        client.waitForStopped();
    }
}

