/*
 * Decompiled with CFR 0.152.
 */
package com.webull.openapi.samples.data;

import com.webull.openapi.core.common.dict.Category;
import com.webull.openapi.core.common.dict.SubscribeType;
import com.webull.openapi.core.execption.ClientException;
import com.webull.openapi.core.execption.ServerException;
import com.webull.openapi.core.logger.Logger;
import com.webull.openapi.core.logger.LoggerFactory;
import com.webull.openapi.core.serialize.JsonSerializer;
import com.webull.openapi.core.utils.GUID;
import com.webull.openapi.data.quotes.subsribe.IDataStreamingClient;
import com.webull.openapi.data.quotes.subsribe.message.MarketData;
import java.util.HashSet;

/**
 * Sample that exercises the asynchronous API of {@link IDataStreamingClient}.
 *
 * <p>The sample builds a client with a subscription for a single symbol, connects,
 * and then performs the following steps with a fixed pause before each one:
 * remove the subscription, add it again, and finally disconnect. Every received
 * {@link MarketData} message is logged as JSON.
 *
 * <p>The {@code <your_app_key>}, {@code <your_app_secret>} and {@code <your_region_id>}
 * placeholders must be replaced with real values before running.
 */
public class DataStreamingClientAsync {
    private static final Logger logger = LoggerFactory.getLogger(DataStreamingClientAsync.class);

    /** Total number of seconds to pause before each step of the sample. */
    private static final long STEP_DELAY_SECONDS = 30L;

    /** Number of seconds slept between two consecutive progress log lines. */
    private static final int POLL_INTERVAL_SECONDS = 1;

    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    /**
     * Runs the sample: connect, subscribe, unsubscribe, re-subscribe, disconnect.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        HashSet<String> symbols = new HashSet<>();
        symbols.add("AAPL");

        HashSet<String> subTypes = new HashSet<>();
        subTypes.add(SubscribeType.SNAPSHOT.name());
        subTypes.add(SubscribeType.QUOTE.name());
        subTypes.add(SubscribeType.TICK.name());

        String category = Category.US_STOCK.name();
        String depth = "1";
        boolean overnightRequired = false;

        try (IDataStreamingClient client = IDataStreamingClient.builder()
                .appKey("<your_app_key>")
                .appSecret("<your_app_secret>")
                .sessionId(GUID.get())
                .regionId("<your_region_id>")
                .onMessage(DataStreamingClientAsync::handleMarketData)
                .addSubscription(symbols, category, subTypes, depth, overnightRequired)
                .build()) {
            client.connectBlocking();
            client.subscribeAsync();

            pauseBefore("remove subscription");
            logger.info("Wait completed, start remove subscription...");
            client.removeSubscriptionAsync(symbols, category, subTypes);
            logger.info("Asynchronous call to cancel subscription succeeded.");

            pauseBefore("subscription");
            logger.info("Wait completed, start subscribing...");
            client.addSubscriptionAsync(symbols, category, subTypes, depth, overnightRequired);
            logger.info("Asynchronous call to subscribe succeeded.");

            pauseBefore("disconnect");
            logger.info("Wait completed, start disconnect...");
            client.disconnectAsync();
            logger.info("Asynchronous call to disconnect succeeded.");
        } catch (ClientException ex) {
            logger.error("Client error", ex);
        } catch (ServerException ex) {
            logger.error("Server error", ex);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so code further up the stack can observe it;
            // the client has already been closed by try-with-resources at this point.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting", ex);
        } catch (Exception ex) {
            logger.error("Unknown error", ex);
        }
    }

    /**
     * Blocks the calling thread for {@link #STEP_DELAY_SECONDS} seconds, logging progress
     * every {@link #POLL_INTERVAL_SECONDS} seconds.
     *
     * @param nextStep short description of the step that follows the pause; used only in
     *                 the progress log line
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void pauseBefore(String nextStep) throws InterruptedException {
        // nanoTime is monotonic, so the pause length is unaffected by wall-clock adjustments.
        long startNanos = System.nanoTime();
        long elapsedSeconds = 0L;
        while (elapsedSeconds < STEP_DELAY_SECONDS) {
            logger.info("Waiting {} seconds before {}... (elapsed {}s / {}s)",
                    new Object[]{POLL_INTERVAL_SECONDS, nextStep, elapsedSeconds, STEP_DELAY_SECONDS});
            Thread.sleep(POLL_INTERVAL_SECONDS * 1000L);
            elapsedSeconds = (System.nanoTime() - startNanos) / NANOS_PER_SECOND;
        }
    }

    /**
     * Message callback registered on the client; logs the received message as JSON.
     *
     * @param marketData the message delivered by the streaming client
     */
    private static void handleMarketData(MarketData marketData) {
        logger.info("Received market data: {}", JsonSerializer.toJson(marketData));
    }
}

