/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.db.batch.sink;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.DBConfig;
import io.cdap.plugin.DBManager;
import io.cdap.plugin.DBRecord;
import io.cdap.plugin.DBUtils;
import io.cdap.plugin.FieldCase;
import io.cdap.plugin.common.ReferenceBatchSink;
import io.cdap.plugin.common.ReferencePluginConfig;
import io.cdap.plugin.db.batch.sink.ETLDBOutputFormat;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="batchsink")
@Name(value="Database")
@Description(value="Writes records to a database table. Each record will be written to a row in the table.")
public class DBSink
extends ReferenceBatchSink<StructuredRecord, DBRecord, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(DBSink.class);
    private final DBSinkConfig dbSinkConfig;
    private final DBManager dbManager;
    private Class<? extends Driver> driverClass;
    private int[] columnTypes;
    private List<String> columns;

    public DBSink(DBSinkConfig dbSinkConfig) {
        super(new ReferencePluginConfig(dbSinkConfig.referenceName));
        this.dbSinkConfig = dbSinkConfig;
        this.dbManager = new DBManager(dbSinkConfig);
    }

    private String getJDBCPluginId() {
        return String.format("%s.%s.%s", "sink", this.dbSinkConfig.jdbcPluginType, this.dbSinkConfig.jdbcPluginName);
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        this.dbManager.validateJDBCPluginPipeline(pipelineConfigurer, this.getJDBCPluginId(), collector);
    }

    public void prepareRun(BatchSinkContext context) {
        LOG.debug("tableName = {}; pluginType = {}; pluginName = {}; connectionString = {}; columns = {}; transaction isolation level: {}", new Object[]{this.dbSinkConfig.tableName, this.dbSinkConfig.jdbcPluginType, this.dbSinkConfig.jdbcPluginName, this.dbSinkConfig.connectionString, this.dbSinkConfig.columns, this.dbSinkConfig.transactionIsolationLevel});
        Class driverClass = context.loadPluginClass(this.getJDBCPluginId());
        try {
            Preconditions.checkArgument((boolean)this.dbManager.tableExists(driverClass, this.dbSinkConfig.tableName), (String)"Table %s does not exist. Please check that the 'tableName' property has been set correctly, and that the connection string %s points to a valid database.", (Object[])new Object[]{this.dbSinkConfig.tableName, this.dbSinkConfig.connectionString});
        }
        finally {
            DBUtils.cleanup(driverClass);
        }
        context.addOutput(Output.of((String)this.dbSinkConfig.referenceName, (OutputFormatProvider)new DBOutputFormatProvider(this.dbSinkConfig, driverClass)));
    }

    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        this.driverClass = context.loadPluginClass(this.getJDBCPluginId());
        this.setResultSetMetadata();
    }

    public void transform(StructuredRecord input, Emitter<KeyValue<DBRecord, NullWritable>> emitter) throws Exception {
        ArrayList<Schema.Field> outputFields = new ArrayList<Schema.Field>();
        for (String column : this.columns) {
            Schema.Field field = input.getSchema().getField(column);
            Preconditions.checkNotNull((Object)field, (String)"Missing schema field for column '%s'", (Object[])new Object[]{column});
            outputFields.add(field);
        }
        StructuredRecord.Builder output = StructuredRecord.builder((Schema)Schema.recordOf((String)input.getSchema().getRecordName(), outputFields));
        for (String column : this.columns) {
            output.set(column, input.get(column));
        }
        emitter.emit((Object)new KeyValue((Object)new DBRecord(output.build(), this.columnTypes), null));
    }

    public void destroy() {
        DBUtils.cleanup(this.driverClass);
        this.dbManager.destroy();
    }

    @VisibleForTesting
    void setColumns(List<String> columns) {
        this.columns = ImmutableList.copyOf(columns);
    }

    private void setResultSetMetadata() throws Exception {
        TreeMap<String, Integer> columnToType = new TreeMap<String, Integer>(String.CASE_INSENSITIVE_ORDER);
        this.dbManager.ensureJDBCDriverIsAvailable(this.driverClass);
        try (Connection connection = DriverManager.getConnection(this.dbSinkConfig.connectionString, this.dbSinkConfig.getConnectionArguments());
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(String.format("SELECT %s FROM %s WHERE 1 = 0", this.dbSinkConfig.columns, this.dbSinkConfig.tableName));){
            ResultSetMetaData resultSetMetadata = rs.getMetaData();
            FieldCase fieldCase = FieldCase.toFieldCase(this.dbSinkConfig.columnNameCase);
            for (int i = 0; i < rs.getMetaData().getColumnCount(); ++i) {
                String name = resultSetMetadata.getColumnName(i + 1);
                int type = resultSetMetadata.getColumnType(i + 1);
                if (fieldCase == FieldCase.LOWER) {
                    name = name.toLowerCase();
                } else if (fieldCase == FieldCase.UPPER) {
                    name = name.toUpperCase();
                }
                columnToType.put(name, type);
            }
        }
        this.columns = ImmutableList.copyOf((Iterable)Splitter.on((String)",").omitEmptyStrings().trimResults().split((CharSequence)this.dbSinkConfig.columns));
        this.columnTypes = new int[this.columns.size()];
        for (int i = 0; i < this.columnTypes.length; ++i) {
            String name = this.columns.get(i);
            Preconditions.checkArgument((boolean)columnToType.containsKey(name), (String)"Missing column '%s' in SQL table", (Object[])new Object[]{name});
            this.columnTypes[i] = (Integer)columnToType.get(name);
        }
    }

    private static class DBOutputFormatProvider
    implements OutputFormatProvider {
        private final Map<String, String> conf = new HashMap<String, String>();

        DBOutputFormatProvider(DBSinkConfig dbSinkConfig, Class<? extends Driver> driverClass) {
            this.conf.put("io.cdap.hydrator.db.output.autocommit.enabled", String.valueOf(dbSinkConfig.getEnableAutoCommit()));
            if (dbSinkConfig.transactionIsolationLevel != null) {
                this.conf.put("io.cdap.hydrator.db.plugin.transaction.isolation.level", dbSinkConfig.transactionIsolationLevel);
            }
            if (dbSinkConfig.connectionArguments != null) {
                this.conf.put("io.cdap.hydrator.db.connection.arguments", dbSinkConfig.connectionArguments);
            }
            this.conf.put("mapreduce.jdbc.driver.class", driverClass.getName());
            this.conf.put("mapreduce.jdbc.url", dbSinkConfig.connectionString);
            if (dbSinkConfig.user != null) {
                this.conf.put("mapreduce.jdbc.username", dbSinkConfig.user);
            }
            if (dbSinkConfig.password != null) {
                this.conf.put("mapreduce.jdbc.password", dbSinkConfig.password);
            }
            this.conf.put("mapreduce.jdbc.output.table.name", dbSinkConfig.tableName);
            this.conf.put("mapreduce.jdbc.output.field.names", dbSinkConfig.columns);
        }

        public String getOutputFormatClassName() {
            return ETLDBOutputFormat.class.getName();
        }

        public Map<String, String> getOutputFormatConfiguration() {
            return this.conf;
        }
    }

    public static class DBSinkConfig
    extends DBConfig {
        public static final String COLUMNS = "columns";
        public static final String TABLE_NAME = "tableName";
        public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
        @Name(value="columns")
        @Description(value="Comma-separated list of columns in the specified table to export to.")
        @Macro
        public String columns;
        @Name(value="tableName")
        @Description(value="Name of the database table to write to.")
        @Macro
        public String tableName;
        @Nullable
        @Name(value="transactionIsolationLevel")
        @Description(value="The transaction isolation level for queries run by this sink. Defaults to TRANSACTION_SERIALIZABLE. See java.sql.Connection#setTransactionIsolation for more details. The Phoenix jdbc driver will throw an exception if the Phoenix database does not have transactions enabled and this setting is set to true. For drivers like that, this should be set to TRANSACTION_NONE.")
        @Macro
        public String transactionIsolationLevel;
    }
}

