/*
 * Decompiled with CFR 0.152.
 */
package org.cg.common.flume;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.KeyStore;
import java.security.Security;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.cg.common.flume.InternalHttpServer;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
import org.jboss.netty.handler.ssl.SslHandler;

public abstract class AbstractAvroSource
extends AbstractSource
implements EventDrivenSource,
Configurable {
    protected static final Log logger = LogFactory.getLog(AbstractAvroSource.class);
    private static final String PORT_KEY = "port";
    private static final String BIND_KEY = "bind";
    private static final String PROTOCOL_KEY = "protocol";
    private static final String COMPRESSION_TYPE = "compression-type";
    private static final String SSL_KEY = "ssl";
    private static final String KEYSTORE_KEY = "keystore";
    private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
    private static final String KEYSTORE_TYPE_KEY = "keystore-type";
    public static final String HTTP_PROTOCOL = "http";
    private static final String HTTP_CONNECTIONS = "http.connections";
    private static final String THREADS = "threads";
    private int port;
    private String bindAddress;
    private String compressionType;
    private String keystore;
    private String keystorePassword;
    private String keystoreType;
    private boolean enableSsl = false;
    private String protocol;
    private int httpConnections = 10;
    private Server nettyServer;
    private InternalHttpServer httpServer;
    private int maxThreads;
    protected SourceCounter sourceCounter;
    private ScheduledExecutorService connectionCountUpdater;
    private Class rpcProtocol;

    public AbstractAvroSource(Class rpcProtocol) {
        this.rpcProtocol = rpcProtocol;
    }

    public void configure(Context context) {
        Configurables.ensureRequiredNonNull((Context)context, (String[])new String[]{PORT_KEY, BIND_KEY});
        this.port = context.getInteger(PORT_KEY);
        this.bindAddress = context.getString(BIND_KEY);
        this.protocol = context.getString(PROTOCOL_KEY, HTTP_PROTOCOL);
        this.compressionType = context.getString(COMPRESSION_TYPE, "none");
        this.httpConnections = context.getInteger(HTTP_CONNECTIONS, Integer.valueOf(10));
        try {
            this.maxThreads = context.getInteger(THREADS, Integer.valueOf(0));
        }
        catch (NumberFormatException e) {
            logger.warn((Object)"can not parse max thread configuration", (Throwable)e);
        }
        this.enableSsl = context.getBoolean(SSL_KEY, Boolean.valueOf(false));
        this.keystore = context.getString(KEYSTORE_KEY);
        this.keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
        this.keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
        if (this.enableSsl) {
            Preconditions.checkNotNull((Object)this.keystore, (Object)"keystore must be specified when SSL is enabled");
            Preconditions.checkNotNull((Object)this.keystorePassword, (Object)"keystore-password must be specified when SSL is enabled");
            try {
                KeyStore ks = KeyStore.getInstance(this.keystoreType);
                ks.load(new FileInputStream(this.keystore), this.keystorePassword.toCharArray());
            }
            catch (Exception ex) {
                throw new FlumeException("flow source configured with invalid keystore: " + this.keystore, (Throwable)ex);
            }
        }
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(this.getName());
        }
        this.customConfig(context);
    }

    public abstract void customConfig(Context var1);

    public void start() {
        logger.info((Object)String.format("avro source server %s starting ...", new Object[]{this}));
        if (HTTP_PROTOCOL.equals(this.protocol)) {
            this.startHttp();
        } else {
            this.startNetty();
        }
        this.sourceCounter.start();
        super.start();
        logger.info((Object)String.format("avro source server %s started", new Object[]{this}));
    }

    public void startNetty() {
        SpecificResponder responder = new SpecificResponder(this.rpcProtocol, (Object)this);
        NioServerSocketChannelFactory socketChannelFactory = this.initSocketChannelFactory();
        ChannelPipelineFactory pipelineFactory = this.initChannelPipelineFactory();
        this.nettyServer = new NettyServer((Responder)responder, new InetSocketAddress(this.bindAddress, this.port), (ChannelFactory)socketChannelFactory, pipelineFactory, null);
        this.connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
        this.nettyServer.start();
        logger.info((Object)"before starting NettyServer");
        final NettyServer srv = (NettyServer)this.nettyServer;
        this.connectionCountUpdater.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                AbstractAvroSource.this.sourceCounter.setOpenConnectionCount(Long.valueOf(srv.getNumActiveConnections()).longValue());
            }
        }, 0L, 60L, TimeUnit.SECONDS);
    }

    public void startHttp() {
        SpecificResponder responder = new SpecificResponder(this.rpcProtocol, (Object)this);
        try {
            this.httpServer = new InternalHttpServer((Responder)responder, this.port, this.httpConnections);
        }
        catch (IOException e) {
            logger.error((Object)"failed to start http server", (Throwable)e);
        }
        this.httpServer.start();
    }

    private NioServerSocketChannelFactory initSocketChannelFactory() {
        NioServerSocketChannelFactory socketChannelFactory = this.maxThreads <= 0 ? new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(), (Executor)Executors.newCachedThreadPool()) : new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(), (Executor)Executors.newFixedThreadPool(this.maxThreads));
        return socketChannelFactory;
    }

    private ChannelPipelineFactory initChannelPipelineFactory() {
        boolean enableCompression = this.compressionType.equalsIgnoreCase("deflate");
        Object pipelineFactory = enableCompression || this.enableSsl ? new SSLCompressionChannelPipelineFactory(enableCompression, this.enableSsl, this.keystore, this.keystorePassword, this.keystoreType) : new ChannelPipelineFactory(){

            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline();
            }
        };
        return pipelineFactory;
    }

    public void stop() {
        logger.info((Object)String.format("avro source server {%s} stopping: {%s}", new Object[]{this.getName(), this}));
        if (HTTP_PROTOCOL.equals(this.protocol)) {
            this.httpStop();
        } else {
            this.nettyStop();
        }
        super.stop();
        logger.info((Object)String.format("avro source server {%s} stopped. Metrics: {%s}", this.getName(), this.sourceCounter));
    }

    public void httpStop() {
        this.httpServer.close();
        try {
            this.httpServer.join();
        }
        catch (InterruptedException e) {
            logger.info((Object)("avro source server " + this.getName() + ": Interrupted while waiting " + "for Avro server to stop. Exiting. Exception follows."), (Throwable)e);
        }
    }

    public void nettyStop() {
        this.nettyServer.close();
        this.sourceCounter.stop();
        this.connectionCountUpdater.shutdown();
        while (!this.connectionCountUpdater.isTerminated()) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException ex) {
                logger.error((Object)"Interrupted while waiting for connection count executor to terminate", (Throwable)ex);
                Throwables.propagate((Throwable)ex);
            }
        }
        try {
            this.nettyServer.join();
        }
        catch (InterruptedException e) {
            logger.info((Object)("avro source server " + this.getName() + ": Interrupted while waiting " + "for Avro server to stop. Exiting. Exception follows."), (Throwable)e);
        }
    }

    public String toString() {
        return "avro source server " + this.getName() + ": { bindAddress: " + this.bindAddress + ", port: " + this.port + " }";
    }

    protected static class SSLCompressionChannelPipelineFactory
    implements ChannelPipelineFactory {
        private boolean enableCompression;
        private boolean enableSsl;
        private String keystore;
        private String keystorePassword;
        private String keystoreType;

        public SSLCompressionChannelPipelineFactory(boolean enableCompression, boolean enableSsl, String keystore, String keystorePassword, String keystoreType) {
            this.enableCompression = enableCompression;
            this.enableSsl = enableSsl;
            this.keystore = keystore;
            this.keystorePassword = keystorePassword;
            this.keystoreType = keystoreType;
        }

        private SSLContext createServerSSLContext() {
            try {
                KeyStore ks = KeyStore.getInstance(this.keystoreType);
                ks.load(new FileInputStream(this.keystore), this.keystorePassword.toCharArray());
                KeyManagerFactory kmf = KeyManagerFactory.getInstance(this.getAlgorithm());
                kmf.init(ks, this.keystorePassword.toCharArray());
                SSLContext serverContext = SSLContext.getInstance("TLS");
                serverContext.init(kmf.getKeyManagers(), null, null);
                return serverContext;
            }
            catch (Exception e) {
                throw new Error("Failed to initialize the server-side SSLContext", e);
            }
        }

        private String getAlgorithm() {
            String algorithm = Security.getProperty("ssl.KeyManagerFactory.algorithm");
            if (algorithm == null) {
                algorithm = "SunX509";
            }
            return algorithm;
        }

        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            if (this.enableCompression) {
                ZlibEncoder encoder = new ZlibEncoder(6);
                pipeline.addFirst("deflater", (ChannelHandler)encoder);
                pipeline.addFirst("inflater", (ChannelHandler)new ZlibDecoder());
            }
            if (this.enableSsl) {
                SSLEngine sslEngine = this.createServerSSLContext().createSSLEngine();
                sslEngine.setUseClientMode(false);
                pipeline.addFirst(AbstractAvroSource.SSL_KEY, (ChannelHandler)new SslHandler(sslEngine));
            }
            return pipeline;
        }
    }
}

