/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.db.batch.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.ConnectionConfig;
import io.cdap.plugin.DBConfig;
import io.cdap.plugin.DBManager;
import io.cdap.plugin.DBRecord;
import io.cdap.plugin.DBUtils;
import io.cdap.plugin.DriverCleanup;
import io.cdap.plugin.FieldCase;
import io.cdap.plugin.StructuredRecordUtils;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.ReferenceBatchSource;
import io.cdap.plugin.common.ReferencePluginConfig;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.db.batch.TransactionIsolationLevel;
import io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="batchsource")
@Name(value="Database")
@Description(value="Reads from a database table(s) using a configurable SQL query. Outputs one record for each row returned by the query.")
public class DBSource
extends ReferenceBatchSource<LongWritable, DBRecord, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(DBSource.class);
    private static final Pattern CONDITIONS_AND = Pattern.compile("\\$conditions (and|or)\\s+", 2);
    private static final Pattern AND_CONDITIONS = Pattern.compile("\\s+(and|or) \\$conditions", 2);
    private static final Pattern WHERE_CONDITIONS = Pattern.compile("\\s+where \\$conditions", 2);
    private final DBSourceConfig sourceConfig;
    private final DBManager dbManager;
    private Class<? extends Driver> driverClass;

    public DBSource(DBSourceConfig sourceConfig) {
        super(new ReferencePluginConfig(sourceConfig.referenceName));
        this.sourceConfig = sourceConfig;
        this.dbManager = new DBManager(sourceConfig);
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        this.sourceConfig.validate(collector);
        Class<? extends Driver> driverClass = this.dbManager.validateJDBCPluginPipeline(pipelineConfigurer, this.getJDBCPluginId(), collector);
        collector.getOrThrowException();
        Schema configuredSchema = this.sourceConfig.getSchema(collector);
        if (configuredSchema != null) {
            pipelineConfigurer.getStageConfigurer().setOutputSchema(configuredSchema);
        } else if (!this.sourceConfig.containsMacro("importQuery")) {
            try {
                pipelineConfigurer.getStageConfigurer().setOutputSchema(this.getSchema(driverClass));
            }
            catch (IllegalAccessException | InstantiationException e) {
                collector.addFailure(String.format("Failed to instantiate JDBC driver: %s", e.getMessage()), null);
            }
            catch (SQLException e) {
                collector.addFailure(String.format("Encountered SQL error while getting query schema: %s", e.getMessage()), null);
            }
        }
    }

    public void prepareRun(BatchSourceContext context) {
        FailureCollector collector = context.getFailureCollector();
        this.sourceConfig.validate(collector);
        collector.getOrThrowException();
        LOG.debug("pluginType = {}; pluginName = {}; connectionString = {}; importQuery = {}; boundingQuery = {}; transaction isolation level: {}", new Object[]{this.sourceConfig.jdbcPluginType, this.sourceConfig.jdbcPluginName, this.sourceConfig.connectionString, this.sourceConfig.getImportQuery(), this.sourceConfig.getBoundingQuery(), this.sourceConfig.transactionIsolationLevel});
        Configuration hConf = new Configuration();
        hConf.clear();
        Class driverClass = context.loadPluginClass(this.getJDBCPluginId());
        if (this.sourceConfig.user == null && this.sourceConfig.password == null) {
            DBConfiguration.configureDB((Configuration)hConf, (String)driverClass.getName(), (String)this.sourceConfig.connectionString);
        } else {
            DBConfiguration.configureDB((Configuration)hConf, (String)driverClass.getName(), (String)this.sourceConfig.connectionString, (String)this.sourceConfig.user, (String)this.sourceConfig.password);
        }
        DataDrivenETLDBInputFormat.setInput(hConf, DBRecord.class, this.sourceConfig.getImportQuery(), this.sourceConfig.getBoundingQuery(), this.sourceConfig.getEnableAutoCommit());
        if (this.sourceConfig.transactionIsolationLevel != null) {
            hConf.set("io.cdap.hydrator.db.plugin.transaction.isolation.level", this.sourceConfig.transactionIsolationLevel);
        }
        if (this.sourceConfig.connectionArguments != null) {
            hConf.set("io.cdap.hydrator.db.connection.arguments", this.sourceConfig.connectionArguments);
        }
        if (this.sourceConfig.numSplits == null || this.sourceConfig.numSplits != 1) {
            if (!this.sourceConfig.getImportQuery().contains("$CONDITIONS")) {
                throw new IllegalArgumentException(String.format("Import Query %s must contain the string '$CONDITIONS'.", this.sourceConfig.importQuery));
            }
            hConf.set("mapreduce.jdbc.input.orderby", this.sourceConfig.splitBy);
        }
        if (this.sourceConfig.numSplits != null) {
            hConf.setInt("mapreduce.job.maps", this.sourceConfig.numSplits.intValue());
        }
        if (this.sourceConfig.schema != null) {
            hConf.set("io.cdap.hydrator.db.override.schema", this.sourceConfig.schema);
        }
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext)context, this.sourceConfig.referenceName);
        lineageRecorder.createExternalDataset(this.sourceConfig.getSchema(collector));
        context.setInput(Input.of((String)this.sourceConfig.referenceName, (InputFormatProvider)new SourceInputFormatProvider(DataDrivenETLDBInputFormat.class, hConf)));
    }

    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        this.driverClass = context.loadPluginClass(this.getJDBCPluginId());
    }

    public void transform(KeyValue<LongWritable, DBRecord> input, Emitter<StructuredRecord> emitter) throws Exception {
        emitter.emit((Object)StructuredRecordUtils.convertCase(((DBRecord)input.getValue()).getRecord(), FieldCase.toFieldCase(this.sourceConfig.columnNameCase)));
    }

    public void destroy() {
        try {
            DBUtils.cleanup(this.driverClass);
        }
        finally {
            this.dbManager.destroy();
        }
    }

    private String getJDBCPluginId() {
        return String.format("%s.%s.%s", "source", this.sourceConfig.jdbcPluginType, this.sourceConfig.jdbcPluginName);
    }

    /*
     * Loose catch block
     */
    private Schema getSchema(Class<? extends Driver> driverClass) throws IllegalAccessException, SQLException, InstantiationException {
        DriverCleanup driverCleanup = this.loadPluginClassAndGetDriver(driverClass);
        try {
            try (Connection connection = this.getConnection();){
                String query = this.sourceConfig.importQuery;
                Statement statement = connection.createStatement();
                statement.setMaxRows(1);
                if (query.contains("$CONDITIONS")) {
                    query = DBSource.removeConditionsClause(query);
                }
                ResultSet resultSet = statement.executeQuery(query);
                Schema schema = Schema.recordOf((String)"outputSchema", DBUtils.getSchemaFields(resultSet));
                return schema;
            }
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
        finally {
            driverCleanup.destroy();
        }
    }

    @VisibleForTesting
    static String removeConditionsClause(String importQueryString) {
        String query = importQueryString;
        query = CONDITIONS_AND.matcher(query).replaceAll("");
        query = AND_CONDITIONS.matcher(query).replaceAll("");
        query = WHERE_CONDITIONS.matcher(query).replaceAll("");
        return query;
    }

    private DriverCleanup loadPluginClassAndGetDriver(Class<? extends Driver> driverClass) throws IllegalAccessException, InstantiationException, SQLException {
        if (driverClass == null) {
            throw new InstantiationException(String.format("Unable to load Driver class with plugin type %s and plugin name %s", this.sourceConfig.jdbcPluginType, this.sourceConfig.jdbcPluginName));
        }
        try {
            return DBUtils.ensureJDBCDriverIsAvailable(driverClass, this.sourceConfig.connectionString, this.sourceConfig.jdbcPluginType, this.sourceConfig.jdbcPluginName);
        }
        catch (IllegalAccessException | InstantiationException | SQLException e) {
            LOG.error("Unable to load or register driver {}", driverClass, (Object)e);
            throw e;
        }
    }

    private Connection getConnection() throws SQLException {
        Properties properties = ConnectionConfig.getConnectionArguments(this.sourceConfig.connectionArguments, this.sourceConfig.user, this.sourceConfig.password);
        return DriverManager.getConnection(this.sourceConfig.connectionString, properties);
    }

    public static class DBSourceConfig
    extends DBConfig {
        public static final String IMPORT_QUERY = "importQuery";
        public static final String BOUNDING_QUERY = "boundingQuery";
        public static final String SPLIT_BY = "splitBy";
        public static final String NUM_SPLITS = "numSplits";
        public static final String SCHEMA = "schema";
        public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
        @Name(value="importQuery")
        @Description(value="The SELECT query to use to import data from the specified table. You can specify an arbitrary number of columns to import, or import all columns using *. The Query should contain the '$CONDITIONS' string unless numSplits is set to one. For example, 'SELECT * FROM table WHERE $CONDITIONS'. The '$CONDITIONS' stringwill be replaced by 'splitBy' field limits specified by the bounding query.")
        @Macro
        String importQuery;
        @Nullable
        @Name(value="boundingQuery")
        @Description(value="Bounding Query should return the min and max of the values of the 'splitBy' field. For example, 'SELECT MIN(id),MAX(id) FROM table'. This is required unless numSplits is set to one.")
        @Macro
        String boundingQuery;
        @Nullable
        @Name(value="splitBy")
        @Description(value="Field Name which will be used to generate splits. This is required unless numSplits is set to one.")
        @Macro
        String splitBy;
        @Nullable
        @Name(value="numSplits")
        @Description(value="The number of splits to generate. If set to one, the boundingQuery is not needed, and no $CONDITIONS string needs to be specified in the importQuery. If not specified, the execution framework will pick a value.")
        @Macro
        Integer numSplits;
        @Nullable
        @Name(value="transactionIsolationLevel")
        @Description(value="The transaction isolation level for queries run by this sink. Defaults to TRANSACTION_SERIALIZABLE. See java.sql.Connection#setTransactionIsolation for more details. The Phoenix jdbc driver will throw an exception if the Phoenix database does not have transactions enabled and this setting is set to true. For drivers like that, this should be set to TRANSACTION_NONE.")
        @Macro
        public String transactionIsolationLevel;
        @Nullable
        @Name(value="schema")
        @Description(value="The schema of records output by the source. This will be used in place of whatever schema comes back from the query. This should only be used if there is a bug in your jdbc driver. For example, if a column is not correctly getting marked as nullable.")
        String schema;

        @Nullable
        private String getImportQuery() {
            return this.cleanQuery(this.importQuery);
        }

        private String getBoundingQuery() {
            return this.cleanQuery(this.boundingQuery);
        }

        private void validate(FailureCollector collector) {
            boolean hasOneSplit = false;
            if (!this.containsMacro(NUM_SPLITS) && this.numSplits != null) {
                if (this.numSplits < 1) {
                    collector.addFailure("Number of Splits must be a positive number.", null).withConfigProperty(NUM_SPLITS);
                }
                if (this.numSplits == 1) {
                    hasOneSplit = true;
                }
            }
            if (!this.containsMacro(TRANSACTION_ISOLATION_LEVEL) && this.transactionIsolationLevel != null) {
                TransactionIsolationLevel.validate(this.transactionIsolationLevel, collector);
            }
            if (!this.containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty((String)this.importQuery)) {
                collector.addFailure("Import Query must be specified.", null).withConfigProperty(IMPORT_QUERY);
            }
            if (!(hasOneSplit || this.containsMacro(IMPORT_QUERY) || Strings.isNullOrEmpty((String)this.importQuery) || this.getImportQuery().contains("$CONDITIONS"))) {
                collector.addFailure("Invalid Import Query.", String.format("Import Query %s must contain the string '$CONDITIONS'.", this.importQuery)).withConfigProperty(IMPORT_QUERY);
            }
            if (!hasOneSplit && !this.containsMacro(SPLIT_BY) && Strings.isNullOrEmpty((String)this.splitBy)) {
                collector.addFailure("Split-By Field Name must be specified if Number of Splits is not set to 1.", null).withConfigProperty(SPLIT_BY).withConfigProperty(NUM_SPLITS);
            }
            if (!hasOneSplit && !this.containsMacro(BOUNDING_QUERY) && Strings.isNullOrEmpty((String)this.boundingQuery)) {
                collector.addFailure("Bounding Query must be specified if Number of Splits is not set to 1.", null).withConfigProperty(BOUNDING_QUERY).withConfigProperty(NUM_SPLITS);
            }
        }

        @Nullable
        private Schema getSchema(FailureCollector collector) {
            try {
                return Strings.isNullOrEmpty((String)this.schema) ? null : Schema.parseJson((String)this.schema);
            }
            catch (IOException e) {
                collector.addFailure(String.format("Invalid Schema : %s", e.getMessage()), null);
                throw collector.getOrThrowException();
            }
        }
    }
}

