Skip to content

Commit

Permalink
minor code fixes and test case addition
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasrathee-cs committed Nov 20, 2023
1 parent 39eb8d5 commit f9e6f9c
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 27 deletions.
17 changes: 11 additions & 6 deletions amazon-redshift-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<name>Amazon Redshift plugin</name>
<artifactId>amazon-redshift-plugin</artifactId>
<modelVersion>4.0.0</modelVersion>

<properties>
<redshift-jdbc.version>2.1.0.18</redshift-jdbc.version>
</properties>
Expand Down Expand Up @@ -83,6 +83,11 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-api</artifactId>
Expand All @@ -109,11 +114,11 @@
<configuration>
<instructions>
<_exportcontents>
io.cdap.plugin.amazon.redshift.*;
io.cdap.plugin.db.source.*;
org.apache.commons.lang;
org.apache.commons.logging.*;
org.codehaus.jackson.*
io.cdap.plugin.amazon.redshift.*;
io.cdap.plugin.db.source.*;
org.apache.commons.lang;
org.apache.commons.logging.*;
org.codehaus.jackson.*
</_exportcontents>
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ protected String getTableName(String database, String schema, String table) {
return String.format("\"%s\".\"%s\"", schema, table);
}

@Override
protected String getRandomQuery(String tableName, int limit) {
return String.format("SELECT * FROM %s\n" +
"TABLESAMPLE BERNOULLI (100.0 * %d / (SELECT COUNT(*) FROM %s))",
tableName, limit, tableName);
}

@Override
protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
ConnectorSpec.Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
Schema nonNullableSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();
setZonedDateTimeBasedOnOutputSchema(recordBuilder, nonNullableSchema.getLogicalType(),
field.getName(), zonedDateTime);
field.getName(), zonedDateTime);
} else {
recordBuilder.set(field.getName(), null);
}
Expand Down Expand Up @@ -105,9 +105,9 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
}

