diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/provider/MysqlJdbcValueMappings.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/provider/MysqlJdbcValueMappings.java index d501db5978..43f23afdee 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/provider/MysqlJdbcValueMappings.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/provider/MysqlJdbcValueMappings.java @@ -66,6 +66,10 @@ public class MysqlJdbcValueMappings implements JdbcValueMappingsProvider { .unscaledValue() .toByteArray()); + /** Map BigInt Unsigned type to Avro Number. */ + private static final ResultSetValueMapper bigDecimalToAvroNumber = + (value, schema) -> value.toString(); + /* Hex Encoded string for bytes type. */ private static final ResultSetValueMapper bytesToHexString = (value, schema) -> new String(Hex.encodeHex(value)); @@ -191,7 +195,7 @@ private static long instantToMicro(Instant instant) { private static final ImmutableMap> SCHEMA_MAPPINGS = ImmutableMap., ResultSetValueMapper>>builder() .put("BIGINT", Pair.of(ResultSet::getLong, valuePassThrough)) - .put("BIGINT UNSIGNED", Pair.of(ResultSet::getBigDecimal, bigDecimalToByteArray)) + .put("BIGINT UNSIGNED", Pair.of(ResultSet::getBigDecimal, bigDecimalToAvroNumber)) .put("BINARY", Pair.of(ResultSet::getBytes, bytesToHexString)) .put("BIT", Pair.of(ResultSet::getBytes, bytesToLong)) .put("BLOB", Pair.of(ResultSet::getBlob, blobToHexString)) diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/MysqlMappingProvider.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/MysqlMappingProvider.java index 536c8a3fcf..8ea3ee32b4 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/MysqlMappingProvider.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/MysqlMappingProvider.java @@ -33,7 +33,7 @@ public final class MysqlMappingProvider { private static final ImmutableMap MAPPING = ImmutableMap.builder() .put("BIGINT", UnifiedMappingProvider.Type.LONG) - .put("BIGINT UNSIGNED", UnifiedMappingProvider.Type.DECIMAL) + .put("BIGINT UNSIGNED", UnifiedMappingProvider.Type.NUMBER) .put("BINARY", UnifiedMappingProvider.Type.STRING) .put("BIT", UnifiedMappingProvider.Type.LONG) .put("BLOB", UnifiedMappingProvider.Type.STRING) diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapperTest.java index d1fe943d38..df0abfe098 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapperTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapperTest.java @@ -272,7 +272,7 @@ private static ImmutableList mySQLColumns() { .derbyColumnType("BIGINT") .sourceColumnType("BIGINT UNSIGNED", new Long[] {20L, 0L}) .inputValue(12345L) - .mappedValue(ByteBuffer.wrap(new byte[] {(byte) 0x30, (byte) 0x39})) + .mappedValue("12345") .build()) .add( Column.builder() diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/MysqlMappingProviderTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/MysqlMappingProviderTest.java index b32ef0c6eb..56e883450a 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/MysqlMappingProviderTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/MysqlMappingProviderTest.java @@ -49,9 +49,7 @@ public void testMySqlMappingProvider() { private ImmutableMap expectedMapping() { return ImmutableMap.builder() .put("BIGINT", "\"long\"") - .put( - "BIGINT UNSIGNED", - "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":1,\"scale\":1}") + .put("BIGINT UNSIGNED", "{\"type\":\"string\",\"logicalType\":\"number\"}") .put("BINARY", "\"string\"") .put("BIT", "\"long\"") .put("BLOB", "\"string\"") diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java index e00bcfe2ca..4c7ea2cdc6 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java @@ -47,6 +47,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.kerby.util.Hex; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -252,15 +253,20 @@ private Map genericRecordToMap(GenericRecord record) { switch (fieldType) { case INT: case LONG: + fieldValue = Long.valueOf(fieldValue.toString()); + break; case BOOLEAN: - fieldValue = (fieldValue == null) ? null : Long.valueOf(fieldValue.toString()); + fieldValue = Boolean.valueOf(fieldValue.toString()); break; case FLOAT: case DOUBLE: - fieldValue = (fieldValue == null) ? null : Double.valueOf(fieldValue.toString()); + fieldValue = Double.valueOf(fieldValue.toString()); + break; + case BYTES: + fieldValue = Hex.encode(((ByteBuffer) fieldValue).array()); break; default: - fieldValue = (fieldValue == null) ? null : fieldValue.toString(); + fieldValue = fieldValue.toString(); } map.put(fieldName, fieldValue); } diff --git a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java index 43f2a068ad..68b94f69d3 100644 --- a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java +++ b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java @@ -33,6 +33,9 @@ import com.google.cloud.teleport.v2.spanner.migrations.schema.IdentityMapper; import com.google.cloud.teleport.v2.spanner.migrations.schema.SessionBasedMapper; import com.google.cloud.teleport.v2.spanner.type.Type; +import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer; +import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest; +import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse; import com.google.common.io.Resources; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -46,6 +49,7 @@ import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.collections.map.HashedMap; import org.junit.Test; import org.mockito.Mockito; @@ -393,9 +397,14 @@ public void transformChangeEventTest_identityMapper() throws InvalidTransformati genericRecord.put("bytes_col", ByteBuffer.wrap(new byte[] {10, 20, 30})); genericRecord.put("timestamp_col", 1602599400056483L); genericRecord.put("date_col", 738991); + + GenericRecord genericRecordAllNulls = new GenericData.Record(getAllSpannerTypesSchema()); + getAllSpannerTypesSchema().getFields().stream() + .forEach(f -> genericRecordAllNulls.put(f.name(), null)); + GenericRecordTypeConvertor genericRecordTypeConvertor = new GenericRecordTypeConvertor(new IdentityMapper(getIdentityDdl()), "", null, null); - Map actual = + Map actualWithoutCustomTransform = genericRecordTypeConvertor.transformChangeEvent(genericRecord, "all_types"); Map expected = Map.of( @@ -408,7 +417,72 @@ public void transformChangeEventTest_identityMapper() throws InvalidTransformati "timestamp_col", Value.timestamp(Timestamp.parseTimestamp("2020-10-13T14:30:00.056483Z")), "date_col", Value.date(com.google.cloud.Date.parseDate("3993-04-16"))); - assertEquals(expected, actual); + // Implementation Detail, the transform returns Spanner values, and Value.Null is not equal to + // java null, + // So simple transform for expected map to have null values does not work for us. + Map expectedNulls = + Map.of( + "bool_col", + Value.bool(null), + "int_col", + Value.int64(null), + "float_col", + Value.float64(null), + "string_col", + Value.string(null), + "numeric_col", + Value.numeric(null), + "bytes_col", + Value.bytes(null), + "timestamp_col", + Value.timestamp(null), + "date_col", + Value.date(null)); + Map actualWithCustomTransform = + new GenericRecordTypeConvertor( + new IdentityMapper(getIdentityDdl()), + "", + null, + new TestCustomTransform(expected, false, false)) + .transformChangeEvent(genericRecord, "all_types"); + + /* Checks that when there's no custom transform, output is as expected */ + assertEquals(expected, actualWithoutCustomTransform); + + /* Checks for the part of the code that supplies inputs to custom transforms */ + + /* Check correct input map generated when using customTransform */ + assertEquals(expected, actualWithCustomTransform); + + /* Checks that if any fields is made null by the custom transform, we get output with values as Value.NULL */ + assertEquals( + expectedNulls, + new GenericRecordTypeConvertor( + new IdentityMapper(getIdentityDdl()), + "", + null, + new TestCustomTransform(expected, false, true)) + .transformChangeEvent(genericRecord, "all_types")); + + /* Checks that if event is filtered by the custom transform, output is null. */ + assertEquals( + null, + new GenericRecordTypeConvertor( + new IdentityMapper(getIdentityDdl()), + "", + null, + new TestCustomTransform(expected, true, false)) + .transformChangeEvent(genericRecord, "all_types")); + + /* Checks that if any field in generic record is null, we get custom transform input map entry with value as Value.NULL */ + assertEquals( + expectedNulls, + new GenericRecordTypeConvertor( + new IdentityMapper(getIdentityDdl()), + "", + null, + new TestCustomTransform(expected, false, false)) + .transformChangeEvent(genericRecordAllNulls, "all_types")); } @Test @@ -796,4 +870,40 @@ public void transformChangeEventTest_SynthPKPopulation() throws InvalidTransform assertTrue(actual.containsKey("synth_id")); assertEquals(Value.string("name1"), actual.get("new_name")); } + + private class TestCustomTransform implements ISpannerMigrationTransformer { + + private Map expected; + private Boolean isFiltered; + private Boolean nullify; + + public TestCustomTransform(Map expected, boolean isFiltered, boolean nullify) { + this.expected = expected; + this.isFiltered = isFiltered; + this.nullify = nullify; + } + + @Override + public void init(String customParameters) {} + + @Override + public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request) + throws InvalidTransformationException { + if (!nullify) { + return new MigrationTransformationResponse(request.getRequestRow(), isFiltered); + } else { + Map allNulls = new HashedMap(); + for (String k : request.getRequestRow().keySet()) { + allNulls.put(k, null); + } + return new MigrationTransformationResponse(allNulls, isFiltered); + } + } + + @Override + public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request) + throws InvalidTransformationException { + return null; + } + } }