Skip to content

Commit

Permalink
big-int-unsgined transforms (#2039)
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle authored Nov 29, 2024
1 parent 3455819 commit c9d233f
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public class MysqlJdbcValueMappings implements JdbcValueMappingsProvider {
.unscaledValue()
.toByteArray());

/** Map BigInt Unsigned type to Avro Number. */
private static final ResultSetValueMapper<BigDecimal> bigDecimalToAvroNumber =
(value, schema) -> value.toString();

/* Hex Encoded string for bytes type. */
private static final ResultSetValueMapper<byte[]> bytesToHexString =
(value, schema) -> new String(Hex.encodeHex(value));
Expand Down Expand Up @@ -191,7 +195,7 @@ private static long instantToMicro(Instant instant) {
private static final ImmutableMap<String, JdbcValueMapper<?>> SCHEMA_MAPPINGS =
ImmutableMap.<String, Pair<ResultSetValueExtractor<?>, ResultSetValueMapper<?>>>builder()
.put("BIGINT", Pair.of(ResultSet::getLong, valuePassThrough))
.put("BIGINT UNSIGNED", Pair.of(ResultSet::getBigDecimal, bigDecimalToByteArray))
.put("BIGINT UNSIGNED", Pair.of(ResultSet::getBigDecimal, bigDecimalToAvroNumber))
.put("BINARY", Pair.of(ResultSet::getBytes, bytesToHexString))
.put("BIT", Pair.of(ResultSet::getBytes, bytesToLong))
.put("BLOB", Pair.of(ResultSet::getBlob, blobToHexString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final class MysqlMappingProvider {
private static final ImmutableMap<String, UnifiedTypeMapping> MAPPING =
ImmutableMap.<String, UnifiedMappingProvider.Type>builder()
.put("BIGINT", UnifiedMappingProvider.Type.LONG)
.put("BIGINT UNSIGNED", UnifiedMappingProvider.Type.DECIMAL)
.put("BIGINT UNSIGNED", UnifiedMappingProvider.Type.NUMBER)
.put("BINARY", UnifiedMappingProvider.Type.STRING)
.put("BIT", UnifiedMappingProvider.Type.LONG)
.put("BLOB", UnifiedMappingProvider.Type.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ private static ImmutableList<Column> mySQLColumns() {
.derbyColumnType("BIGINT")
.sourceColumnType("BIGINT UNSIGNED", new Long[] {20L, 0L})
.inputValue(12345L)
.mappedValue(ByteBuffer.wrap(new byte[] {(byte) 0x30, (byte) 0x39}))
.mappedValue("12345")
.build())
.add(
Column.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ public void testMySqlMappingProvider() {
private ImmutableMap<String, String> expectedMapping() {
return ImmutableMap.<String, String>builder()
.put("BIGINT", "\"long\"")
.put(
"BIGINT UNSIGNED",
"{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":1,\"scale\":1}")
.put("BIGINT UNSIGNED", "{\"type\":\"string\",\"logicalType\":\"number\"}")
.put("BINARY", "\"string\"")
.put("BIT", "\"long\"")
.put("BLOB", "\"string\"")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.kerby.util.Hex;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -252,15 +253,20 @@ private Map<String, Object> genericRecordToMap(GenericRecord record) {
switch (fieldType) {
case INT:
case LONG:
fieldValue = Long.valueOf(fieldValue.toString());
break;
case BOOLEAN:
fieldValue = (fieldValue == null) ? null : Long.valueOf(fieldValue.toString());
fieldValue = Boolean.valueOf(fieldValue.toString());
break;
case FLOAT:
case DOUBLE:
fieldValue = (fieldValue == null) ? null : Double.valueOf(fieldValue.toString());
fieldValue = Double.valueOf(fieldValue.toString());
break;
case BYTES:
fieldValue = Hex.encode(((ByteBuffer) fieldValue).array());
break;
default:
fieldValue = (fieldValue == null) ? null : fieldValue.toString();
fieldValue = fieldValue.toString();
}
map.put(fieldName, fieldValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import com.google.cloud.teleport.v2.spanner.migrations.schema.IdentityMapper;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SessionBasedMapper;
import com.google.cloud.teleport.v2.spanner.type.Type;
import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse;
import com.google.common.io.Resources;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
Expand All @@ -46,6 +49,7 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.collections.map.HashedMap;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -393,9 +397,14 @@ public void transformChangeEventTest_identityMapper() throws InvalidTransformati
genericRecord.put("bytes_col", ByteBuffer.wrap(new byte[] {10, 20, 30}));
genericRecord.put("timestamp_col", 1602599400056483L);
genericRecord.put("date_col", 738991);

GenericRecord genericRecordAllNulls = new GenericData.Record(getAllSpannerTypesSchema());
getAllSpannerTypesSchema().getFields().stream()
.forEach(f -> genericRecordAllNulls.put(f.name(), null));

GenericRecordTypeConvertor genericRecordTypeConvertor =
new GenericRecordTypeConvertor(new IdentityMapper(getIdentityDdl()), "", null, null);
Map<String, Value> actual =
Map<String, Value> actualWithoutCustomTransform =
genericRecordTypeConvertor.transformChangeEvent(genericRecord, "all_types");
Map<String, Value> expected =
Map.of(
Expand All @@ -408,7 +417,72 @@ public void transformChangeEventTest_identityMapper() throws InvalidTransformati
"timestamp_col",
Value.timestamp(Timestamp.parseTimestamp("2020-10-13T14:30:00.056483Z")),
"date_col", Value.date(com.google.cloud.Date.parseDate("3993-04-16")));
assertEquals(expected, actual);
// Implementation Detail, the transform returns Spanner values, and Value.Null is not equal to
// java null,
// So simple transform for expected map to have null values does not work for us.
Map<String, Value> expectedNulls =
Map.of(
"bool_col",
Value.bool(null),
"int_col",
Value.int64(null),
"float_col",
Value.float64(null),
"string_col",
Value.string(null),
"numeric_col",
Value.numeric(null),
"bytes_col",
Value.bytes(null),
"timestamp_col",
Value.timestamp(null),
"date_col",
Value.date(null));
Map<String, Value> actualWithCustomTransform =
new GenericRecordTypeConvertor(
new IdentityMapper(getIdentityDdl()),
"",
null,
new TestCustomTransform(expected, false, false))
.transformChangeEvent(genericRecord, "all_types");

/* Checks that when there's no custom transform, output is as expected */
assertEquals(expected, actualWithoutCustomTransform);

/* Checks for the part of the code that supplies inputs to custom transforms */

/* Check correct input map generated when using customTransform */
assertEquals(expected, actualWithCustomTransform);

/* Checks that if any fields is made null by the custom transform, we get output with values as Value.NULL */
assertEquals(
expectedNulls,
new GenericRecordTypeConvertor(
new IdentityMapper(getIdentityDdl()),
"",
null,
new TestCustomTransform(expected, false, true))
.transformChangeEvent(genericRecord, "all_types"));

/* Checks that if event is filtered by the custom transform, output is null. */
assertEquals(
null,
new GenericRecordTypeConvertor(
new IdentityMapper(getIdentityDdl()),
"",
null,
new TestCustomTransform(expected, true, false))
.transformChangeEvent(genericRecord, "all_types"));

/* Checks that if any field in generic record is null, we get custom transform input map entry with value as Value.NULL */
assertEquals(
expectedNulls,
new GenericRecordTypeConvertor(
new IdentityMapper(getIdentityDdl()),
"",
null,
new TestCustomTransform(expected, false, false))
.transformChangeEvent(genericRecordAllNulls, "all_types"));
}

@Test
Expand Down Expand Up @@ -796,4 +870,40 @@ public void transformChangeEventTest_SynthPKPopulation() throws InvalidTransform
assertTrue(actual.containsKey("synth_id"));
assertEquals(Value.string("name1"), actual.get("new_name"));
}

private class TestCustomTransform implements ISpannerMigrationTransformer {

private Map<String, Value> expected;
private Boolean isFiltered;
private Boolean nullify;

public TestCustomTransform(Map<String, Value> expected, boolean isFiltered, boolean nullify) {
this.expected = expected;
this.isFiltered = isFiltered;
this.nullify = nullify;
}

@Override
public void init(String customParameters) {}

@Override
public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request)
throws InvalidTransformationException {
if (!nullify) {
return new MigrationTransformationResponse(request.getRequestRow(), isFiltered);
} else {
Map<String, Object> allNulls = new HashedMap();
for (String k : request.getRequestRow().keySet()) {
allNulls.put(k, null);
}
return new MigrationTransformationResponse(allNulls, isFiltered);
}
}

@Override
public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request)
throws InvalidTransformationException {
return null;
}
}
}

0 comments on commit c9d233f

Please sign in to comment.