diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorConfig.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorConfig.java
index f05f26d10..bae0013b3 100644
--- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorConfig.java
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorConfig.java
@@ -72,16 +72,16 @@ public int getPort() {
 
   @Override
   public String getConnectionString() {
-    return String.format(
-      RedshiftConstants.REDSHIFT_CONNECTION_STRING_FORMAT,
-      host,
-      getPort(),
-      database);
+    return String.format(
+        RedshiftConstants.REDSHIFT_CONNECTION_STRING_FORMAT,
+        host,
+        getPort(),
+        database);
   }
 
   @Override
   public boolean canConnect() {
     return super.canConnect() && !containsMacro(ConnectionConfig.HOST) &&
-      !containsMacro(ConnectionConfig.PORT) && !containsMacro(ConnectionConfig.DATABASE);
+        !containsMacro(ConnectionConfig.PORT) && !containsMacro(ConnectionConfig.DATABASE);
   }
 }
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
index 1b5894de9..6a0df3a2d 100644
--- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java
@@ -16,6 +16,7 @@
 
 package io.cdap.plugin.amazon.redshift;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Macro;
 import io.cdap.cdap.api.annotation.Metadata;
@@ -108,6 +109,13 @@ public Map<String, String> getDBSpecificArguments() {
       return Collections.emptyMap();
     }
 
+    @VisibleForTesting
+    public RedshiftSourceConfig(@Nullable Boolean useConnection,
+                                @Nullable RedshiftConnectorConfig connection) {
+      this.useConnection = useConnection;
+      this.connection = connection;
+    }
+
     @Override
     public Integer getFetchSize() {
       Integer fetchSize = super.getFetchSize();
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
index 39579cb60..47e8b0a52 100644
--- a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
@@ -39,6 +39,19 @@ public void getTableNameTest() {
                         CONNECTOR.getTableName("db", "schema", "table"));
   }
 
