diff --git a/amazon-redshift-plugin/pom.xml b/amazon-redshift-plugin/pom.xml
index 603ea7837..806f1e700 100644
--- a/amazon-redshift-plugin/pom.xml
+++ b/amazon-redshift-plugin/pom.xml
@@ -26,7 +26,7 @@
Amazon Redshift plugin
amazon-redshift-plugin
4.0.0
-
+
2.1.0.18
@@ -83,6 +83,11 @@
junit
junit
+
+ org.mockito
+ mockito-core
+ test
+
io.cdap.cdap
cdap-api
@@ -109,11 +114,11 @@
<_exportcontents>
- io.cdap.plugin.amazon.redshift.*;
- io.cdap.plugin.db.source.*;
- org.apache.commons.lang;
- org.apache.commons.logging.*;
- org.codehaus.jackson.*
+ io.cdap.plugin.amazon.redshift.*;
+ io.cdap.plugin.db.source.*;
+ org.apache.commons.lang;
+ org.apache.commons.logging.*;
+ org.codehaus.jackson.*
*;inline=false;scope=compile
true
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java
index 96ec94edf..fb8cac4a7 100644
--- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java
@@ -85,6 +85,13 @@ protected String getTableName(String database, String schema, String table) {
return String.format("\"%s\".\"%s\"", schema, table);
}
+ @Override
+ protected String getRandomQuery(String tableName, int limit) {
+ return String.format("SELECT * FROM %s\n" +
+ "TABLESAMPLE BERNOULLI (100.0 * %d / (SELECT COUNT(*) FROM %s))",
+ tableName, limit, tableName);
+ }
+
@Override
protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
ConnectorSpec.Builder builder) {
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecord.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecord.java
index f6935bc29..3dfc2376f 100644
--- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecord.java
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecord.java
@@ -64,7 +64,7 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
Schema nonNullableSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();
setZonedDateTimeBasedOnOutputSchema(recordBuilder, nonNullableSchema.getLogicalType(),
- field.getName(), zonedDateTime);
+ field.getName(), zonedDateTime);
} else {
recordBuilder.set(field.getName(), null);
}
@@ -105,9 +105,9 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
}
private void setZonedDateTimeBasedOnOutputSchema(StructuredRecord.Builder recordBuilder,
- Schema.LogicalType logicalType,
- String fieldName,
- ZonedDateTime zonedDateTime) {
+ Schema.LogicalType logicalType,
+ String fieldName,
+ ZonedDateTime zonedDateTime) {
if (Schema.LogicalType.DATETIME.equals(logicalType)) {
recordBuilder.setDateTime(fieldName, zonedDateTime.toLocalDateTime());
} else if (Schema.LogicalType.TIMESTAMP_MICROS.equals(logicalType)) {
@@ -118,8 +118,7 @@ private void setZonedDateTimeBasedOnOutputSchema(StructuredRecord.Builder record
private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
String columnTypeName = metadata.getColumnTypeName(columnIndex);
// If the column Type Name is present in the String mapped Redshift types then return true.
- return RedshiftSchemaReader.STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(columnTypeName)
- || RedshiftSchemaReader.STRING_MAPPED_REDSHIFT_TYPES.contains(metadata.getColumnType(columnIndex));
+ return RedshiftSchemaReader.STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(columnTypeName);
}
@Override
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java
index 5df35940e..df9938a45 100644
--- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java
@@ -31,18 +31,14 @@
import java.util.Set;
/**
- * Redshift Schema Reader class
+ * Redshift Schema Reader class
*/
public class RedshiftSchemaReader extends CommonSchemaReader {
private static final Logger LOG = LoggerFactory.getLogger(RedshiftSchemaReader.class);
- public static final Set STRING_MAPPED_REDSHIFT_TYPES = ImmutableSet.of(
- Types.OTHER, Types.ARRAY, Types.SQLXML
- );
-
public static final Set STRING_MAPPED_REDSHIFT_TYPES_NAMES = ImmutableSet.of(
- "timetz", "money"
+ "timetz", "money"
);
private final String sessionID;
@@ -61,7 +57,7 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
String typeName = metadata.getColumnTypeName(index);
int columnType = metadata.getColumnType(index);
- if (STRING_MAPPED_REDSHIFT_TYPES.contains(columnType) || STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
+ if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
return Schema.of(Schema.Type.STRING);
}
if (typeName.equalsIgnoreCase("INT")) {
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
index 2bd16a1f3..1b5894de9 100644
--- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
@@ -23,7 +23,6 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.FailureCollector;
-import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
@@ -59,11 +58,6 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
this.redshiftSourceConfig = redshiftSourceConfig;
}
- @Override
- public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
- super.configurePipeline(pipelineConfigurer);
- }
-
@Override
protected SchemaReader getSchemaReader() {
return new RedshiftSchemaReader();
@@ -85,8 +79,7 @@ protected String createConnectionString() {
@Override
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
- String host = redshiftSourceConfig.getConnection().getHost();
- String fqn = DBUtils.constructFQN("redshift", host,
+ String fqn = DBUtils.constructFQN("redshift", redshiftSourceConfig.getConnection().getHost(),
redshiftSourceConfig.getConnection().getPort(),
redshiftSourceConfig.getConnection().getDatabase(),
redshiftSourceConfig.getReferenceName());
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorTest.java
new file mode 100644
index 000000000..e96d8fb13
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorTest.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.plugin.db.connector.DBSpecificConnectorBaseTest;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Unit tests for {@link RedshiftConnector}
+ */
+public class RedshiftConnectorTest extends DBSpecificConnectorBaseTest {
+
+ private static final String JDBC_DRIVER_CLASS_NAME = "org.postgresql.Driver";
+
+ @Test
+ public void test() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ test(new RedshiftConnector(
+ new RedshiftConnectorConfig(username, password, JDBC_PLUGIN_NAME, connectionArguments, host, database,
+ port)),
+ JDBC_DRIVER_CLASS_NAME, RedshiftConstants.PLUGIN_NAME);
+ }
+}
+
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
new file mode 100644
index 000000000..39579cb60
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link RedshiftConnector}
+ */
+public class RedshiftConnectorUnitTest {
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
+
+ private static final RedshiftConnector CONNECTOR = new RedshiftConnector(null);
+
+ /**
+ * Unit test for getTableName()
+ */
+ @Test
+ public void getTableNameTest() {
+ Assert.assertEquals("\"schema\".\"table\"",
+ CONNECTOR.getTableName("db", "schema", "table"));
+ }
+
+ /**
+ * Unit tests for getTableQuery()
+ */
+ @Test
+ public void getTableQueryTest() {
+ String tableName = CONNECTOR.getTableName("db", "schema", "table");
+
+ // random query
+ Assert.assertEquals(String.format("SELECT * FROM %s\n" +
+ "TABLESAMPLE BERNOULLI (100.0 * %d / (SELECT COUNT(*) FROM %s))",
+ tableName, 100, tableName),
+ CONNECTOR.getRandomQuery(tableName, 100));
+ }
+}
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecordUnitTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecordUnitTest.java
new file mode 100644
index 000000000..4d11004e4
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftDBRecordUnitTest.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.util.DBUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit Test class for the RedshiftDBRecord
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RedshiftDBRecordUnitTest {
+
+ private static final int DEFAULT_PRECISION = 38;
+
+ /**
+ * Validate handling of precision-less numeric types against the following use cases.
+ * 1. Ensure that a numeric type with [p,s] set as [38,4] is detected as BigDecimal(38,4) in CDAP.
+ * 2. Ensure that a numeric type without [p,s] is detected as the String type in CDAP.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void validatePrecisionLessDecimalParsing() throws Exception {
+ Schema.Field field1 = Schema.Field.of("ID1", Schema.decimalOf(DEFAULT_PRECISION, 4));
+ Schema.Field field2 = Schema.Field.of("ID2", Schema.of(Schema.Type.STRING));
+
+ Schema schema = Schema.recordOf(
+ "dbRecord",
+ field1,
+ field2
+ );
+
+ ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
+ Mockito.when(resultSetMetaData.getColumnType(Mockito.eq(1))).thenReturn(Types.NUMERIC);
+ Mockito.when(resultSetMetaData.getPrecision(Mockito.eq(1))).thenReturn(DEFAULT_PRECISION);
+ Mockito.when(resultSetMetaData.getColumnType(eq(2))).thenReturn(Types.NUMERIC);
+ when(resultSetMetaData.getPrecision(eq(2))).thenReturn(0);
+
+ ResultSet resultSet = Mockito.mock(ResultSet.class);
+
+ when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+ when(resultSet.getBigDecimal(eq(1))).thenReturn(BigDecimal.valueOf(123.4568));
+ when(resultSet.getString(eq(2))).thenReturn("123.4568");
+
+ StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+ RedshiftDBRecord dbRecord = new RedshiftDBRecord();
+ dbRecord.handleField(resultSet, builder, field1, 1, Types.NUMERIC, DEFAULT_PRECISION, 4);
+ dbRecord.handleField(resultSet, builder, field2, 2, Types.NUMERIC, 0, -127);
+
+ StructuredRecord record = builder.build();
+ Assert.assertTrue(record.getDecimal("ID1") instanceof BigDecimal);
+ Assert.assertEquals(record.getDecimal("ID1"), BigDecimal.valueOf(123.4568));
+ Assert.assertTrue(record.get("ID2") instanceof String);
+ Assert.assertEquals(record.get("ID2"), "123.4568");
+ }
+
+ @Test
+ public void validateTimestampType() throws SQLException {
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
+ ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
+ when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamp");
+
+ ResultSet resultSet = Mockito.mock(ResultSet.class);
+ when(resultSet.getMetaData()).thenReturn(metaData);
+ when(resultSet.getTimestamp(eq(0), eq(DBUtils.PURE_GREGORIAN_CALENDAR)))
+ .thenReturn(Timestamp.from(offsetDateTime.toInstant()));
+
+ Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.DATETIME));
+ Schema schema = Schema.recordOf(
+ "dbRecord",
+ field1
+ );
+ StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+
+ RedshiftDBRecord dbRecord = new RedshiftDBRecord();
+ dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
+ StructuredRecord record = builder.build();
+ Assert.assertNotNull(record);
+ Assert.assertNotNull(record.getDateTime("field1"));
+ Assert.assertEquals(record.getDateTime("field1").toInstant(ZoneOffset.UTC), offsetDateTime.toInstant());
+
+ // Validate backward compatibility
+
+ field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
+ schema = Schema.recordOf(
+ "dbRecord",
+ field1
+ );
+ builder = StructuredRecord.builder(schema);
+ dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
+ record = builder.build();
+ Assert.assertNotNull(record);
+ Assert.assertNotNull(record.getTimestamp("field1"));
+ Assert.assertEquals(record.getTimestamp("field1").toInstant(), offsetDateTime.toInstant());
+ }
+
+ @Test
+ public void validateTimestampTZType() throws SQLException {
+ OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
+ ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
+ when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamptz");
+
+ ResultSet resultSet = Mockito.mock(ResultSet.class);
+ when(resultSet.getMetaData()).thenReturn(metaData);
+ when(resultSet.getObject(eq(0), eq(OffsetDateTime.class))).thenReturn(offsetDateTime);
+
+ Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
+ Schema schema = Schema.recordOf(
+ "dbRecord",
+ field1
+ );
+ StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+
+ RedshiftDBRecord dbRecord = new RedshiftDBRecord();
+ dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
+ StructuredRecord record = builder.build();
+ Assert.assertNotNull(record);
+ Assert.assertNotNull(record.getTimestamp("field1", ZoneId.of("UTC")));
+ Assert.assertEquals(record.getTimestamp("field1", ZoneId.of("UTC")).toInstant(), offsetDateTime.toInstant());
+ }
+}
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java
new file mode 100644
index 000000000..2d21c4478
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.plugin.db.connector.DBSpecificFailedConnectionTest;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest {
+ private static final String JDBC_DRIVER_CLASS_NAME = "com.amazon.redshift.Driver";
+
+ @Test
+ public void test() throws ClassNotFoundException, IOException {
+
+ RedshiftConnector connector = new RedshiftConnector(
+ new RedshiftConnectorConfig("username", "password", "jdbc", "", "localhost", "db", 5432));
+
+ super.test(JDBC_DRIVER_CLASS_NAME, connector, "Failed to create connection to database via connection string: " +
+ "jdbc:redshift://localhost:5432/db and arguments: " +
+ "{user=username}. Error: ConnectException: Connection refused " +
+ "(Connection refused).");
+ }
+}