private void setZonedDateTimeBasedOnOutputSchema(StructuredRecord.Builder recordBuilder,
Schema.LogicalType logicalType,
String fieldName,
ZonedDateTime zonedDateTime) {
Schema.LogicalType logicalType,
String fieldName,
ZonedDateTime zonedDateTime) {
if (Schema.LogicalType.DATETIME.equals(logicalType)) {
recordBuilder.setDateTime(fieldName, zonedDateTime.toLocalDateTime());
} else if (Schema.LogicalType.TIMESTAMP_MICROS.equals(logicalType)) {
Expand All @@ -118,8 +118,7 @@ private void setZonedDateTimeBasedOnOutputSchema(StructuredRecord.Builder record
private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
String columnTypeName = metadata.getColumnTypeName(columnIndex);
// If the column Type Name is present in the String mapped Redshift types then return true.
return RedshiftSchemaReader.STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(columnTypeName)
|| RedshiftSchemaReader.STRING_MAPPED_REDSHIFT_TYPES.contains(metadata.getColumnType(columnIndex));
return RedshiftSchemaReader.STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(columnTypeName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,14 @@
import java.util.Set;

/**
* Redshift Schema Reader class
* Redshift Schema Reader class
*/
public class RedshiftSchemaReader extends CommonSchemaReader {

private static final Logger LOG = LoggerFactory.getLogger(RedshiftSchemaReader.class);

public static final Set<Integer> STRING_MAPPED_REDSHIFT_TYPES = ImmutableSet.of(
Types.OTHER, Types.ARRAY, Types.SQLXML
);

public static final Set<String> STRING_MAPPED_REDSHIFT_TYPES_NAMES = ImmutableSet.of(
"timetz", "money"
"timetz", "money"
);

private final String sessionID;
Expand All @@ -61,7 +57,7 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
String typeName = metadata.getColumnTypeName(index);
int columnType = metadata.getColumnType(index);

if (STRING_MAPPED_REDSHIFT_TYPES.contains(columnType) || STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
return Schema.of(Schema.Type.STRING);
}
if (typeName.equalsIgnoreCase("INT")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
Expand Down Expand Up @@ -59,11 +58,6 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
this.redshiftSourceConfig = redshiftSourceConfig;
}

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
}

@Override
protected SchemaReader getSchemaReader() {
return new RedshiftSchemaReader();
Expand All @@ -85,8 +79,7 @@ protected String createConnectionString() {

@Override
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
String host = redshiftSourceConfig.getConnection().getHost();
String fqn = DBUtils.constructFQN("redshift", host,
String fqn = DBUtils.constructFQN("redshift", redshiftSourceConfig.getConnection().getHost(),
redshiftSourceConfig.getConnection().getPort(),
redshiftSourceConfig.getConnection().getDatabase(),
redshiftSourceConfig.getReferenceName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.amazon.redshift;

import io.cdap.plugin.db.connector.DBSpecificConnectorBaseTest;
import org.junit.Test;

import java.io.IOException;

/**
* Unit tests for {@link RedshiftConnector}
*/
public class RedshiftConnectorTest extends DBSpecificConnectorBaseTest {

private static final String JDBC_DRIVER_CLASS_NAME = "org.postgresql.Driver";

@Test
public void test() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
test(new RedshiftConnector(
new RedshiftConnectorConfig(username, password, JDBC_PLUGIN_NAME, connectionArguments, host, database,
port)),
JDBC_DRIVER_CLASS_NAME, RedshiftConstants.PLUGIN_NAME);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.amazon.redshift;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
* Unit tests for {@link RedshiftConnector}
*/
public class RedshiftConnectorUnitTest {
@Rule
public ExpectedException expectedEx = ExpectedException.none();

private static final RedshiftConnector CONNECTOR = new RedshiftConnector(null);

/**
* Unit test for getTableName()
*/
@Test
public void getTableNameTest() {
Assert.assertEquals("\"schema\".\"table\"",
CONNECTOR.getTableName("db", "schema", "table"));
}

/**
* Unit tests for getTableQuery()
*/
@Test
public void getTableQueryTest() {
String tableName = CONNECTOR.getTableName("db", "schema", "table");

// random query
Assert.assertEquals(String.format("SELECT * FROM %s\n" +
"TABLESAMPLE BERNOULLI (100.0 * %d / (SELECT COUNT(*) FROM %s))",
tableName, 100, tableName),
CONNECTOR.getRandomQuery(tableName, 100));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.amazon.redshift;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.util.DBUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

/**
* Unit Test class for the PostgresDBRecord
*/
@RunWith(MockitoJUnitRunner.class)
public class RedshiftDBRecordUnitTest {

private static final int DEFAULT_PRECISION = 38;

/**
* Validate the precision less Numbers handling against following use cases.
* 1. Ensure that the numeric type with [p,s] set as [38,4] detect as BigDecimal(38,4) in cdap.
* 2. Ensure that the numeric type without [p,s] detect as String type in cdap.
*
* @throws Exception
*/
@Test
public void validatePrecisionLessDecimalParsing() throws Exception {
Schema.Field field1 = Schema.Field.of("ID1", Schema.decimalOf(DEFAULT_PRECISION, 4));
Schema.Field field2 = Schema.Field.of("ID2", Schema.of(Schema.Type.STRING));

Schema schema = Schema.recordOf(
"dbRecord",
field1,
field2
);

ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
Mockito.when(resultSetMetaData.getColumnType(Mockito.eq(1))).thenReturn(Types.NUMERIC);
Mockito.when(resultSetMetaData.getPrecision(Mockito.eq(1))).thenReturn(DEFAULT_PRECISION);
Mockito.when(resultSetMetaData.getColumnType(eq(2))).thenReturn(Types.NUMERIC);
when(resultSetMetaData.getPrecision(eq(2))).thenReturn(0);

ResultSet resultSet = Mockito.mock(ResultSet.class);

when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
when(resultSet.getBigDecimal(eq(1))).thenReturn(BigDecimal.valueOf(123.4568));
when(resultSet.getString(eq(2))).thenReturn("123.4568");

StructuredRecord.Builder builder = StructuredRecord.builder(schema);
RedshiftDBRecord dbRecord = new RedshiftDBRecord();
dbRecord.handleField(resultSet, builder, field1, 1, Types.NUMERIC, DEFAULT_PRECISION, 4);
dbRecord.handleField(resultSet, builder, field2, 2, Types.NUMERIC, 0, -127);

StructuredRecord record = builder.build();
Assert.assertTrue(record.getDecimal("ID1") instanceof BigDecimal);
Assert.assertEquals(record.getDecimal("ID1"), BigDecimal.valueOf(123.4568));
Assert.assertTrue(record.get("ID2") instanceof String);
Assert.assertEquals(record.get("ID2"), "123.4568");
}

@Test
public void validateTimestampType() throws SQLException {
OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamp");

ResultSet resultSet = Mockito.mock(ResultSet.class);
when(resultSet.getMetaData()).thenReturn(metaData);
when(resultSet.getTimestamp(eq(0), eq(DBUtils.PURE_GREGORIAN_CALENDAR)))
.thenReturn(Timestamp.from(offsetDateTime.toInstant()));

Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.DATETIME));
Schema schema = Schema.recordOf(
"dbRecord",
field1
);
StructuredRecord.Builder builder = StructuredRecord.builder(schema);

RedshiftDBRecord dbRecord = new RedshiftDBRecord();
dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
StructuredRecord record = builder.build();
Assert.assertNotNull(record);
Assert.assertNotNull(record.getDateTime("field1"));
Assert.assertEquals(record.getDateTime("field1").toInstant(ZoneOffset.UTC), offsetDateTime.toInstant());

// Validate backward compatibility

field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
schema = Schema.recordOf(
"dbRecord",
field1
);
builder = StructuredRecord.builder(schema);
dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
record = builder.build();
Assert.assertNotNull(record);
Assert.assertNotNull(record.getTimestamp("field1"));
Assert.assertEquals(record.getTimestamp("field1").toInstant(), offsetDateTime.toInstant());
}

@Test
public void validateTimestampTZType() throws SQLException {
OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamptz");

ResultSet resultSet = Mockito.mock(ResultSet.class);
when(resultSet.getMetaData()).thenReturn(metaData);
when(resultSet.getObject(eq(0), eq(OffsetDateTime.class))).thenReturn(offsetDateTime);

Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
Schema schema = Schema.recordOf(
"dbRecord",
field1
);
StructuredRecord.Builder builder = StructuredRecord.builder(schema);

RedshiftDBRecord dbRecord = new RedshiftDBRecord();
dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
StructuredRecord record = builder.build();
Assert.assertNotNull(record);
Assert.assertNotNull(record.getTimestamp("field1", ZoneId.of("UTC")));
Assert.assertEquals(record.getTimestamp("field1", ZoneId.of("UTC")).toInstant(), offsetDateTime.toInstant());
}
}
Loading

0 comments on commit f9e6f9c

Please sign in to comment.