+  @Test
+  public void getRandomQuery() {
+    Assert.assertEquals("SELECT * FROM TestData\n" +
+                          "TABLESAMPLE BERNOULLI (100.0 * 10 / (SELECT COUNT(*) FROM TestData))",
+                        CONNECTOR.getRandomQuery("TestData", 10));
+  }
+
+  @Test
+  public void getDBRecordType() {
+    Assert.assertEquals("class io.cdap.plugin.amazon.redshift.RedshiftDBRecord",
+                        CONNECTOR.getDBRecordType().toString());
+  }
+
   /**
    * Unit tests for getTableQuery()
    */
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReaderTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReaderTest.java
new file mode 100644
index 000000000..206b4ae9f
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReaderTest.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import com.google.common.collect.Lists;
+import io.cdap.cdap.api.data.schema.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RedshiftSchemaReaderTest {
+
+  @Test
+  public void testGetSchema() throws SQLException {
+    RedshiftSchemaReader schemaReader = new RedshiftSchemaReader();
+
+    ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
+    Mockito.when(metadata.getColumnTypeName(1)).thenReturn("timetz");
+    Mockito.when(metadata.getColumnType(1)).thenReturn(Types.TIMESTAMP);
+
+    Schema schema = schemaReader.getSchema(metadata, 1);
+
+    Assert.assertEquals(Schema.of(Schema.Type.STRING), schema);
+  }
+
+  @Test
+  public void testGetSchemaWithIntType() throws SQLException {
+    RedshiftSchemaReader schemaReader = new RedshiftSchemaReader();
+    ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
+    Mockito.when(metadata.getColumnTypeName(1)).thenReturn("INT");
+    Mockito.when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
+    Schema schema = schemaReader.getSchema(metadata, 1);
+
+    Assert.assertEquals(Schema.of(Schema.Type.INT), schema);
+  }
+
+  @Test
+  public void testGetSchemaWithNumericTypeWithPrecision() throws SQLException {
+    RedshiftSchemaReader schemaReader = new RedshiftSchemaReader();
+    ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
+    Mockito.when(metadata.getColumnTypeName(1)).thenReturn("STRING");
+    Mockito.when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
+    Mockito.when(metadata.getPrecision(1)).thenReturn(0);
+
+    Schema schema = schemaReader.getSchema(metadata, 1);
+
+    Assert.assertEquals(Schema.of(Schema.Type.STRING), schema);
+  }
+
+  @Test
+  public void testGetSchemaWithOtherTypes() throws SQLException {
+    RedshiftSchemaReader schemaReader = new RedshiftSchemaReader();
+    ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
+    Mockito.when(metadata.getColumnTypeName(1)).thenReturn("BIGINT");
+    Mockito.when(metadata.getColumnType(1)).thenReturn(Types.BIGINT);
+    Schema schema = schemaReader.getSchema(metadata, 1);
+
+    Assert.assertEquals(Schema.of(Schema.Type.LONG), schema);
+
+    Mockito.when(metadata.getColumnTypeName(2)).thenReturn("timestamp");
+    Mockito.when(metadata.getColumnType(2)).thenReturn(Types.TIMESTAMP);
+
+    schema = schemaReader.getSchema(metadata, 2);
+
+    Assert.assertEquals(Schema.of(Schema.LogicalType.DATETIME), schema);
+  }
+
+  @Test
+  public void testShouldIgnoreColumn() throws SQLException {
+    RedshiftSchemaReader schemaReader = new RedshiftSchemaReader("sessionID");
+    ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
+    Mockito.when(metadata.getColumnName(1)).thenReturn("c_sessionID");
+    Assert.assertTrue(schemaReader.shouldIgnoreColumn(metadata, 1));
+    Mockito.when(metadata.getColumnName(2)).thenReturn("sqn_sessionID");
+    Assert.assertTrue(schemaReader.shouldIgnoreColumn(metadata, 2));
+    Mockito.when(metadata.getColumnName(3)).thenReturn("columnName");
+    Assert.assertFalse(schemaReader.shouldIgnoreColumn(metadata, 3));
+  }
+
+  @Test
+  public void testGetSchemaFields() throws SQLException {
+    RedshiftSchemaReader schemaReader = new RedshiftSchemaReader();
+
+    ResultSet resultSet = Mockito.mock(ResultSet.class);
+    ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
+
+    Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
+
+    // Mock two columns with different types
+    Mockito.when(metadata.getColumnCount()).thenReturn(2);
+    Mockito.when(metadata.getColumnTypeName(1)).thenReturn("INT");
+    Mockito.when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
+    Mockito.when(metadata.getColumnName(1)).thenReturn("column1");
+
+    Mockito.when(metadata.getColumnTypeName(2)).thenReturn("BIGINT");
+    Mockito.when(metadata.getColumnType(2)).thenReturn(Types.BIGINT);
+    Mockito.when(metadata.getColumnName(2)).thenReturn("column2");
+
+    List<Schema.Field> expectedSchemaFields = Lists.newArrayList();
+    expectedSchemaFields.add(Schema.Field.of("column1", Schema.nullableOf(Schema.of(Schema.Type.INT))));
+    expectedSchemaFields.add(Schema.Field.of("column2", Schema.nullableOf(Schema.of(Schema.Type.LONG))));
+
+    List<Schema.Field> actualSchemaFields = schemaReader.getSchemaFields(resultSet);
+
+    Assert.assertEquals(expectedSchemaFields.get(0).getName(), actualSchemaFields.get(0).getName());
+    Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName());
+  }
+}
diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftSourceTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftSourceTest.java
new file mode 100644
index 000000000..d09de8f0d
--- /dev/null
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftSourceTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.amazon.redshift;
+
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.db.SchemaReader;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Map;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RedshiftSourceTest {
+
+  @Test
+  public void testGetDBSpecificArguments() {
+    RedshiftConnectorConfig connectorConfig = new RedshiftConnectorConfig("username", "password",
+                                                                          "jdbcPluginName", "connectionArguments",
+                                                                          "host", "database", 1101);
+    RedshiftSource.RedshiftSourceConfig config = new RedshiftSource.RedshiftSourceConfig(false, connectorConfig);
+    Map<String, String> dbSpecificArguments = config.getDBSpecificArguments();
+    Assert.assertEquals(0, dbSpecificArguments.size());
+  }
+
+  @Test
+  public void testGetFetchSize() {
+    RedshiftConnectorConfig connectorConfig = new RedshiftConnectorConfig("username", "password",
+                                                                          "jdbcPluginName", "connectionArguments",
+                                                                          "host", "database", 1101);
+    RedshiftSource.RedshiftSourceConfig config = new RedshiftSource.RedshiftSourceConfig(false, connectorConfig);
+    Integer fetchSize = config.getFetchSize();
+    Assert.assertEquals(1000, fetchSize.intValue());
+  }
+
+  @Test
+  public void testGetSchemaReader() {
+    RedshiftConnectorConfig connectorConfig = new RedshiftConnectorConfig("username", "password",
+                                                                          "jdbcPluginName", "connectionArguments",
+                                                                          "host", "database", 1101);
+    RedshiftSource source = new RedshiftSource(new RedshiftSource.RedshiftSourceConfig(false, connectorConfig));
+    SchemaReader schemaReader = source.getSchemaReader();
+    Assert.assertTrue(schemaReader instanceof RedshiftSchemaReader);
+  }
+
+  @Test
+  public void testGetDBRecordType() {
+    RedshiftConnectorConfig connectorConfig = new RedshiftConnectorConfig("username", "password",
+                                                                          "jdbcPluginName", "connectionArguments",
+                                                                          "host", "database", 1101);
+    RedshiftSource source = new RedshiftSource(new RedshiftSource.RedshiftSourceConfig(false, connectorConfig));
+    Class<? extends DBWritable> dbRecordType = source.getDBRecordType();
+    Assert.assertEquals(RedshiftDBRecord.class, dbRecordType);
+  }
+
+  @Test
+  public void testCreateConnectionString() {
+    RedshiftConnectorConfig connectorConfig = new RedshiftConnectorConfig("username", "password",
+                                                                          "jdbcPluginName", "connectionArguments",
+                                                                          "localhost", "test", 5439);
+    RedshiftSource.RedshiftSourceConfig config = new RedshiftSource.RedshiftSourceConfig(false, connectorConfig);
+
+    RedshiftSource source = new RedshiftSource(config);
+    String connectionString = source.createConnectionString();
+    Assert.assertEquals("jdbc:redshift://localhost:5439/test", connectionString);
+  }
+
+  @Test
+  public void testGetLineageRecorder() {
+    BatchSourceContext context = Mockito.mock(BatchSourceContext.class);
+    RedshiftConnectorConfig connectorConfig = new RedshiftConnectorConfig("username", "password",
+                                                                          "jdbcPluginName", "connectionArguments",
+                                                                          "host", "database", 1101);
+    RedshiftSource.RedshiftSourceConfig config = new RedshiftSource.RedshiftSourceConfig(false, connectorConfig);
+    RedshiftSource source = new RedshiftSource(config);
+
+    LineageRecorder lineageRecorder = source.getLineageRecorder(context);
+    Assert.assertNotNull(lineageRecorder);
+  }
+}
diff --git a/amazon-redshift-plugin/widgets/Redshift-batchsource.json b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
index 91e860ee9..943e2d24e 100644
--- a/amazon-redshift-plugin/widgets/Redshift-batchsource.json
+++ b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
@@ -135,7 +135,7 @@
         },
         {
           "widget-type": "textbox",
-          "label": "Split Column",
+          "label": "Split-By Field Name",
           "name": "splitBy"
         },
         {