diff --git a/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSource.java b/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSource.java index 365df0ccc..3639f1495 100644 --- a/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSource.java +++ b/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSource.java @@ -24,7 +24,7 @@ import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.DBWritable; import java.util.Map; import javax.annotation.Nullable; diff --git a/database-commons/pom.xml b/database-commons/pom.xml index 79cc52ba4..43b889bc7 100644 --- a/database-commons/pom.xml +++ b/database-commons/pom.xml @@ -59,6 +59,11 @@ cdap-api compile + + org.apache.sqoop + sqoop + hadoop260 + diff --git a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java index 755de2a54..074e91664 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java @@ -51,10 +51,11 @@ * @see org.apache.hadoop.mapreduce.lib.db.DBOutputFormat DBOutputFormat * @see DBWritable DBWritable */ -public class DBRecord implements Writable, DBWritable, Configurable { +public class DBRecord implements Writable, org.apache.sqoop.mapreduce.DBWritable, Configurable { protected StructuredRecord record; protected Configuration conf; - + protected List schemaFields; + protected Schema schema; /** * Need to cache {@link ResultSetMetaData} of the record for use during writing to a table. * This is because we cannot rely on JDBC drivers to properly set metadata in the {@link PreparedStatement} @@ -97,8 +98,10 @@ public StructuredRecord getRecord() { */ public void readFields(ResultSet resultSet) throws SQLException { ResultSetMetaData metadata = resultSet.getMetaData(); - List schemaFields = getSchemaReader().getSchemaFields(resultSet, conf.get(DBUtils.OVERRIDE_SCHEMA)); - Schema schema = Schema.recordOf("dbRecord", schemaFields); + if (schemaFields == null) { + schemaFields = DBUtils.getSchemaFields(resultSet, conf.get(DBUtils.OVERRIDE_SCHEMA)); + schema = Schema.recordOf("dbRecord", schemaFields); + } StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema); for (int i = 0; i < schemaFields.size(); i++) { Schema.Field field = schemaFields.get(i); diff --git a/database-commons/src/main/java/io/cdap/plugin/db/FieldCase.java b/database-commons/src/main/java/io/cdap/plugin/db/FieldCase.java new file mode 100644 index 000000000..b65980d40 --- /dev/null +++ b/database-commons/src/main/java/io/cdap/plugin/db/FieldCase.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db; + +import com.google.common.base.Strings; + +/** + * Enum to denote case of Structured Record field. + */ +public enum FieldCase { + LOWER, + UPPER, + NONE; + + public static FieldCase toFieldCase(String fieldCase) { + if (Strings.isNullOrEmpty(fieldCase)) { + return FieldCase.NONE; + } + + try { + return FieldCase.valueOf(fieldCase.toUpperCase()); + } catch (IllegalArgumentException e) { + return FieldCase.NONE; + } + } +} diff --git a/database-commons/src/main/java/io/cdap/plugin/db/StructuredRecordUtils.java b/database-commons/src/main/java/io/cdap/plugin/db/StructuredRecordUtils.java new file mode 100644 index 000000000..6334a3f9b --- /dev/null +++ b/database-commons/src/main/java/io/cdap/plugin/db/StructuredRecordUtils.java @@ -0,0 +1,79 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utils class that contains StructuredRecord related transformations. + */ +public class StructuredRecordUtils { + + /** + * Converts the field names in the input {@link StructuredRecord} to a desired case + * + * @param input {@link StructuredRecord} + * @param fieldCase {@link FieldCase} + * @return {@link StructuredRecord} which contains field names confirming to the {@link FieldCase} passed in + * @throws Exception if there is a conflict in the field names while converting the case + */ + public static StructuredRecord convertCase(StructuredRecord input, FieldCase fieldCase) throws Exception { + if (fieldCase.equals(FieldCase.NONE)) { + return input; + } + + Schema oldSchema = input.getSchema(); + Map fieldNameMap = new HashMap<>(); + List newFields = new ArrayList<>(); + for (Schema.Field field : oldSchema.getFields()) { + String newName = changeName(field.getName(), fieldCase); + if (fieldNameMap.containsValue(newName)) { + // field name used already. indication of field names conflict. can't do anything. + throw new IllegalStateException(String.format( + "Duplicate field/column name %s found when trying to confirm to the chosen case option %s. " + + "Check Database Table schema.", field.getName(), fieldCase)); + } + fieldNameMap.put(field.getName(), newName); + newFields.add(Schema.Field.of(newName, field.getSchema())); + } + StructuredRecord.Builder recordBuilder = StructuredRecord.builder(Schema.recordOf("dbRecord", newFields)); + for (Map.Entry nameMap : fieldNameMap.entrySet()) { + recordBuilder.set(nameMap.getValue(), input.get(nameMap.getKey())); + } + return recordBuilder.build(); + } + + private StructuredRecordUtils() { + } + + private static String changeName(String oldName, FieldCase fieldCase) { + switch (fieldCase) { + case LOWER: + return oldName.toLowerCase(); + case UPPER: + return oldName.toUpperCase(); + default: + return oldName; + } + } +} diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java index dfcc54624..fd3230766 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java @@ -39,15 +39,16 @@ import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.DBConfig; import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.FieldCase; import io.cdap.plugin.db.SchemaReader; +import io.cdap.plugin.db.StructuredRecordUtils; import io.cdap.plugin.db.batch.TransactionIsolationLevel; import io.cdap.plugin.util.DBUtils; import io.cdap.plugin.util.DriverCleanup; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.db.DBConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,7 @@ public abstract class AbstractDBSource extends ReferenceBatchSource driverClass; + protected FieldCase fieldCase; public AbstractDBSource(DBSourceConfig sourceConfig) { super(new ReferencePluginConfig(sourceConfig.referenceName)); @@ -115,7 +117,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { @Path("getSchema") public Schema getSchema(GetSchemaRequest request, EndpointPluginContext pluginContext) throws IllegalAccessException, - SQLException, InstantiationException { + SQLException, InstantiationException, ClassNotFoundException { DriverCleanup driverCleanup; try { @@ -144,7 +146,7 @@ protected SchemaReader getSchemaReader() { private DriverCleanup loadPluginClassAndGetDriver(GetSchemaRequest request, EndpointPluginContext pluginContext) - throws IllegalAccessException, InstantiationException, SQLException { + throws IllegalAccessException, InstantiationException, SQLException, ClassNotFoundException { Class driverClass = pluginContext.loadPluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, @@ -196,16 +198,18 @@ public void prepareRun(BatchSourceContext context) throws Exception { ConnectionConfig.JDBC_PLUGIN_TYPE, sourceConfig.jdbcPluginName, connectionString, sourceConfig.getImportQuery(), sourceConfig.getBoundingQuery()); - Configuration hConf = new Configuration(); + JobConf hConf = new JobConf(); hConf.clear(); + int fetchSize = 1000; // Load the plugin class to make sure it is available. Class driverClass = context.loadPluginClass(getJDBCPluginId()); if (sourceConfig.user == null && sourceConfig.password == null) { - DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString); + DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString, fetchSize); } else { DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString, - sourceConfig.user, sourceConfig.password); + sourceConfig.user, sourceConfig.password, fetchSize); + hConf.set("io.cdap.cdap.jdbc.passwd", sourceConfig.password); } DataDrivenETLDBInputFormat.setInput(hConf, getDBRecordType(), @@ -240,19 +244,20 @@ public void prepareRun(BatchSourceContext context) throws Exception { new SourceInputFormatProvider(DataDrivenETLDBInputFormat.class, hConf))); } - protected Class getDBRecordType() { + protected Class getDBRecordType() { return DBRecord.class; } @Override public void initialize(BatchRuntimeContext context) throws Exception { super.initialize(context); - driverClass = context.loadPluginClass(getJDBCPluginId()); + driverClass = context.loadPluginClass(getJDBCPluginId());; + fieldCase = FieldCase.toFieldCase(sourceConfig.columnNameCase); } @Override public void transform(KeyValue input, Emitter emitter) throws Exception { - emitter.emit(input.getValue().getRecord()); + emitter.emit(StructuredRecordUtils.convertCase(input.getValue().getRecord(), fieldCase)); } @Override @@ -276,6 +281,7 @@ public abstract static class DBSourceConfig extends DBConfig { public static final String NUM_SPLITS = "numSplits"; public static final String SCHEMA = "schema"; public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel"; + public static final String COLUMN_NAME_CASE = "columnCase"; @Name(IMPORT_QUERY) @Description("The SELECT query to use to import data from the specified table. " + @@ -315,6 +321,17 @@ public abstract static class DBSourceConfig extends DBConfig { "is not correctly getting marked as nullable.") public String schema; + + @Name(COLUMN_NAME_CASE) + @Description("Sets the case of the column names returned from the query. " + + "Possible options are upper or lower. By default or for any other input, the column names are not modified and " + + "the names returned from the database are used as-is. Note that setting this property provides predictability " + + "of column name cases across different databases but might result in column name conflicts if multiple column " + + "names are the same when the case is ignored.") + @Nullable + public String columnNameCase; + + private String getImportQuery() { return cleanQuery(importQuery); } diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java index 48f412e69..24ce4b665 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java @@ -20,16 +20,14 @@ import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.JDBCDriverShim; import io.cdap.plugin.db.batch.NoOpCommitConnection; -import io.cdap.plugin.db.batch.TransactionIsolationLevel; import io.cdap.plugin.util.DBUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; -import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; -import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat; +import org.apache.sqoop.mapreduce.DBWritable; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.mapreduce.db.DBInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,12 +41,13 @@ /** * Class that extends {@link DBInputFormat} to load the database driver class correctly. */ -public class DataDrivenETLDBInputFormat extends DataDrivenDBInputFormat { +public class DataDrivenETLDBInputFormat extends org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat { public static final String AUTO_COMMIT_ENABLED = "io.cdap.plugin.db.autocommit.enabled"; private static final Logger LOG = LoggerFactory.getLogger(DataDrivenETLDBInputFormat.class); private Driver driver; private JDBCDriverShim driverShim; + private Connection connection; static void setInput(Configuration conf, Class inputClass, @@ -92,7 +91,7 @@ public Connection getConnection() { Properties properties = ConnectionConfig.getConnectionArguments(conf.get(DBUtils.CONNECTION_ARGUMENTS), conf.get(DBConfiguration.USERNAME_PROPERTY), - conf.get(DBConfiguration.PASSWORD_PROPERTY)); + conf.get("io.cdap.cdap.jdbc.passwd")); connection = DriverManager.getConnection(url, properties); @@ -103,9 +102,11 @@ public Connection getConnection() { } else { this.connection.setAutoCommit(false); } - String level = conf.get(TransactionIsolationLevel.CONF_KEY); - LOG.debug("Transaction isolation level: {}", level); - connection.setTransactionIsolation(TransactionIsolationLevel.getLevel(level)); + this.connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); +// +// String level = conf.get(TransactionIsolationLevel.CONF_KEY); +// LOG.debug("Transaction isolation level: {}", level); +// connection.setTransactionIsolation(TransactionIsolationLevel.getLevel(level)); } catch (Exception e) { throw Throwables.propagate(e); } diff --git a/database-commons/src/test/java/io/cdap/plugin/db/StructuredRecordUtilsTest.java b/database-commons/src/test/java/io/cdap/plugin/db/StructuredRecordUtilsTest.java new file mode 100644 index 000000000..1d731da58 --- /dev/null +++ b/database-commons/src/test/java/io/cdap/plugin/db/StructuredRecordUtilsTest.java @@ -0,0 +1,69 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit Tests for {@link StructuredRecordUtils}. + */ +public class StructuredRecordUtilsTest { + + @Test + public void testLowerAndUpperCaseTransformation() throws Exception { + StructuredRecord record = StructuredRecord.builder( + Schema.recordOf("dbrecord", + Schema.Field.of("Name", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("Age", Schema.of(Schema.Type.INT)))).set("Name", "Abcd").set("Age", 10).build(); + StructuredRecord upperCaseRecord = StructuredRecordUtils.convertCase( + record, FieldCase.toFieldCase("upPer")); + Assert.assertEquals("Abcd", upperCaseRecord.get("NAME")); + Assert.assertEquals(10, upperCaseRecord.get("AGE").intValue()); + Assert.assertNull(upperCaseRecord.get("Age")); + Assert.assertNull(upperCaseRecord.get("Name")); + + StructuredRecord lowerCaseRecord = StructuredRecordUtils.convertCase( + record, FieldCase.toFieldCase("lowEr")); + Assert.assertEquals("Abcd", lowerCaseRecord.get("name")); + Assert.assertEquals(10, lowerCaseRecord.get("age").intValue()); + Assert.assertNull(upperCaseRecord.get("Age")); + Assert.assertNull(upperCaseRecord.get("Name")); + + StructuredRecord noChangeRecord = StructuredRecordUtils.convertCase( + record, FieldCase.toFieldCase("no change")); + Assert.assertEquals("Abcd", noChangeRecord.get("Name")); + Assert.assertEquals(10, noChangeRecord.get("Age").intValue()); + } + + @Test + public void testInvalidTransformation() throws Exception { + StructuredRecord record = StructuredRecord.builder( + Schema.recordOf("dbrecord", + Schema.Field.of("age", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("Age", Schema.of(Schema.Type.INT)))).set("age", "10").set("Age", 10).build(); + + try { + StructuredRecordUtils.convertCase(record, FieldCase.toFieldCase("lower")); + Assert.fail(); + } catch (Exception e) { + //expected + } + } +} diff --git a/db2-plugin/pom.xml b/db2-plugin/pom.xml index cd16e1389..70e17ba45 100644 --- a/db2-plugin/pom.xml +++ b/db2-plugin/pom.xml @@ -78,6 +78,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + @@ -93,7 +101,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java b/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java index cb5cc8088..75bed821b 100644 --- a/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java +++ b/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java @@ -22,7 +22,7 @@ import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.DBWritable; /** diff --git a/generic-database-plugin/pom.xml b/generic-database-plugin/pom.xml index f8524dde7..58b0aa74b 100644 --- a/generic-database-plugin/pom.xml +++ b/generic-database-plugin/pom.xml @@ -83,6 +83,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + diff --git a/mssql-plugin/pom.xml b/mssql-plugin/pom.xml index 25f7f40ef..89dbeb466 100644 --- a/mssql-plugin/pom.xml +++ b/mssql-plugin/pom.xml @@ -78,6 +78,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + @@ -93,7 +101,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/mysql-plugin/pom.xml b/mysql-plugin/pom.xml index b706b557d..d4234671c 100644 --- a/mysql-plugin/pom.xml +++ b/mysql-plugin/pom.xml @@ -78,6 +78,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + @@ -93,7 +101,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/netezza-plugin/pom.xml b/netezza-plugin/pom.xml index 407fc94da..cc272e56c 100644 --- a/netezza-plugin/pom.xml +++ b/netezza-plugin/pom.xml @@ -78,6 +78,14 @@ RELEASE test + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + @@ -93,7 +101,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java b/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java index 92235a371..2811d0b4b 100644 --- a/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java +++ b/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java @@ -22,8 +22,7 @@ import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; - +import org.apache.sqoop.mapreduce.DBWritable; /** * Batch source to read from Netezza. diff --git a/oracle-plugin/pom.xml b/oracle-plugin/pom.xml index 9b32e92f6..790bd1fd9 100644 --- a/oracle-plugin/pom.xml +++ b/oracle-plugin/pom.xml @@ -83,6 +83,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + @@ -98,7 +106,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 518513fdd..5f70053fe 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -23,7 +23,7 @@ import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.DBWritable; import java.util.Map; import javax.annotation.Nullable; diff --git a/pom.xml b/pom.xml index 36003755f..c389feadf 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ 0.9.0 + 1.4.7 @@ -230,6 +231,22 @@ + + org.apache.sqoop + sqoop + hadoop260 + ${sqoop.version} + + + org.slf4j + jcl-over-slf4j + ${slf4j.version} + + + org.slf4j + log4j-over-slf4j + ${slf4j.version} + org.apache.hadoop hadoop-mapreduce-client-core diff --git a/postgresql-plugin/pom.xml b/postgresql-plugin/pom.xml index bee71bafa..3451374e8 100644 --- a/postgresql-plugin/pom.xml +++ b/postgresql-plugin/pom.xml @@ -99,7 +99,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java index 1550f2f19..75d61e17e 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java @@ -24,7 +24,7 @@ import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.DBWritable; import java.util.Map; import javax.annotation.Nullable;