Use Sqoop library for DB plugins #31

Open · wants to merge 4 commits into develop
@@ -24,7 +24,7 @@
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig;
import io.cdap.plugin.db.batch.source.AbstractDBSource;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.sqoop.mapreduce.DBWritable;

import java.util.Map;
import javax.annotation.Nullable;
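For context on the import swap above: Sqoop ships its own copy of the Hadoop DB support classes, so the change should be source-compatible as long as org.apache.sqoop.mapreduce.DBWritable exposes the same readFields/write pair as the Hadoop interface it replaces, which the unchanged DBRecord method bodies further down suggest. A minimal sketch under that assumption, with a hypothetical implementing class:

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.sqoop.mapreduce.DBWritable;

// Hypothetical minimal DBWritable: relative to the Hadoop variant only the
// import changes; the two callback methods keep their signatures.
public class MinimalDBWritable implements DBWritable {
  private long id;

  @Override
  public void readFields(ResultSet resultSet) throws SQLException {
    id = resultSet.getLong(1);   // populate fields from the current row
  }

  @Override
  public void write(PreparedStatement statement) throws SQLException {
    statement.setLong(1, id);    // bind fields for the write path
  }
}
```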
5 changes: 5 additions & 0 deletions database-commons/pom.xml
@@ -59,6 +59,11 @@
<artifactId>cdap-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop</artifactId>
<classifier>hadoop260</classifier>
</dependency>
</dependencies>

<build>
11 changes: 7 additions & 4 deletions database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java
@@ -51,10 +51,11 @@
* @see org.apache.hadoop.mapreduce.lib.db.DBOutputFormat DBOutputFormat
* @see DBWritable DBWritable
*/
public class DBRecord implements Writable, DBWritable, Configurable {
public class DBRecord implements Writable, org.apache.sqoop.mapreduce.DBWritable, Configurable {
protected StructuredRecord record;
protected Configuration conf;

protected List<Schema.Field> schemaFields;
protected Schema schema;
/**
* Need to cache {@link ResultSetMetaData} of the record for use during writing to a table.
* This is because we cannot rely on JDBC drivers to properly set metadata in the {@link PreparedStatement}
@@ -97,8 +98,10 @@ public StructuredRecord getRecord() {
*/
public void readFields(ResultSet resultSet) throws SQLException {
ResultSetMetaData metadata = resultSet.getMetaData();
List<Schema.Field> schemaFields = getSchemaReader().getSchemaFields(resultSet, conf.get(DBUtils.OVERRIDE_SCHEMA));
Schema schema = Schema.recordOf("dbRecord", schemaFields);
if (schemaFields == null) {
schemaFields = DBUtils.getSchemaFields(resultSet, conf.get(DBUtils.OVERRIDE_SCHEMA));
schema = Schema.recordOf("dbRecord", schemaFields);
}
StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema);
for (int i = 0; i < schemaFields.size(); i++) {
Schema.Field field = schemaFields.get(i);
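The readFields change above stops rebuilding the schema for every row and instead caches it in the new schemaFields/schema members. A condensed sketch of that pattern, using the same DBUtils.getSchemaFields call the diff uses (the overrideSchema parameter stands in for conf.get(DBUtils.OVERRIDE_SCHEMA)):

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.util.DBUtils;

// Lazy, per-instance schema cache: the first row pays the metadata lookup,
// later rows reuse the cached schema.
public class SchemaCacheSketch {
  private List<Schema.Field> schemaFields;
  private Schema schema;

  public Schema schemaFor(ResultSet resultSet, String overrideSchema) throws SQLException {
    if (schemaFields == null) {
      schemaFields = DBUtils.getSchemaFields(resultSet, overrideSchema);
      schema = Schema.recordOf("dbRecord", schemaFields);
    }
    return schema;
  }
}
```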
40 changes: 40 additions & 0 deletions database-commons/src/main/java/io/cdap/plugin/db/FieldCase.java
@@ -0,0 +1,40 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.db;

import com.google.common.base.Strings;

/**
* Enum to denote case of Structured Record field.
*/
public enum FieldCase {
LOWER,
UPPER,
NONE;

public static FieldCase toFieldCase(String fieldCase) {
if (Strings.isNullOrEmpty(fieldCase)) {
return FieldCase.NONE;
}

try {
return FieldCase.valueOf(fieldCase.toUpperCase());
} catch (IllegalArgumentException e) {
return FieldCase.NONE;
}
}
}
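A quick usage sketch of the new enum; the input strings below are hypothetical plugin configuration values, and anything null, empty, or unrecognized falls back to NONE instead of throwing:

```java
import io.cdap.plugin.db.FieldCase;

public class FieldCaseExample {
  public static void main(String[] args) {
    // Recognized values are matched case-insensitively.
    System.out.println(FieldCase.toFieldCase("upper"));  // UPPER
    System.out.println(FieldCase.toFieldCase("Lower"));  // LOWER
    // Null, empty, or unknown input silently falls back to NONE.
    System.out.println(FieldCase.toFieldCase(null));     // NONE
    System.out.println(FieldCase.toFieldCase("camel"));  // NONE
  }
}
```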
@@ -0,0 +1,79 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.db;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Utils class that contains StructuredRecord related transformations.
*/
public class StructuredRecordUtils {

/**
* Converts the field names in the input {@link StructuredRecord} to a desired case
*
* @param input {@link StructuredRecord}
* @param fieldCase {@link FieldCase}
* @return {@link StructuredRecord} whose field names conform to the {@link FieldCase} passed in
* @throws Exception if there is a conflict in the field names while converting the case
*/
public static StructuredRecord convertCase(StructuredRecord input, FieldCase fieldCase) throws Exception {
if (fieldCase.equals(FieldCase.NONE)) {
return input;
}

Schema oldSchema = input.getSchema();
Map<String, String> fieldNameMap = new HashMap<>();
List<Schema.Field> newFields = new ArrayList<>();
for (Schema.Field field : oldSchema.getFields()) {
String newName = changeName(field.getName(), fieldCase);
if (fieldNameMap.containsValue(newName)) {
// The converted name is already taken: the field names conflict once the case is changed.
throw new IllegalStateException(String.format(
"Duplicate field/column name %s found when trying to conform to the chosen case option %s. " +
"Check Database Table schema.", field.getName(), fieldCase));
}
fieldNameMap.put(field.getName(), newName);
newFields.add(Schema.Field.of(newName, field.getSchema()));
}
StructuredRecord.Builder recordBuilder = StructuredRecord.builder(Schema.recordOf("dbRecord", newFields));
for (Map.Entry<String, String> nameMap : fieldNameMap.entrySet()) {
recordBuilder.set(nameMap.getValue(), input.get(nameMap.getKey()));
}
return recordBuilder.build();
}

private StructuredRecordUtils() {
}

private static String changeName(String oldName, FieldCase fieldCase) {
switch (fieldCase) {
case LOWER:
return oldName.toLowerCase();
case UPPER:
return oldName.toUpperCase();
default:
return oldName;
}
}
}
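A usage sketch of convertCase with a hypothetical record; the column names and values below are made up, and the CDAP Schema/StructuredRecord builder calls are the standard ones used elsewhere in these plugins:

```java
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.db.FieldCase;
import io.cdap.plugin.db.StructuredRecordUtils;

public class ConvertCaseExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical record with upper-case column names, as a database might return them.
    Schema schema = Schema.recordOf("dbRecord",
                                    Schema.Field.of("ID", Schema.of(Schema.Type.INT)),
                                    Schema.Field.of("FIRST_NAME", Schema.of(Schema.Type.STRING)));
    StructuredRecord input = StructuredRecord.builder(schema)
      .set("ID", 1)
      .set("FIRST_NAME", "Ada")
      .build();

    // Field names are rewritten to lower case; values are copied across unchanged.
    StructuredRecord lowered = StructuredRecordUtils.convertCase(input, FieldCase.LOWER);
    System.out.println((String) lowered.get("first_name"));  // Ada

    // A record containing both "ID" and "id" would throw IllegalStateException here,
    // since the two names collide once case is ignored.
  }
}
```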
@@ -39,15 +39,16 @@
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.db.DBConfig;
import io.cdap.plugin.db.DBRecord;
import io.cdap.plugin.db.FieldCase;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.StructuredRecordUtils;
import io.cdap.plugin.db.batch.TransactionIsolationLevel;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -72,6 +73,7 @@ public abstract class AbstractDBSource extends ReferenceBatchSource<LongWritable

protected final DBSourceConfig sourceConfig;
protected Class<? extends Driver> driverClass;
protected FieldCase fieldCase;

public AbstractDBSource(DBSourceConfig sourceConfig) {
super(new ReferencePluginConfig(sourceConfig.referenceName));
@@ -115,7 +117,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
@Path("getSchema")
public Schema getSchema(GetSchemaRequest request,
EndpointPluginContext pluginContext) throws IllegalAccessException,
SQLException, InstantiationException {
SQLException, InstantiationException, ClassNotFoundException {
DriverCleanup driverCleanup;
try {

@@ -144,7 +146,7 @@ protected SchemaReader getSchemaReader() {

private DriverCleanup loadPluginClassAndGetDriver(GetSchemaRequest request,
EndpointPluginContext pluginContext)
throws IllegalAccessException, InstantiationException, SQLException {
throws IllegalAccessException, InstantiationException, SQLException, ClassNotFoundException {

Class<? extends Driver> driverClass =
pluginContext.loadPluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE,
@@ -196,16 +198,18 @@ public void prepareRun(BatchSourceContext context) throws Exception {
ConnectionConfig.JDBC_PLUGIN_TYPE, sourceConfig.jdbcPluginName,
connectionString,
sourceConfig.getImportQuery(), sourceConfig.getBoundingQuery());
Configuration hConf = new Configuration();
JobConf hConf = new JobConf();
hConf.clear();

int fetchSize = 1000;
// Load the plugin class to make sure it is available.
Class<? extends Driver> driverClass = context.loadPluginClass(getJDBCPluginId());
if (sourceConfig.user == null && sourceConfig.password == null) {
DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString);
DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString, fetchSize);
} else {
DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString,
sourceConfig.user, sourceConfig.password);
sourceConfig.user, sourceConfig.password, fetchSize);
hConf.set("io.cdap.cdap.jdbc.passwd", sourceConfig.password);
}

DataDrivenETLDBInputFormat.setInput(hConf, getDBRecordType(),
@@ -240,19 +244,20 @@ public void prepareRun(BatchSourceContext context) throws Exception {
new SourceInputFormatProvider(DataDrivenETLDBInputFormat.class, hConf)));
}

protected Class<? extends DBWritable> getDBRecordType() {
protected Class<? extends org.apache.sqoop.mapreduce.DBWritable> getDBRecordType() {
return DBRecord.class;
}

@Override
public void initialize(BatchRuntimeContext context) throws Exception {
super.initialize(context);
driverClass = context.loadPluginClass(getJDBCPluginId());
driverClass = context.loadPluginClass(getJDBCPluginId());
fieldCase = FieldCase.toFieldCase(sourceConfig.columnNameCase);
}

@Override
public void transform(KeyValue<LongWritable, DBRecord> input, Emitter<StructuredRecord> emitter) throws Exception {
emitter.emit(input.getValue().getRecord());
emitter.emit(StructuredRecordUtils.convertCase(input.getValue().getRecord(), fieldCase));
}

@Override
@@ -276,6 +281,7 @@ public abstract static class DBSourceConfig extends DBConfig {
public static final String NUM_SPLITS = "numSplits";
public static final String SCHEMA = "schema";
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
public static final String COLUMN_NAME_CASE = "columnCase";

@Name(IMPORT_QUERY)
@Description("The SELECT query to use to import data from the specified table. " +
@@ -315,6 +321,17 @@ public abstract static class DBSourceConfig extends DBConfig {
"is not correctly getting marked as nullable.")
public String schema;


@Name(COLUMN_NAME_CASE)
@Description("Sets the case of the column names returned from the query. " +
"Possible options are upper or lower. By default or for any other input, the column names are not modified and " +
"the names returned from the database are used as-is. Note that setting this property provides predictability " +
"of column name cases across different databases but might result in column name conflicts if multiple column " +
"names are the same when the case is ignored.")
@Nullable
public String columnNameCase;


private String getImportQuery() {
return cleanQuery(importQuery);
}
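Taken together, the AbstractDBSource changes wire the new columnCase option through the source: initialize() resolves it to a FieldCase once, and transform() applies it to every emitted record. A condensed sketch of that runtime path (the helper class and method below are illustrative, not part of the PR):

```java
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.plugin.db.FieldCase;
import io.cdap.plugin.db.StructuredRecordUtils;

public class ColumnCaseFlowSketch {
  // Mirrors what initialize() + transform() do per record: resolve the configured
  // case once, then rename the fields of each record read from the database.
  public static StructuredRecord apply(String columnNameCase, StructuredRecord fromDb) throws Exception {
    FieldCase fieldCase = FieldCase.toFieldCase(columnNameCase);  // e.g. "lower" -> LOWER
    return StructuredRecordUtils.convertCase(fromDb, fieldCase);  // rename fields, keep values
  }
}
```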
@@ -20,16 +20,14 @@
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.db.JDBCDriverShim;
import io.cdap.plugin.db.batch.NoOpCommitConnection;
import io.cdap.plugin.db.batch.TransactionIsolationLevel;
import io.cdap.plugin.util.DBUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
import org.apache.sqoop.mapreduce.DBWritable;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.db.DBInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,12 +41,13 @@
/**
* Class that extends {@link DBInputFormat} to load the database driver class correctly.
*/
public class DataDrivenETLDBInputFormat extends DataDrivenDBInputFormat {
public class DataDrivenETLDBInputFormat extends org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat {
public static final String AUTO_COMMIT_ENABLED = "io.cdap.plugin.db.autocommit.enabled";

private static final Logger LOG = LoggerFactory.getLogger(DataDrivenETLDBInputFormat.class);
private Driver driver;
private JDBCDriverShim driverShim;
private Connection connection;

static void setInput(Configuration conf,
Class<? extends DBWritable> inputClass,
@@ -92,7 +91,7 @@ public Connection getConnection() {
Properties properties =
ConnectionConfig.getConnectionArguments(conf.get(DBUtils.CONNECTION_ARGUMENTS),
conf.get(DBConfiguration.USERNAME_PROPERTY),
conf.get(DBConfiguration.PASSWORD_PROPERTY));
conf.get("io.cdap.cdap.jdbc.passwd"));
connection = DriverManager.getConnection(url, properties);


@@ -103,9 +102,11 @@
} else {
this.connection.setAutoCommit(false);
}
String level = conf.get(TransactionIsolationLevel.CONF_KEY);
LOG.debug("Transaction isolation level: {}", level);
connection.setTransactionIsolation(TransactionIsolationLevel.getLevel(level));
this.connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
//
// String level = conf.get(TransactionIsolationLevel.CONF_KEY);
// LOG.debug("Transaction isolation level: {}", level);
// connection.setTransactionIsolation(TransactionIsolationLevel.getLevel(level));
} catch (Exception e) {
throw Throwables.propagate(e);
}
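One detail worth tracing across the two files: the JDBC password now travels through the job configuration under a plugin-specific key ("io.cdap.cdap.jdbc.passwd") instead of Sqoop's DBConfiguration.PASSWORD_PROPERTY, with prepareRun() writing it and getConnection() reading it back. A minimal sketch of both ends of that round trip, based on the lines shown in the diff:

```java
import org.apache.hadoop.conf.Configuration;

public class PasswordRoundTripSketch {
  private static final String PASSWORD_KEY = "io.cdap.cdap.jdbc.passwd";

  // Producer side (AbstractDBSource.prepareRun): stash the password in the job conf.
  public static void stash(Configuration hConf, String password) {
    hConf.set(PASSWORD_KEY, password);
  }

  // Consumer side (DataDrivenETLDBInputFormat.getConnection): read it back when
  // assembling the JDBC connection properties.
  public static String read(Configuration conf) {
    return conf.get(PASSWORD_KEY);
  }
}
```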