diff --git a/amazon-redshift-plugin/docs/Redshift-batchsource.md b/amazon-redshift-plugin/docs/Redshift-batchsource.md
new file mode 100644
index 000000000..38873b15a
--- /dev/null
+++ b/amazon-redshift-plugin/docs/Redshift-batchsource.md
@@ -0,0 +1,102 @@
+# Amazon Redshift Batch Source
+
+Description
+-----------
+Reads from an Amazon Redshift database using a configurable SQL query.
+Outputs one record for each row returned by the query.
+
+
+Use Case
+--------
+The source is used whenever you need to read from an Amazon Redshift database. For example, you may want
+to create daily snapshots of a database table by using this source and writing to
+a TimePartitionedFileSet.
+
+
+Properties
+----------
+**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.
+
+**JDBC Driver name:** Name of the JDBC driver to use.
+
+**Host:** Host URL of the current master instance of the Redshift cluster.
+
+**Port:** Port on which the Redshift master instance is listening.
+
+**Database:** Redshift database name.
+
+**Import Query:** The SELECT query to use to import data from the specified table.
+You can specify an arbitrary number of columns to import, or import all columns using \*. The query should
+contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDITIONS'.
+The '$CONDITIONS' string will be replaced by limits on the 'splitBy' field derived from the bounding query.
+The '$CONDITIONS' string is not required if numSplits is set to one.
+
+**Bounding Query:** Query that returns the minimum and maximum values of the 'splitBy' field.
+For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one.
+
+**Split-By Field Name:** Name of the field used to generate splits. Not required if numSplits is set to one.
+
+**Number of Splits to Generate:** Number of splits to generate.
+
+**Username:** User identity for connecting to the specified database.
+
+**Password:** Password to use to connect to the specified database.
+
+**Connection Arguments:** A list of arbitrary string key/value pairs to pass to the JDBC driver as connection
+arguments, for drivers that need additional configuration.
+
+**Schema:** The schema of records output by the source. This will be used in place of whatever schema comes
+back from the query. However, it must match the schema that comes back from the query,
+except it can mark fields as nullable and can contain a subset of the fields.
+
+**Fetch Size:** The number of rows to fetch at a time per split. A larger fetch size can result in a faster import,
+at the cost of higher memory usage.
+
+Example
+-------
+Suppose you want to read data from an Amazon Redshift database named "prod" that is running on
+"redshift.xyz.eu-central-1.redshift.amazonaws.com", port 5439, as user "sa" with password "Test11".
+Ensure that the Redshift JDBC driver is installed (you can also provide the name of a specific driver;
+otherwise "redshift" will be used), then configure the plugin with:
+
+```
+Reference Name: "src1"
+Driver Name: "redshift"
+Host: "redshift.xyz.eu-central-1.redshift.amazonaws.com"
+Port: 5439
+Database: "prod"
+Import Query: "select id, name, email, phone from users;"
+Number of Splits to Generate: 1
+Username: "sa"
+Password: "Test11"
+```
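+
+With this configuration the plugin builds a JDBC connection string of the form
+`jdbc:redshift://<host>:<port>/<database>`, so the example above connects using:
+
+```
+jdbc:redshift://redshift.xyz.eu-central-1.redshift.amazonaws.com:5439/prod
+```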
+
+Data Types Mapping
+------------------
+
+Mapping of Redshift types to CDAP schema:
+
+| Redshift Data Type | CDAP Schema Data Type | Comment |
+|-----------------------------------------------------|-----------------------|----------------------------------|
+| bigint | long | |
+| boolean | boolean | |
+| character | string | |
+| character varying | string | |
+| double precision | double | |
+| integer | int | |
+| numeric(precision, scale)/decimal(precision, scale) | decimal | |
+| numeric(with 0 precision) | string | |
+| real | float | |
+| smallint | int | |
+| smallserial | int | |
+| text | string | |
+| date | date | |
+| time [ (p) ] [ without time zone ] | time | |
+| time [ (p) ] with time zone | string | |
+| timestamp [ (p) ] [ without time zone ] | timestamp | |
+| timestamp [ (p) ] with time zone | timestamp | stored in UTC format in database |
+| xml | string | |
+| json | string | |
+| super | string | |
+| geometry | bytes | |
+| hllsketch | string | |
diff --git a/amazon-redshift-plugin/docs/Redshift-connector.md b/amazon-redshift-plugin/docs/Redshift-connector.md
new file mode 100644
index 000000000..368d9e09f
--- /dev/null
+++ b/amazon-redshift-plugin/docs/Redshift-connector.md
@@ -0,0 +1,26 @@
+# Amazon Redshift Connection
+
+Description
+-----------
+Use this connection to access data in an Amazon Redshift database using JDBC.
+
+Properties
+----------
+**Name:** Name of the connection. Connection names must be unique in a namespace.
+
+**Description:** Description of the connection.
+
+**JDBC Driver name:** Name of the JDBC driver to use.
+
+**Host:** Host of the current master instance of the Redshift cluster.
+
+**Port:** Port on which the Redshift master instance is listening.
+
+**Database:** Redshift database name.
+
+**Username:** User identity for connecting to the specified database.
+
+**Password:** Password to use to connect to the specified database.
+
+**Connection Arguments:** A list of arbitrary string key/value pairs to pass to the JDBC driver as connection
+arguments, for drivers that need additional configuration.
diff --git a/amazon-redshift-plugin/icons/Redshift-batchsource.png b/amazon-redshift-plugin/icons/Redshift-batchsource.png
new file mode 100644
index 000000000..11c334799
Binary files /dev/null and b/amazon-redshift-plugin/icons/Redshift-batchsource.png differ
diff --git a/amazon-redshift-plugin/pom.xml b/amazon-redshift-plugin/pom.xml
new file mode 100644
index 000000000..17aa5e48b
--- /dev/null
+++ b/amazon-redshift-plugin/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <parent>
+    <artifactId>database-plugins-parent</artifactId>
+    <groupId>io.cdap.plugin</groupId>
+    <version>1.12.0-SNAPSHOT</version>
+  </parent>
+
+  <name>Amazon Redshift plugin</name>
+  <artifactId>amazon-redshift-plugin</artifactId>
+  <modelVersion>4.0.0</modelVersion>
+
+  <properties>
+    <redshift-jdbc.version>2.1.0.18</redshift-jdbc.version>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>redshift</id>
+      <url>http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.cdap.cdap</groupId>
+      <artifactId>cdap-etl-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.plugin</groupId>
+      <artifactId>database-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.plugin</groupId>
+      <artifactId>hydrator-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>com.amazon.redshift</groupId>
+      <artifactId>redshift-jdbc42</artifactId>
+      <version>${redshift-jdbc.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.plugin</groupId>
+      <artifactId>database-commons</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.cdap</groupId>
+      <artifactId>hydrator-test</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.cdap</groupId>
+      <artifactId>cdap-data-pipeline3_2.12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.cdap</groupId>
+      <artifactId>cdap-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.jetbrains</groupId>
+      <artifactId>annotations</artifactId>
+      <version>RELEASE</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>io.cdap</groupId>
+        <artifactId>cdap-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <version>5.1.2</version>
+        <extensions>true</extensions>
+        <configuration>
+          <instructions>
+            <_exportcontents>
+              io.cdap.plugin.amazon.redshift.*;
+              io.cdap.plugin.db.source.*;
+              org.apache.commons.lang;
+              org.apache.commons.logging.*;
+              org.codehaus.jackson.*
+            </_exportcontents>
+            <Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
+            <Embed-Transitive>true</Embed-Transitive>
+            <Embed-Directory>lib</Embed-Directory>
+          </instructions>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>bundle</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java
new file mode 100644
index 000000000..fb8cac4a7
--- /dev/null
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.cdap.api.annotation.Category;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.connector.Connector;
+import io.cdap.cdap.etl.api.connector.ConnectorSpec;
+import io.cdap.cdap.etl.api.connector.ConnectorSpecRequest;
+import io.cdap.cdap.etl.api.connector.PluginSpec;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.common.ReferenceNames;
+import io.cdap.plugin.common.db.DBConnectorPath;
+import io.cdap.plugin.common.db.DBPath;
+import io.cdap.plugin.db.SchemaReader;
+import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Amazon Redshift Database Connector that connects to Amazon Redshift database via JDBC.
+ */
+@Plugin(type = Connector.PLUGIN_TYPE)
+@Name(RedshiftConnector.NAME)
+@Description("Connection to access data in Amazon Redshift using JDBC.")
+@Category("Database")
+public class RedshiftConnector extends AbstractDBSpecificConnector<LongWritable, RedshiftDBRecord> {
+ public static final String NAME = RedshiftConstants.PLUGIN_NAME;
+ private final RedshiftConnectorConfig config;
+
+ public RedshiftConnector(RedshiftConnectorConfig config) {
+ super(config);
+ this.config = config;
+ }
+
+ @Override
+ protected DBConnectorPath getDBConnectorPath(String path) throws IOException {
+ return new DBPath(path, true);
+ }
+
+ @Override
+ public boolean supportSchema() {
+ return true;
+ }
+
+ @Override
+ protected Class<? extends DBWritable> getDBRecordType() {
+ return RedshiftDBRecord.class;
+ }
+
+ @Override
+ public StructuredRecord transform(LongWritable longWritable, RedshiftDBRecord redshiftDBRecord) {
+ return redshiftDBRecord.getRecord();
+ }
+
+ @Override
+ protected SchemaReader getSchemaReader(String sessionID) {
+ return new RedshiftSchemaReader(sessionID);
+ }
+
+ @Override
+ protected String getTableName(String database, String schema, String table) {
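+ // Quote the schema and table names; Redshift folds unquoted identifiers to lower case.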
+ return String.format("\"%s\".\"%s\"", schema, table);
+ }
+
+ @Override
+ protected String getRandomQuery(String tableName, int limit) {
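+ // Sample roughly 'limit' rows: BERNOULLI keeps each row with probability (100.0 * limit / COUNT(*)) percent.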
+ return String.format("SELECT * FROM %s\n" +
+ "TABLESAMPLE BERNOULLI (100.0 * %d / (SELECT COUNT(*) FROM %s))",
+ tableName, limit, tableName);
+ }
+
+ @Override
+ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
+ ConnectorSpec.Builder builder) {
+ Map<String, String> sourceProperties = new HashMap<>();
+ setConnectionProperties(sourceProperties, request);
+ builder
+ .addRelatedPlugin(new PluginSpec(RedshiftConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE, sourceProperties));
+
+ String schema = path.getSchema();
+ sourceProperties.put(RedshiftSource.RedshiftSourceConfig.NUM_SPLITS, "1");
+ sourceProperties.put(RedshiftSource.RedshiftSourceConfig.FETCH_SIZE,
+ RedshiftSource.RedshiftSourceConfig.DEFAULT_FETCH_SIZE);
+ String table = path.getTable();
+ if (table == null) {
+ return;
+ }
+ sourceProperties.put(RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY,
+ getTableQuery(path.getDatabase(), schema, table));
+ sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
+ }
+
+}
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorConfig.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorConfig.java
new file mode 100644
index 000000000..f05f26d10
--- /dev/null
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
+
+import javax.annotation.Nullable;
+
+/**
+ * Configuration for Redshift connector
+ */
+public class RedshiftConnectorConfig extends AbstractDBConnectorConfig {
+
+ @Name(ConnectionConfig.HOST)
+ @Description(
+ "The endpoint of the Amazon Redshift cluster.")
+ @Macro
+ private String host;
+
+ @Name(ConnectionConfig.PORT)
+ @Description("Database port number")
+ @Macro
+ @Nullable
+ private Integer port;
+
+ @Name(ConnectionConfig.DATABASE)
+ @Description("Database name to connect to")
+ @Macro
+ private String database;
+
+ public RedshiftConnectorConfig(String username, String password, String jdbcPluginName,
+ String connectionArguments, String host,
+ String database, @Nullable Integer port) {
+ this.user = username;
+ this.password = password;
+ this.jdbcPluginName = jdbcPluginName;
+ this.connectionArguments = connectionArguments;
+ this.host = host;
+ this.database = database;
+ this.port = port;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
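+ // 5439 is the default Amazon Redshift port.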
+ return port == null ? 5439 : port;
+ }
+
+ @Override
+ public String getConnectionString() {
+ return String.format(
+ RedshiftConstants.REDSHIFT_CONNECTION_STRING_FORMAT,
+ host,
+ getPort(),
+ database);
+ }
+
+ @Override
+ public boolean canConnect() {
+ return super.canConnect() && !containsMacro(ConnectionConfig.HOST) &&
+ !containsMacro(ConnectionConfig.PORT) && !containsMacro(ConnectionConfig.DATABASE);
+ }
+}
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConstants.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConstants.java
new file mode 100644
index 000000000..081052fb1
--- /dev/null
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+/** Amazon Redshift constants. */
+public final class RedshiftConstants {
+
+ private RedshiftConstants() {
+ }
+
+ public static final String PLUGIN_NAME = "Redshift";
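+ // Expands to a JDBC URL of the form jdbc:redshift://<host>:<port>/<database>.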
+ public static final String REDSHIFT_CONNECTION_STRING_FORMAT = "jdbc:redshift://%s:%s/%s";
+}
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecord.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecord.java
new file mode 100644
index 000000000..38e9140d8
--- /dev/null
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecord.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.db.DBRecord;
+import io.cdap.plugin.db.SchemaReader;
+import io.cdap.plugin.util.DBUtils;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+/**
+ * Writable class for Redshift Source
+ */
+public class RedshiftDBRecord extends DBRecord {
+
+ /**
+ * Used in map-reduce. Do not remove.
+ */
+ @SuppressWarnings("unused")
+ public RedshiftDBRecord() {
+ }
+
+ @Override
+ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
+ int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
+ ResultSetMetaData metadata = resultSet.getMetaData();
+ String columnTypeName = metadata.getColumnTypeName(columnIndex);
+ if (isUseSchema(metadata, columnIndex)) {
+ setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
+ return;
+ }
+
+ // Handle TIMESTAMP (without time zone) columns.
+ if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) {
+ Timestamp timestamp = resultSet.getTimestamp(columnIndex, DBUtils.PURE_GREGORIAN_CALENDAR);
+ if (timestamp != null) {
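+ // Interpret the wall-clock value from the driver using the JVM's current offset, then normalize it to UTC.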
+ ZonedDateTime zonedDateTime = OffsetDateTime.of(timestamp.toLocalDateTime(), OffsetDateTime.now().getOffset())
+ .atZoneSameInstant(ZoneId.of("UTC"));
+ Schema nonNullableSchema = field.getSchema().isNullable() ?
+ field.getSchema().getNonNullable() : field.getSchema();
+ setZonedDateTimeBasedOnOutputSchema(recordBuilder, nonNullableSchema.getLogicalType(),
+ field.getName(), zonedDateTime);
+ } else {
+ recordBuilder.set(field.getName(), null);
+ }
+ return;
+ }
+
+ // Handle TIMESTAMPTZ (timestamp with time zone) columns.
+ if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) {
+ OffsetDateTime timestamp = resultSet.getObject(columnIndex, OffsetDateTime.class);
+ if (timestamp != null) {
+ recordBuilder.setTimestamp(field.getName(), timestamp.atZoneSameInstant(ZoneId.of("UTC")));
+ } else {
+ recordBuilder.set(field.getName(), null);
+ }
+ return;
+ }
+
+ // Handle NUMERIC/DECIMAL columns.
+ int columnType = metadata.getColumnType(columnIndex);
+ if (columnType == Types.NUMERIC) {
+ Schema nonNullableSchema = field.getSchema().isNullable() ?
+ field.getSchema().getNonNullable() : field.getSchema();
+ int precision = metadata.getPrecision(columnIndex);
+ if (precision == 0 && Schema.Type.STRING.equals(nonNullableSchema.getType())) {
+ // When the output schema is set to string for precision-less numeric columns
+ recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
+ } else if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType())) {
+ BigDecimal originalDecimalValue = resultSet.getBigDecimal(columnIndex);
+ if (originalDecimalValue != null) {
+ BigDecimal newDecimalValue = new BigDecimal(originalDecimalValue.toPlainString())
+ .setScale(nonNullableSchema.getScale(), RoundingMode.HALF_EVEN);
+ recordBuilder.setDecimal(field.getName(), newDecimalValue);
+ }
+ }
+ return;
+ }
+ setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
+ }
+
+ private void setZonedDateTimeBasedOnOutputSchema(StructuredRecord.Builder recordBuilder,
+ Schema.LogicalType logicalType,
+ String fieldName,
+ ZonedDateTime zonedDateTime) {
+ if (Schema.LogicalType.DATETIME.equals(logicalType)) {
+ recordBuilder.setDateTime(fieldName, zonedDateTime.toLocalDateTime());
+ } else if (Schema.LogicalType.TIMESTAMP_MICROS.equals(logicalType)) {
+ recordBuilder.setTimestamp(fieldName, zonedDateTime);
+ }
+ }
+
+ private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
+ String columnTypeName = metadata.getColumnTypeName(columnIndex);
+ // Return true if the column type name is one of the Redshift types that are mapped to string.
+ return RedshiftSchemaReader.STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(columnTypeName);
+ }
+
+ @Override
+ protected SchemaReader getSchemaReader() {
+ return new RedshiftSchemaReader();
+ }
+
+}
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java
new file mode 100644
index 000000000..df9938a45
--- /dev/null
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.db.CommonSchemaReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Redshift Schema Reader class
+ */
+public class RedshiftSchemaReader extends CommonSchemaReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RedshiftSchemaReader.class);
+
+ public static final Set<String> STRING_MAPPED_REDSHIFT_TYPES_NAMES = ImmutableSet.of(
+ "timetz", "money"
+ );
+
+ private final String sessionID;
+
+ public RedshiftSchemaReader() {
+ this(null);
+ }
+
+ public RedshiftSchemaReader(String sessionID) {
+ super();
+ this.sessionID = sessionID;
+ }
+
+ @Override
+ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
+ String typeName = metadata.getColumnTypeName(index);
+ int columnType = metadata.getColumnType(index);
+
+ if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
+ return Schema.of(Schema.Type.STRING);
+ }
+ if (typeName.equalsIgnoreCase("INT")) {
+ return Schema.of(Schema.Type.INT);
+ }
+ if (typeName.equalsIgnoreCase("BIGINT")) {
+ return Schema.of(Schema.Type.LONG);
+ }
+
+ // If it is a numeric type without precision then use the Schema of String to avoid any precision loss
+ if (Types.NUMERIC == columnType) {
+ int precision = metadata.getPrecision(index);
+ if (precision == 0) {
+ LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
+ + "converting into STRING type to avoid any precision loss.",
+ metadata.getColumnName(index),
+ metadata.getColumnTypeName(index)));
+ return Schema.of(Schema.Type.STRING);
+ }
+ }
+
+ if (typeName.equalsIgnoreCase("timestamp")) {
+ return Schema.of(Schema.LogicalType.DATETIME);
+ }
+
+ return super.getSchema(metadata, index);
+ }
+
+ @Override
+ public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
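+ // Ignore the connector's internal session-scoped helper columns ("c_<sessionID>" and "sqn_<sessionID>").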
+ if (sessionID == null) {
+ return false;
+ }
+ return metadata.getColumnName(index).equals("c_" + sessionID) ||
+ metadata.getColumnName(index).equals("sqn_" + sessionID);
+ }
+
+ @Override
+ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
+ List schemaFields = Lists.newArrayList();
+ ResultSetMetaData metadata = resultSet.getMetaData();
+ // ResultSetMetadata columns are numbered starting with 1
+ for (int i = 1; i <= metadata.getColumnCount(); i++) {
+ if (shouldIgnoreColumn(metadata, i)) {
+ continue;
+ }
+ String columnName = metadata.getColumnName(i);
+ Schema columnSchema = getSchema(metadata, i);
+ // Mark the schema as nullable because the driver does not reliably report isNullable.
+ columnSchema = Schema.nullableOf(columnSchema);
+ Schema.Field field = Schema.Field.of(columnName, columnSchema);
+ schemaFields.add(field);
+ }
+ return schemaFields;
+ }
+
+}
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
new file mode 100644
index 000000000..1b5894de9
--- /dev/null
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Metadata;
+import io.cdap.cdap.api.annotation.MetadataProperty;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import io.cdap.cdap.etl.api.connector.Connector;
+import io.cdap.plugin.common.Asset;
+import io.cdap.plugin.common.ConfigUtil;
+import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.db.SchemaReader;
+import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
+import io.cdap.plugin.db.source.AbstractDBSource;
+import io.cdap.plugin.util.DBUtils;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Batch source to read from an Amazon Redshift database.
+ */
+@Plugin(type = BatchSource.PLUGIN_TYPE)
+@Name(RedshiftConstants.PLUGIN_NAME)
+@Description(
+ "Reads from a Amazon Redshift database table(s) using a configurable SQL query."
+ + " Outputs one record for each row returned by the query.")
+@Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = RedshiftConnector.NAME)})
+public class RedshiftSource
+ extends AbstractDBSource<RedshiftSource.RedshiftSourceConfig> {
+
+ private final RedshiftSourceConfig redshiftSourceConfig;
+
+ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
+ super(redshiftSourceConfig);
+ this.redshiftSourceConfig = redshiftSourceConfig;
+ }
+
+ @Override
+ protected SchemaReader getSchemaReader() {
+ return new RedshiftSchemaReader();
+ }
+
+ @Override
+ protected Class<? extends DBWritable> getDBRecordType() {
+ return RedshiftDBRecord.class;
+ }
+
+ @Override
+ protected String createConnectionString() {
+ return String.format(
+ RedshiftConstants.REDSHIFT_CONNECTION_STRING_FORMAT,
+ redshiftSourceConfig.connection.getHost(),
+ redshiftSourceConfig.connection.getPort(),
+ redshiftSourceConfig.connection.getDatabase());
+ }
+
+ @Override
+ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
+ String fqn = DBUtils.constructFQN("redshift", redshiftSourceConfig.getConnection().getHost(),
+ redshiftSourceConfig.getConnection().getPort(),
+ redshiftSourceConfig.getConnection().getDatabase(),
+ redshiftSourceConfig.getReferenceName());
+ Asset.Builder assetBuilder = Asset.builder(redshiftSourceConfig.getReferenceName()).setFqn(fqn);
+ return new LineageRecorder(context, assetBuilder.build());
+ }
+
+ /**
+ * Redshift source config.
+ */
+ public static class RedshiftSourceConfig extends AbstractDBSpecificSourceConfig {
+
+ @Name(ConfigUtil.NAME_USE_CONNECTION)
+ @Nullable
+ @Description("Whether to use an existing connection.")
+ private Boolean useConnection;
+
+ @Name(ConfigUtil.NAME_CONNECTION)
+ @Macro
+ @Nullable
+ @Description("The existing connection to use.")
+ private RedshiftConnectorConfig connection;
+
+ @Override
+ public Map<String, String> getDBSpecificArguments() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Integer getFetchSize() {
+ Integer fetchSize = super.getFetchSize();
+ return fetchSize == null ? Integer.parseInt(DEFAULT_FETCH_SIZE) : fetchSize;
+ }
+
+ @Override
+ protected RedshiftConnectorConfig getConnection() {
+ return connection;
+ }
+
+ @Override
+ public void validate(FailureCollector collector) {
+ ConfigUtil.validateConnection(this, useConnection, connection, collector);
+ super.validate(collector);
+ }
+ }
+}
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorTest.java
new file mode 100644
index 000000000..a43eb4302
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorTest.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.plugin.db.connector.DBSpecificConnectorBaseTest;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Unit tests for {@link RedshiftConnector}
+ */
+public class RedshiftConnectorTest extends DBSpecificConnectorBaseTest {
+
+ private static final String JDBC_DRIVER_CLASS_NAME = "com.amazon.redshift.Driver";
+
+ @Test
+ public void test() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ test(new RedshiftConnector(
+ new RedshiftConnectorConfig(username, password, JDBC_PLUGIN_NAME, connectionArguments, host, database,
+ port)),
+ JDBC_DRIVER_CLASS_NAME, RedshiftConstants.PLUGIN_NAME);
+ }
+}
+
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
new file mode 100644
index 000000000..39579cb60
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link RedshiftConnector}
+ */
+public class RedshiftConnectorUnitTest {
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
+
+ private static final RedshiftConnector CONNECTOR = new RedshiftConnector(null);
+
+ /**
+ * Unit test for getTableName()
+ */
+ @Test
+ public void getTableNameTest() {
+ Assert.assertEquals("\"schema\".\"table\"",
+ CONNECTOR.getTableName("db", "schema", "table"));
+ }
+
+ /**
+ * Unit tests for getTableQuery()
+ */
+ @Test
+ public void getTableQueryTest() {
+ String tableName = CONNECTOR.getTableName("db", "schema", "table");
+
+ // random query
+ Assert.assertEquals(String.format("SELECT * FROM %s\n" +
+ "TABLESAMPLE BERNOULLI (100.0 * %d / (SELECT COUNT(*) FROM %s))",
+ tableName, 100, tableName),
+ CONNECTOR.getRandomQuery(tableName, 100));
+ }
+}
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecordUnitTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecordUnitTest.java
new file mode 100644
index 000000000..4d11004e4
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecordUnitTest.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.util.DBUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit Test class for the RedshiftDBRecord
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RedshiftDBRecordUnitTest {
+
+ private static final int DEFAULT_PRECISION = 38;
+
+ /**
+ * Validate the precision less Numbers handling against following use cases.
+ * 1. Ensure that the numeric type with [p,s] set as [38,4] detect as BigDecimal(38,4) in cdap.
+ * 2. Ensure that the numeric type without [p,s] detect as String type in cdap.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void validatePrecisionLessDecimalParsing() throws Exception {
+ Schema.Field field1 = Schema.Field.of("ID1", Schema.decimalOf(DEFAULT_PRECISION, 4));
+ Schema.Field field2 = Schema.Field.of("ID2", Schema.of(Schema.Type.STRING));
+
+ Schema schema = Schema.recordOf(
+ "dbRecord",
+ field1,
+ field2
+ );
+
+ ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
+ Mockito.when(resultSetMetaData.getColumnType(Mockito.eq(1))).thenReturn(Types.NUMERIC);
+ Mockito.when(resultSetMetaData.getPrecision(Mockito.eq(1))).thenReturn(DEFAULT_PRECISION);
+ Mockito.when(resultSetMetaData.getColumnType(eq(2))).thenReturn(Types.NUMERIC);
+ when(resultSetMetaData.getPrecision(eq(2))).thenReturn(0);
+
+ ResultSet resultSet = Mockito.mock(ResultSet.class);
+
+ when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+ when(resultSet.getBigDecimal(eq(1))).thenReturn(BigDecimal.valueOf(123.4568));
+ when(resultSet.getString(eq(2))).thenReturn("123.4568");
+
+ StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+ RedshiftDBRecord dbRecord = new RedshiftDBRecord();
+ dbRecord.handleField(resultSet, builder, field1, 1, Types.NUMERIC, DEFAULT_PRECISION, 4);
+ dbRecord.handleField(resultSet, builder, field2, 2, Types.NUMERIC, 0, -127);
+
+ StructuredRecord record = builder.build();
+ Assert.assertTrue(record.getDecimal("ID1") instanceof BigDecimal);
+ Assert.assertEquals(record.getDecimal("ID1"), BigDecimal.valueOf(123.4568));
+ Assert.assertTrue(record.get("ID2") instanceof String);
+ Assert.assertEquals(record.get("ID2"), "123.4568");
+ }
+
+ @Test
+ public void validateTimestampType() throws SQLException {
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
+ ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
+ when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamp");
+
+ ResultSet resultSet = Mockito.mock(ResultSet.class);
+ when(resultSet.getMetaData()).thenReturn(metaData);
+ when(resultSet.getTimestamp(eq(0), eq(DBUtils.PURE_GREGORIAN_CALENDAR)))
+ .thenReturn(Timestamp.from(offsetDateTime.toInstant()));
+
+ Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.DATETIME));
+ Schema schema = Schema.recordOf(
+ "dbRecord",
+ field1
+ );
+ StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+
+ RedshiftDBRecord dbRecord = new RedshiftDBRecord();
+ dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
+ StructuredRecord record = builder.build();
+ Assert.assertNotNull(record);
+ Assert.assertNotNull(record.getDateTime("field1"));
+ Assert.assertEquals(record.getDateTime("field1").toInstant(ZoneOffset.UTC), offsetDateTime.toInstant());
+
+ // Validate backward compatibility
+
+ field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
+ schema = Schema.recordOf(
+ "dbRecord",
+ field1
+ );
+ builder = StructuredRecord.builder(schema);
+ dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
+ record = builder.build();
+ Assert.assertNotNull(record);
+ Assert.assertNotNull(record.getTimestamp("field1"));
+ Assert.assertEquals(record.getTimestamp("field1").toInstant(), offsetDateTime.toInstant());
+ }
+
+ @Test
+ public void validateTimestampTZType() throws SQLException {
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
+ ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
+ when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamptz");
+
+ ResultSet resultSet = Mockito.mock(ResultSet.class);
+ when(resultSet.getMetaData()).thenReturn(metaData);
+ when(resultSet.getObject(eq(0), eq(OffsetDateTime.class))).thenReturn(offsetDateTime);
+
+ Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
+ Schema schema = Schema.recordOf(
+ "dbRecord",
+ field1
+ );
+ StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+
+ RedshiftDBRecord dbRecord = new RedshiftDBRecord();
+ dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
+ StructuredRecord record = builder.build();
+ Assert.assertNotNull(record);
+ Assert.assertNotNull(record.getTimestamp("field1", ZoneId.of("UTC")));
+ Assert.assertEquals(record.getTimestamp("field1", ZoneId.of("UTC")).toInstant(), offsetDateTime.toInstant());
+ }
+}
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java
new file mode 100644
index 000000000..2d21c4478
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.plugin.db.connector.DBSpecificFailedConnectionTest;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest {
+ private static final String JDBC_DRIVER_CLASS_NAME = "com.amazon.redshift.Driver";
+
+ @Test
+ public void test() throws ClassNotFoundException, IOException {
+
+ RedshiftConnector connector = new RedshiftConnector(
+ new RedshiftConnectorConfig("username", "password", "jdbc", "", "localhost", "db", 5432));
+
+ super.test(JDBC_DRIVER_CLASS_NAME, connector, "Failed to create connection to database via connection string: " +
+ "jdbc:redshift://localhost:5432/db and arguments: " +
+ "{user=username}. Error: ConnectException: Connection refused " +
+ "(Connection refused).");
+ }
+}
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftPluginTestBase.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftPluginTestBase.java
new file mode 100644
index 000000000..5df4fb300
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftPluginTestBase.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import io.cdap.cdap.api.artifact.ArtifactSummary;
+import io.cdap.cdap.api.plugin.PluginClass;
+import io.cdap.cdap.datapipeline.DataPipelineApp;
+import io.cdap.cdap.proto.id.ArtifactId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.db.DBRecord;
+import io.cdap.plugin.db.batch.DatabasePluginTestBase;
+import io.cdap.plugin.db.sink.ETLDBOutputFormat;
+import io.cdap.plugin.db.source.DataDrivenETLDBInputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * Base test class for Redshift plugins.
+ */
+public abstract class RedshiftPluginTestBase extends DatabasePluginTestBase {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftPluginTestBase.class);
+ protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "3.2.0");
+ protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0");
+ protected static final long CURRENT_TS = System.currentTimeMillis();
+
+ protected static final String JDBC_DRIVER_NAME = "redshift";
+ protected static final Map<String, String> BASE_PROPS = new HashMap<>();
+
+ protected static String connectionUrl;
+ protected static int year;
+ protected static final int PRECISION = 10;
+ protected static final int SCALE = 6;
+ private static int startCount;
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+ if (startCount++ > 0) {
+ return;
+ }
+
+ getProperties();
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(new Date(CURRENT_TS));
+ year = calendar.get(Calendar.YEAR);
+
+ setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);
+
+ addPluginArtifact(NamespaceId.DEFAULT.artifact(JDBC_DRIVER_NAME, "1.0.0"),
+ DATAPIPELINE_ARTIFACT_ID,
+ RedshiftSource.class, DBRecord.class,
+ ETLDBOutputFormat.class, DataDrivenETLDBInputFormat.class, DBRecord.class);
+
+ // add the Redshift JDBC driver as a 3rd party plugin
+ PluginClass redshiftDriver = new PluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, JDBC_DRIVER_NAME,
+ "redshift driver class", Driver.class.getName(),
+ null, Collections.emptyMap());
+ addPluginArtifact(NamespaceId.DEFAULT.artifact("redshift-jdbc-connector", "1.0.0"),
+ DATAPIPELINE_ARTIFACT_ID,
+ Sets.newHashSet(redshiftDriver), Driver.class);
+
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+
+ connectionUrl = "jdbc:redshift://" + BASE_PROPS.get(ConnectionConfig.HOST) + ":" +
+ BASE_PROPS.get(ConnectionConfig.PORT) + "/" + BASE_PROPS.get(ConnectionConfig.DATABASE);
+ Connection conn = createConnection();
+ createTestTables(conn);
+ prepareTestData(conn);
+ }
+
+ private static void getProperties() {
+ BASE_PROPS.put(ConnectionConfig.HOST, getPropertyOrSkip("redshift.clusterEndpoint"));
+ BASE_PROPS.put(ConnectionConfig.PORT, getPropertyOrSkip("redshift.port"));
+ BASE_PROPS.put(ConnectionConfig.DATABASE, getPropertyOrSkip("redshift.database"));
+ BASE_PROPS.put(ConnectionConfig.USER, getPropertyOrSkip("redshift.username"));
+ BASE_PROPS.put(ConnectionConfig.PASSWORD, getPropertyOrSkip("redshift.password"));
+ BASE_PROPS.put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME);
+ }
+
+ protected static void createTestTables(Connection conn) throws SQLException {
+ try (Statement stmt = conn.createStatement()) {
+ // create a table that the action will truncate at the end of the run
+ stmt.execute("CREATE TABLE \"dbActionTest\" (x int, day varchar(10))");
+ // create a table that the action will truncate at the end of the run
+ stmt.execute("CREATE TABLE \"postActionTest\" (x int, day varchar(10))");
+
+ stmt.execute("CREATE TABLE my_table" +
+ "(" +
+ "\"ID\" INT NOT NULL," +
+ "\"NAME\" VARCHAR(40) NOT NULL," +
+ "\"SCORE\" REAL," +
+ "\"GRADUATED\" BOOLEAN," +
+ "\"NOT_IMPORTED\" VARCHAR(30)," +
+ "\"SMALLINT_COL\" SMALLINT," +
+ "\"BIG\" BIGINT," +
+ "\"NUMERIC_COL\" NUMERIC(" + PRECISION + "," + SCALE + ")," +
+ "\"DECIMAL_COL\" DECIMAL(" + PRECISION + "," + SCALE + ")," +
+ "\"DOUBLE_PREC_COL\" DOUBLE PRECISION," +
+ "\"DATE_COL\" DATE," +
+ "\"TIME_COL\" TIME," +
+ "\"TIMESTAMP_COL\" TIMESTAMP(3)," +
+ "\"TEXT_COL\" TEXT," +
+ "\"CHAR_COL\" CHAR(100)," +
+ "\"BYTEA_COL\" BYTEA" +
+ ")");
+ stmt.execute("CREATE TABLE \"MY_DEST_TABLE\" AS " +
+ "SELECT * FROM my_table");
+ stmt.execute("CREATE TABLE your_table AS " +
+ "SELECT * FROM my_table");
+ }
+ }
+
+ protected static void prepareTestData(Connection conn) throws SQLException {
+ try (
+ Statement stmt = conn.createStatement();
+ PreparedStatement pStmt1 =
+ conn.prepareStatement("INSERT INTO my_table " +
+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
+ " ?, ?, ?, ?, ?, ?)");
+ PreparedStatement pStmt2 =
+ conn.prepareStatement("INSERT INTO your_table " +
+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
+ " ?, ?, ?, ?, ?, ?)")) {
+
+ stmt.execute("insert into \"dbActionTest\" values (1, '1970-01-01')");
+ stmt.execute("insert into \"postActionTest\" values (1, '1970-01-01')");
+
+ populateData(pStmt1, pStmt2);
+ }
+ }
+
+ private static void populateData(PreparedStatement... stmts) throws SQLException {
+ // insert the same data into both tables: my_table and your_table
+ for (PreparedStatement pStmt : stmts) {
+ for (int i = 1; i <= 5; i++) {
+ String name = "user" + i;
+ pStmt.setInt(1, i);
+ pStmt.setString(2, name);
+ pStmt.setDouble(3, 123.45 + i);
+ pStmt.setBoolean(4, (i % 2 == 0));
+ pStmt.setString(5, "random" + i);
+ pStmt.setShort(6, (short) i);
+ pStmt.setLong(7, (long) i);
+ pStmt.setBigDecimal(8, new BigDecimal("123.45").add(new BigDecimal(i)));
+ pStmt.setBigDecimal(9, new BigDecimal("123.45").add(new BigDecimal(i)));
+ pStmt.setDouble(10, 123.45 + i);
+ pStmt.setDate(11, new Date(CURRENT_TS));
+ pStmt.setTime(12, new Time(CURRENT_TS));
+ pStmt.setTimestamp(13, new Timestamp(CURRENT_TS));
+ pStmt.setString(14, name);
+ pStmt.setString(15, "char" + i);
+ pStmt.setBytes(16, name.getBytes(Charsets.UTF_8));
+ pStmt.executeUpdate();
+ }
+ }
+ }
+
+ public static Connection createConnection() {
+ try {
+ Class.forName(Driver.class.getCanonicalName());
+ return DriverManager.getConnection(connectionUrl, BASE_PROPS.get(ConnectionConfig.USER),
+ BASE_PROPS.get(ConnectionConfig.PASSWORD));
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @AfterClass
+ public static void tearDownDB() {
+ try (Connection conn = createConnection();
+ Statement stmt = conn.createStatement()) {
+ executeCleanup(Arrays.asList(() -> stmt.execute("DROP TABLE my_table"),
+ () -> stmt.execute("DROP TABLE your_table"),
+ () -> stmt.execute("DROP TABLE postActionTest"),
+ () -> stmt.execute("DROP TABLE dbActionTest"),
+ () -> stmt.execute("DROP TABLE MY_DEST_TABLE")), LOGGER);
+ } catch (Exception e) {
+ LOGGER.warn("Fail to tear down.", e);
+ }
+ }
+}
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftPluginTestSuite.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftPluginTestSuite.java
new file mode 100644
index 000000000..95ad0938b
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftPluginTestSuite.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.cdap.common.test.TestSuite;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * This is a test suite that runs all the tests for Redshift plugins.
+ */
+@RunWith(TestSuite.class)
+@Suite.SuiteClasses({
+ RedshiftSourceTestRun.class,
+})
+public class RedshiftPluginTestSuite extends RedshiftPluginTestBase {
+}
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftSourceTestRun.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftSourceTestRun.java
new file mode 100644
index 000000000..1ac41bcd0
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftSourceTestRun.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.common.Bytes;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.db.DBConfig;
+import io.cdap.plugin.db.source.AbstractDBSource;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for Redshift source plugin.
+ */
+public class RedshiftSourceTestRun extends RedshiftPluginTestBase {
+
+ @Test
+ @SuppressWarnings("ConstantConditions")
+ public void testDBMacroSupport() throws Exception {
+ String importQuery = "SELECT * FROM my_table WHERE \"DATE_COL\" <= '${logicalStartTime(yyyy-MM-dd,1d)}' " +
+ "AND $CONDITIONS";
+ String boundingQuery = "SELECT MIN(ID),MAX(ID) from my_table";
+ String splitBy = "ID";
+
+ ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder()
+ .putAll(BASE_PROPS)
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "DBTestSource").build();
+
+ ETLPlugin sourceConfig = new ETLPlugin(
+ RedshiftConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ sourceProps
+ );
+
+ ETLPlugin sinkConfig = MockSink.getPlugin("macroOutputTable");
+
+ ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+ DATAPIPELINE_ARTIFACT, "testDBMacro");
+ runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+ DataSetManager<Table> outputManager = getDataset("macroOutputTable");
+ Assert.assertTrue(MockSink.readOutput(outputManager).isEmpty());
+ }
+
+ @Test
+ @SuppressWarnings("ConstantConditions")
+ public void testDBSource() throws Exception {
+ String importQuery = "SELECT \"ID\", \"NAME\", \"SCORE\", \"GRADUATED\", \"SMALLINT_COL\", \"BIG\", " +
+ "\"NUMERIC_COL\", \"CHAR_COL\", \"DECIMAL_COL\", \"BYTEA_COL\", \"DATE_COL\", \"TIME_COL\", \"TIMESTAMP_COL\", " +
+ "\"TEXT_COL\", \"DOUBLE_PREC_COL\" FROM my_table " +
+ "WHERE \"ID\" < 3 AND $CONDITIONS";
+ String boundingQuery = "SELECT MIN(\"ID\"),MAX(\"ID\") from my_table";
+ String splitBy = "ID";
+ ETLPlugin sourceConfig = new ETLPlugin(
+ RedshiftConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ ImmutableMap.<String, String>builder()
+ .putAll(BASE_PROPS)
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "DBSourceTest")
+ .build(),
+ null
+ );
+
+ String outputDatasetName = "output-dbsourcetest";
+ ETLPlugin sinkConfig = MockSink.getPlugin(outputDatasetName);
+
+ ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+ DATAPIPELINE_ARTIFACT, "testDBSource");
+ runETLOnce(appManager);
+
+ DataSetManager<Table> outputManager = getDataset(outputDatasetName);
+ List<StructuredRecord> outputRecords = MockSink.readOutput(outputManager);
+
+ Assert.assertEquals(2, outputRecords.size());
+ String userid = outputRecords.get(0).get("NAME");
+ StructuredRecord row1 = "user1".equals(userid) ? outputRecords.get(0) : outputRecords.get(1);
+ StructuredRecord row2 = "user1".equals(userid) ? outputRecords.get(1) : outputRecords.get(0);
+
+ // Verify data
+ Assert.assertEquals("user1", row1.get("NAME"));
+ Assert.assertEquals("user2", row2.get("NAME"));
+ Assert.assertEquals("user1", row1.get("TEXT_COL"));
+ Assert.assertEquals("user2", row2.get("TEXT_COL"));
+ Assert.assertEquals("char1", ((String) row1.get("CHAR_COL")).trim());
+ Assert.assertEquals("char2", ((String) row2.get("CHAR_COL")).trim());
+ Assert.assertEquals(124.45f, ((Float) row1.get("SCORE")).doubleValue(), 0.000001);
+ Assert.assertEquals(125.45f, ((Float) row2.get("SCORE")).doubleValue(), 0.000001);
+ Assert.assertEquals(false, row1.get("GRADUATED"));
+ Assert.assertEquals(true, row2.get("GRADUATED"));
+ Assert.assertNull(row1.get("NOT_IMPORTED"));
+ Assert.assertNull(row2.get("NOT_IMPORTED"));
+
+ Assert.assertEquals(1, (int) row1.get("SMALLINT_COL"));
+ Assert.assertEquals(2, (int) row2.get("SMALLINT_COL"));
+ Assert.assertEquals(1, (long) row1.get("BIG"));
+ Assert.assertEquals(2, (long) row2.get("BIG"));
+
+ Assert.assertEquals(new BigDecimal("124.45", new MathContext(PRECISION)).setScale(SCALE),
+ row1.getDecimal("NUMERIC_COL"));
+ Assert.assertEquals(new BigDecimal("125.45", new MathContext(PRECISION)).setScale(SCALE),
+ row2.getDecimal("NUMERIC_COL"));
+ Assert.assertEquals(new BigDecimal("124.45", new MathContext(PRECISION)).setScale(SCALE),
+ row1.getDecimal("DECIMAL_COL"));
+
+ Assert.assertEquals(124.45, (double) row1.get("DOUBLE_PREC_COL"), 0.000001);
+ Assert.assertEquals(125.45, (double) row2.get("DOUBLE_PREC_COL"), 0.000001);
+ // Verify time columns
+ java.util.Date date = new java.util.Date(CURRENT_TS);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ LocalDate expectedDate = Date.valueOf(sdf.format(date)).toLocalDate();
+ sdf = new SimpleDateFormat("H:mm:ss");
+ LocalTime expectedTime = Time.valueOf(sdf.format(date)).toLocalTime();
+ ZonedDateTime expectedTs = date.toInstant().atZone(ZoneId.ofOffset("UTC", ZoneOffset.UTC));
+ Assert.assertEquals(expectedDate, row1.getDate("DATE_COL"));
+ Assert.assertEquals(expectedTime, row1.getTime("TIME_COL"));
+ Assert.assertEquals(expectedTs, row1.getTimestamp("TIMESTAMP_COL", ZoneId.ofOffset("UTC", ZoneOffset.UTC)));
+
+ // verify binary columns
+ Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("BYTEA_COL")).array(), 0, 5));
+ Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("BYTEA_COL")).array(), 0, 5));
+ }
+
+ @Test
+ public void testDbSourceMultipleTables() throws Exception {
+ String importQuery = "SELECT \"my_table\".\"ID\", \"your_table\".\"NAME\" FROM \"my_table\", \"your_table\"" +
+ "WHERE \"my_table\".\"ID\" < 3 and \"my_table\".\"ID\" = \"your_table\".\"ID\" and $CONDITIONS";
+ String boundingQuery = "SELECT MIN(MIN(\"my_table\".\"ID\"), MIN(\"your_table\".\"ID\")), " +
+ "MAX(MAX(\"my_table\".\"ID\"), MAX(\"your_table\".\"ID\"))";
+ String splitBy = "\"my_table\".\"ID\"";
+ ETLPlugin sourceConfig = new ETLPlugin(
+ RedshiftConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .putAll(BASE_PROPS)
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "DBMultipleTest")
+ .build(),
+ null
+ );
+
+ String outputDatasetName = "output-multitabletest";
+ ETLPlugin sinkConfig = MockSink.getPlugin(outputDatasetName);
+
+ ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+ DATAPIPELINE_ARTIFACT, "testDBSourceWithMultipleTables");
+ runETLOnce(appManager);
+
+ // records should be written
+ DataSetManager outputManager = getDataset(outputDatasetName);
+ List<StructuredRecord> outputRecords = MockSink.readOutput(outputManager);
+ Assert.assertEquals(2, outputRecords.size());
+ String userid = outputRecords.get(0).get("NAME");
+ StructuredRecord row1 = "user1".equals(userid) ? outputRecords.get(0) : outputRecords.get(1);
+ StructuredRecord row2 = "user1".equals(userid) ? outputRecords.get(1) : outputRecords.get(0);
+ // Verify data
+ Assert.assertEquals("user1", row1.get("NAME"));
+ Assert.assertEquals("user2", row2.get("NAME"));
+ Assert.assertEquals(1, row1.get("ID").intValue());
+ Assert.assertEquals(2, row2.get("ID").intValue());
+ }
+
+ @Test
+ public void testUserNamePasswordCombinations() throws Exception {
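+ // Deployment should succeed with no credentials or with a username and an empty password,
+ // and fail when a password is supplied without a username.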
+ String importQuery = "SELECT * FROM \"my_table\" WHERE $CONDITIONS";
+ String boundingQuery = "SELECT MIN(\"ID\"),MAX(\"ID\") from \"my_table\"";
+ String splitBy = "\"ID\"";
+
+ ETLPlugin sinkConfig = MockSink.getPlugin("outputTable");
+
+ Map<String, String> baseSourceProps = ImmutableMap.<String, String>builder()
+ .put(ConnectionConfig.HOST, BASE_PROPS.get(ConnectionConfig.HOST))
+ .put(ConnectionConfig.PORT, BASE_PROPS.get(ConnectionConfig.PORT))
+ .put(ConnectionConfig.DATABASE, BASE_PROPS.get(ConnectionConfig.DATABASE))
+ .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME)
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "UserPassDBTest")
+ .build();
+
+ ApplicationId appId = NamespaceId.DEFAULT.app("dbTest");
+
+ // null user name, null password. Should succeed.
+ // as source
+ ETLPlugin dbConfig = new ETLPlugin(RedshiftConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE,
+ baseSourceProps, null);
+ ETLStage table = new ETLStage("uniqueTableSink", sinkConfig);
+ ETLStage database = new ETLStage("databaseSource", dbConfig);
+ ETLBatchConfig etlConfig = ETLBatchConfig.builder()
+ .addStage(database)
+ .addStage(table)
+ .addConnection(database.getName(), table.getName())
+ .build();
+ AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlConfig);
+ deployApplication(appId, appRequest);
+
+ // null user name, non-null password. Should fail.
+ // as source
+ Map<String, String> noUser = new HashMap<>(baseSourceProps);
+ noUser.put(DBConfig.PASSWORD, "password");
+ database = new ETLStage("databaseSource", new ETLPlugin(RedshiftConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE, noUser, null));
+ etlConfig = ETLBatchConfig.builder()
+ .addStage(database)
+ .addStage(table)
+ .addConnection(database.getName(), table.getName())
+ .build();
+ assertDeploymentFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
+ "Deploying DB Source with null username but non-null password should have failed.");
+
+ // non-null username, non-null, but empty password. Should succeed.
+ // as source
+ Map<String, String> emptyPassword = new HashMap<>(baseSourceProps);
+ emptyPassword.put(DBConfig.USER, "root");
+ emptyPassword.put(DBConfig.PASSWORD, "");
+ database = new ETLStage("databaseSource", new ETLPlugin(RedshiftConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE, emptyPassword, null));
+ etlConfig = ETLBatchConfig.builder()
+ .addStage(database)
+ .addStage(table)
+ .addConnection(database.getName(), table.getName())
+ .build();
+ appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlConfig);
+ deployApplication(appId, appRequest);
+ }
+
+ @Test
+ public void testNonExistentDBTable() throws Exception {
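+ // Deployment should fail both for a query against a non-existent table and for a non-existent database.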
+ // source
+ String importQuery = "SELECT \"ID\", \"NAME\" FROM \"dummy\" WHERE ID < 3 AND $CONDITIONS";
+ String boundingQuery = "SELECT MIN(\"ID\"),MAX(\"ID\") FROM \"dummy\"";
+ String splitBy = "\"ID\"";
+ ETLPlugin sinkConfig = MockSink.getPlugin("table");
+ ETLPlugin sourceBadNameConfig = new ETLPlugin(
+ RedshiftConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .putAll(BASE_PROPS)
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "DBNonExistentTest")
+ .build(),
+ null);
+ ETLStage sink = new ETLStage("sink", sinkConfig);
+ ETLStage sourceBadName = new ETLStage("sourceBadName", sourceBadNameConfig);
+
+ ETLBatchConfig etlConfig = ETLBatchConfig.builder()
+ .addStage(sourceBadName)
+ .addStage(sink)
+ .addConnection(sourceBadName.getName(), sink.getName())
+ .build();
+ ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest");
+ assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
+
+ // Bad connection
+ ETLPlugin sourceBadConnConfig = new ETLPlugin(
+ RedshiftConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .put(ConnectionConfig.HOST, BASE_PROPS.get(ConnectionConfig.HOST))
+ .put(ConnectionConfig.PORT, BASE_PROPS.get(ConnectionConfig.PORT))
+ .put(ConnectionConfig.DATABASE, "dumDB")
+ .put(ConnectionConfig.USER, BASE_PROPS.get(ConnectionConfig.USER))
+ .put(ConnectionConfig.PASSWORD, BASE_PROPS.get(ConnectionConfig.PASSWORD))
+ .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME)
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "RedshiftTest")
+ .build(),
+ null);
+ ETLStage sourceBadConn = new ETLStage("sourceBadConn", sourceBadConnConfig);
+ etlConfig = ETLBatchConfig.builder()
+ .addStage(sourceBadConn)
+ .addStage(sink)
+ .addConnection(sourceBadConn.getName(), sink.getName())
+ .build();
+ assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
+ }
+}
diff --git a/amazon-redshift-plugin/widgets/Redshift-batchsource.json b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
new file mode 100644
index 000000000..91e860ee9
--- /dev/null
+++ b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
@@ -0,0 +1,236 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "display-name": "Redshift",
+ "configuration-groups": [
+ {
+ "label": "Connection",
+ "properties": [
+ {
+ "widget-type": "toggle",
+ "label": "Use connection",
+ "name": "useConnection",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "YES"
+ },
+ "off": {
+ "value": "false",
+ "label": "NO"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "connection-select",
+ "label": "Connection",
+ "name": "connection",
+ "widget-attributes": {
+ "connectionType": "Redshift"
+ }
+ },
+ {
+ "widget-type": "plugin-list",
+ "label": "JDBC Driver name",
+ "name": "jdbcPluginName",
+ "widget-attributes": {
+ "plugin-type": "jdbc"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Host",
+ "name": "host",
+ "widget-attributes": {
+ "placeholder": "Redshift endpoint host name."
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Port",
+ "name": "port",
+ "widget-attributes": {
+ "default": "5439"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "user"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "password"
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Connection Arguments",
+ "name": "connectionArguments",
+ "widget-attributes": {
+ "showDelimiter": "false",
+ "key-placeholder": "Key",
+ "value-placeholder": "Value",
+ "kv-delimiter" : "=",
+ "delimiter" : ";"
+ }
+ }
+ ]
+ },
+ {
+ "label": "Basic",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Reference Name",
+ "name": "referenceName",
+ "widget-attributes": {
+ "placeholder": "Name used to identify this source for lineage. Typically, the name of the table/view."
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Database",
+ "name": "database"
+ },
+ {
+ "widget-type": "connection-browser",
+ "widget-category": "plugin",
+ "widget-attributes": {
+ "connectionType": "Redshift",
+ "label": "Browse Database"
+ }
+ }
+ ]
+ },
+ {
+ "label": "SQL Query",
+ "properties": [
+ {
+ "widget-type": "textarea",
+ "label": "Import Query",
+ "name": "importQuery",
+ "widget-attributes": {
+ "rows": "4"
+ }
+ },
+ {
+ "widget-type": "get-schema",
+ "widget-category": "plugin"
+ }
+ ]
+ },
+ {
+ "label": "Advanced",
+ "properties": [
+ {
+ "widget-type": "textarea",
+ "label": "Bounding Query",
+ "name": "boundingQuery",
+ "widget-attributes": {
+ "rows": "4"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Split Column",
+ "name": "splitBy"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Number of Splits",
+ "name": "numSplits",
+ "widget-attributes": {
+ "default": "1"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Fetch Size",
+ "name": "fetchSize",
+ "widget-attributes": {
+ "default": "1000",
+ "minimum": "0"
+ }
+ }
+ ]
+ }
+ ],
+ "outputs": [
+ {
+ "name": "schema",
+ "widget-type": "schema",
+ "widget-attributes": {
+ "schema-types": [
+ "boolean",
+ "int",
+ "long",
+ "float",
+ "double",
+ "bytes",
+ "string"
+ ],
+ "schema-default-type": "string"
+ }
+ }
+ ],
+ "filters": [
+ {
+ "name": "showConnectionProperties ",
+ "condition": {
+ "expression": "useConnection == false"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "jdbcPluginName"
+ },
+ {
+ "type": "property",
+ "name": "host"
+ },
+ {
+ "type": "property",
+ "name": "port"
+ },
+ {
+ "type": "property",
+ "name": "user"
+ },
+ {
+ "type": "property",
+ "name": "password"
+ },
+ {
+ "type": "property",
+ "name": "database"
+ },
+ {
+ "type": "property",
+ "name": "connectionArguments"
+ }
+ ]
+ },
+ {
+ "name": "showConnectionId",
+ "condition": {
+ "expression": "useConnection == true"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "connection"
+ }
+ ]
+ }
+ ],
+ "jump-config": {
+ "datasets": [
+ {
+ "ref-property-name": "referenceName"
+ }
+ ]
+ }
+}
diff --git a/amazon-redshift-plugin/widgets/Redshift-connector.json b/amazon-redshift-plugin/widgets/Redshift-connector.json
new file mode 100644
index 000000000..3a2af8e01
--- /dev/null
+++ b/amazon-redshift-plugin/widgets/Redshift-connector.json
@@ -0,0 +1,75 @@
+{
+ "metadata": {
+ "spec-version": "1.0"
+ },
+ "display-name": "Redshift",
+ "configuration-groups": [
+ {
+ "label": "Basic",
+ "properties": [
+ {
+ "widget-type": "plugin-list",
+ "label": "JDBC Driver name",
+ "name": "jdbcPluginName",
+ "widget-attributes": {
+ "plugin-type": "jdbc"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Host",
+ "name": "host",
+ "widget-attributes": {
+ "default": "localhost"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Port",
+ "name": "port",
+ "widget-attributes": {
+ "default": "5439"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Database",
+ "name": "database"
+ }
+ ]
+ },
+ {
+ "label": "Credentials",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "user"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "password"
+ }
+ ]
+ },
+ {
+ "label": "Advanced",
+ "properties": [
+ {
+ "widget-type": "keyvalue",
+ "label": "Connection Arguments",
+ "name": "connectionArguments",
+ "widget-attributes": {
+ "showDelimiter": "false",
+ "key-placeholder": "Key",
+ "value-placeholder": "Value",
+ "kv-delimiter": "=",
+ "delimiter": ";"
+ }
+ }
+ ]
+ }
+ ],
+ "outputs": []
+}
diff --git a/pom.xml b/pom.xml
index 74932ee90..847e157e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@
cloudsql-postgresql-plugin
teradata-plugin
generic-db-argument-setter
+ amazon-redshift-plugin