diff --git a/v2/gcs-to-sourcedb/src/test/resources/GCSToSourceDbWithoutReaderIT/events.txt b/v2/gcs-to-sourcedb/src/test/resources/GCSToSourceDbWithoutReaderIT/events.txt index 19829a98ab..5d642b6cac 100644 --- a/v2/gcs-to-sourcedb/src/test/resources/GCSToSourceDbWithoutReaderIT/events.txt +++ b/v2/gcs-to-sourcedb/src/test/resources/GCSToSourceDbWithoutReaderIT/events.txt @@ -1,6 +1,6 @@ {"commitTimestamp":{"seconds":1719247984,"nanos":109263000},"serverTransactionId":"NzkxOTE5OTY4NTQzNjMzODY3Mg\u003d\u003d","recordSequence":"00000000","tableName":"Users","mods":[{"keysJson":"{\"id\":\"1\"}","oldValuesJson":"{}","newValuesJson":"{\"name\":\"FF\"}"}],"modType":"INSERT","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} -{"commitTimestamp":{"seconds":1719846465,"nanos":372383000},"serverTransactionId":"Mjk0MTkyNDQ4NzkyMDcyMzcwOQ\u003d\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example2\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"1000\",\"binary_column\":\"YmluX2NvbHVtbg\u003d\u003d\",\"bit_column\":\"MQ\u003d\u003d\",\"blob_column\":\"YmxvYl9jb2x1bW4\u003d\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"99999.99\",\"double_column\":123456.123,\"enum_column\":\"1\",\"float_column\":12345.67,\"int_column\":\"100\",\"text_column\":\"Sample text for entry 2\",\"time_column\":\"410000\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"2\",\"year_column\":\"2024\"}"}],"modType":"INSERT","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} -{"commitTimestamp":{"seconds":1719846503,"nanos":617919000},"serverTransactionId":"MTY0MDExNjY5NDAxMjEwMDA5Mzg\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example2\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"1000\",\"binary_column\":\"YmluX2NvbHVtbg\u003d\u003d\",\"bit_column\":\"MQ\u003d\u003d\",\"blob_column\":\"YmxvYl9jb2x1bW4\u003d\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"99999.99\",\"double_column\":123456.123,\"enum_column\":\"1\",\"float_column\":12345.67,\"int_column\":\"100\",\"text_column\":\"Sample text for entry 2\",\"time_column\":\"143000\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"2\",\"year_column\":\"2024\"}"}],"modType":"UPDATE","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} -{"commitTimestamp":{"seconds":1719846503,"nanos":617919000},"serverTransactionId":"MTY0MDExNjY5NDAxMjEwMDA5Mzg\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example2\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"1000\",\"binary_column\":\"YmluX2NvbHVtbg\u003d\u003d\",\"bit_column\":\"MQ\u003d\u003d\",\"blob_column\":\"YmxvYl9jb2x1bW4\u003d\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"99999.99\",\"double_column\":123456.123,\"enum_column\":\"1\",\"float_column\":12345.67,\"int_column\":\"100\",\"text_column\":\"Sample text for entry 2\",\"time_column\":\"143000\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"2\",\"year_column\":\"2024\"}"}],"modType":"DELETE","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} -{"commitTimestamp":{"seconds":1718795517,"nanos":877439000},"serverTransactionId":"MTI4NTc0NTY4OTYwMzkxMDgyNA\u003d\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example1\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"1000\",\"binary_column\":\"ZXhhbXBsZWJpbmFyeTE\u003d\",\"bit_column\":\"ZXhhbXBsZWJpdDE\u003d\",\"blob_column\":\"ZXhhbXBsZWJsb2Ix\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"99999.99\",\"double_column\":123456.123,\"enum_column\":\"1\",\"float_column\":12345.67,\"int_column\":\"100\",\"text_column\":\"Sample text for entry 1\",\"time_column\":\"143000\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"1\",\"year_column\":\"2024\"}"}],"modType":"INSERT","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} -{"commitTimestamp":{"seconds":1718781240,"nanos":419055000},"serverTransactionId":"MTQxMTQ5MzExNTc0NTkyNzAzODM\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"12345\",\"binary_column\":\"U29tZSBiaW5hcnkgZGF0YQ\u003d\u003d\",\"bit_column\":\"U29tZSBiaW5hcnkgZGF0YQ\u003d\u003d\",\"blob_column\":\"U29tZSBiaW5hcnkgZGF0YQ\u003d\u003d\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"12345.67\",\"double_column\":123.456,\"enum_column\":\"1\",\"float_column\":123.45,\"int_column\":\"123\",\"text_column\":\"Sample text\",\"time_column\":\"143000\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"1\",\"year_column\":\"2024\"}"}],"modType":"INSERT","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} \ No newline at end of file +{"commitTimestamp":{"seconds":1719846465,"nanos":372383000},"serverTransactionId":"Mjk0MTkyNDQ4NzkyMDcyMzcwOQ\u003d\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example2\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"1000\",\"binary_column\":\"YmluX2NvbHVtbg\u003d\u003d\",\"bit_column\":\"MQ\u003d\u003d\",\"blob_column\":\"YmxvYl9jb2x1bW4\u003d\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"99999.99\",\"double_column\":123456.123,\"enum_column\":\"1\",\"float_column\":12345.67,\"int_column\":\"100\",\"text_column\":\"Sample text for entry 2\",\"time_column\":\"14:30:00\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"2\",\"year_column\":\"2024\"}"}],"modType":"INSERT","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} +{"commitTimestamp":{"seconds":1719846503,"nanos":617919000},"serverTransactionId":"MTY0MDExNjY5NDAxMjEwMDA5Mzg\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example2\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"1000\",\"binary_column\":\"YmluX2NvbHVtbg\u003d\u003d\",\"bit_column\":\"MQ\u003d\u003d\",\"blob_column\":\"YmxvYl9jb2x1bW4\u003d\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"99999.99\",\"double_column\":123456.123,\"enum_column\":\"1\",\"float_column\":12345.67,\"int_column\":\"100\",\"text_column\":\"Sample text for entry 2\",\"time_column\":\"14:30:00\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"2\",\"year_column\":\"2024\"}"}],"modType":"UPDATE","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} +{"commitTimestamp":{"seconds":1719846503,"nanos":617919000},"serverTransactionId":"MTY0MDExNjY5NDAxMjEwMDA5Mzg\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example2\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"1000\",\"binary_column\":\"YmluX2NvbHVtbg\u003d\u003d\",\"bit_column\":\"MQ\u003d\u003d\",\"blob_column\":\"YmxvYl9jb2x1bW4\u003d\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"99999.99\",\"double_column\":123456.123,\"enum_column\":\"1\",\"float_column\":12345.67,\"int_column\":\"100\",\"text_column\":\"Sample text for entry 2\",\"time_column\":\"14:30:00\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"2\",\"year_column\":\"2024\"}"}],"modType":"DELETE","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} +{"commitTimestamp":{"seconds":1718795517,"nanos":877439000},"serverTransactionId":"MTI4NTc0NTY4OTYwMzkxMDgyNA\u003d\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example1\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"1000\",\"binary_column\":\"ZXhhbXBsZWJpbmFyeTE\u003d\",\"bit_column\":\"ZXhhbXBsZWJpdDE\u003d\",\"blob_column\":\"ZXhhbXBsZWJsb2Ix\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"99999.99\",\"double_column\":123456.123,\"enum_column\":\"1\",\"float_column\":12345.67,\"int_column\":\"100\",\"text_column\":\"Sample text for entry 1\",\"time_column\":\"14:30:00\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"1\",\"year_column\":\"2024\"}"}],"modType":"INSERT","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} +{"commitTimestamp":{"seconds":1718781240,"nanos":419055000},"serverTransactionId":"MTQxMTQ5MzExNTc0NTkyNzAzODM\u003d","recordSequence":"00000000","tableName":"AllDatatypeTransformation","mods":[{"keysJson":"{\"varchar_column\":\"example\"}","oldValuesJson":"{}","newValuesJson":"{\"bigint_column\":\"12345\",\"binary_column\":\"U29tZSBiaW5hcnkgZGF0YQ\u003d\u003d\",\"bit_column\":\"U29tZSBiaW5hcnkgZGF0YQ\u003d\u003d\",\"blob_column\":\"U29tZSBiaW5hcnkgZGF0YQ\u003d\u003d\",\"bool_column\":true,\"date_column\":\"2024-01-01\",\"datetime_column\":\"2024-01-01T12:34:56Z\",\"decimal_column\":\"12345.67\",\"double_column\":123.456,\"enum_column\":\"1\",\"float_column\":123.45,\"int_column\":\"123\",\"text_column\":\"Sample text\",\"time_column\":\"14:30:00\",\"timestamp_column\":\"2024-01-01T12:34:56Z\",\"tinyint_column\":\"1\",\"year_column\":\"2024\"}"}],"modType":"INSERT","numberOfRecordsInTransaction":1,"transactionTag":"","shard":"ls1"} \ No newline at end of file diff --git a/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithShardForLiveIT.java b/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithShardForLiveIT.java index f6b65ba41b..bf9c890bd0 100644 --- a/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithShardForLiveIT.java +++ b/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithShardForLiveIT.java @@ -131,7 +131,6 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques Long tinyIntColumn = Long.parseLong((String) requestRow.get("tinyint_column")) + 1; Long intColumn = Long.parseLong((String) requestRow.get("int_column")) + 1; Long bigIntColumn = Long.parseLong((String) requestRow.get("bigint_column")) + 1; - Long timeColumn = Long.parseLong((String) requestRow.get("time_column")) + 1000; Long yearColumn = Long.parseLong((String) requestRow.get("year_column")) + 1; BigDecimal floatColumn = (BigDecimal) requestRow.get("float_column"); BigDecimal doubleColumn = (BigDecimal) requestRow.get("double_column"); @@ -143,7 +142,6 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques responseRow.put("double_column", doubleColumn.add(BigDecimal.ONE).toString()); Double value = Double.parseDouble((String) requestRow.get("decimal_column")); responseRow.put("decimal_column", String.valueOf(value - 1)); - responseRow.put("time_column", "\'" + timeColumn + "\'"); responseRow.put("bool_column", "false"); responseRow.put("enum_column", "\'3\'"); responseRow.put( @@ -191,11 +189,26 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques "CONVERT_TZ(\'" + timestampColumn.substring(0, timestampColumn.length() - 1) + "\','+00:00','+00:00')"); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); + LocalTime time = LocalTime.parse((String) requestRow.get("time_column"), formatter); + + LocalTime newTime = time.plusMinutes(10); + responseRow.put("time_column", "\'" + newTime.format(formatter) + "\'"); } catch (Exception e) { throw new InvalidTransformationException(e); } + MigrationTransformationResponse response = + new MigrationTransformationResponse(responseRow, false); + return response; + } else if (request.getTableName().equals("Users1")) { + Map responseRow = new HashMap<>(); + Map requestRow = request.getRequestRow(); + String name = requestRow.get("name").toString(); + String[] nameArray = name.split(" "); + responseRow.put("first_name", "\'" + nameArray[0] + "\'"); + responseRow.put("last_name", "\'" + nameArray[1] + "\'"); MigrationTransformationResponse response = new MigrationTransformationResponse(responseRow, false); return response; diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java index 6f56432066..e58488283d 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java @@ -122,6 +122,7 @@ public void setUp() throws IOException, InterruptedException { getClass().getSimpleName(), "input/customShard.jar", "com.custom.CustomShardIdFetcherForIT", + null, null); } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java new file mode 100644 index 0000000000..5f755e8ea7 --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java @@ -0,0 +1,412 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; + +import com.google.cloud.ByteArray; +import com.google.cloud.Date; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Value; +import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; +import com.google.common.io.Resources; +import com.google.pubsub.v1.SubscriptionName; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; +import org.apache.beam.it.gcp.spanner.SpannerResourceManager; +import org.apache.beam.it.gcp.storage.GcsResourceManager; +import org.apache.beam.it.jdbc.MySQLResourceManager; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for {@link SpannerToSourceDb} Flex template with custom transformation jar + * supplied. + */ +@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) +@TemplateIntegrationTest(SpannerToSourceDb.class) +@RunWith(JUnit4.class) +public class SpannerToSourceDbCustomTransformationIT extends SpannerToSourceDbITBase { + private static final Logger LOG = + LoggerFactory.getLogger(SpannerToSourceDbCustomTransformationIT.class); + + private static final String SPANNER_DDL_RESOURCE = + "SpannerToSourceDbCustomTransformationIT/spanner-schema.sql"; + private static final String SESSION_FILE_RESOURCE = + "SpannerToSourceDbCustomTransformationIT/session.json"; + private static final String MYSQL_SCHEMA_FILE_RESOURCE = + "SpannerToSourceDbCustomTransformationIT/mysql-schema.sql"; + + private static final String TABLE = "Users1"; + + private static final String TABLE2 = "AllDatatypeTransformation"; + private static final HashSet testInstances = + new HashSet<>(); + private static PipelineLauncher.LaunchInfo jobInfo; + public static SpannerResourceManager spannerResourceManager; + private static SpannerResourceManager spannerMetadataResourceManager; + private static MySQLResourceManager jdbcResourceManager; + private static GcsResourceManager gcsResourceManager; + private static PubsubResourceManager pubsubResourceManager; + private SubscriptionName subscriptionName; + + /** + * Setup resource managers and Launch dataflow job once during the execution of this test class. + * + * @throws IOException + */ + @Before + public void setUp() throws IOException, InterruptedException { + skipBaseCleanup = true; + synchronized (SpannerToSourceDbCustomTransformationIT.class) { + testInstances.add(this); + if (jobInfo == null) { + spannerResourceManager = + createSpannerDatabase(SpannerToSourceDbCustomTransformationIT.SPANNER_DDL_RESOURCE); + spannerMetadataResourceManager = createSpannerMetadataDatabase(); + + jdbcResourceManager = MySQLResourceManager.builder(testName).build(); + + createMySQLSchema( + jdbcResourceManager, + SpannerToSourceDbCustomTransformationIT.MYSQL_SCHEMA_FILE_RESOURCE); + + gcsResourceManager = + GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials) + .build(); + createAndUploadShardConfigToGcs(gcsResourceManager, jdbcResourceManager); + gcsResourceManager.uploadArtifact( + "input/session.json", Resources.getResource(SESSION_FILE_RESOURCE).getPath()); + pubsubResourceManager = setUpPubSubResourceManager(); + subscriptionName = + createPubsubResources( + getClass().getSimpleName(), + pubsubResourceManager, + getGcsPath("dlq", gcsResourceManager).replace("gs://" + artifactBucketName, "")); + CustomTransformation customTransformation = + CustomTransformation.builder( + "input/customShard.jar", "com.custom.CustomTransformationWithShardForLiveIT") + .build(); + createAndUploadJarToGcs(gcsResourceManager); + jobInfo = + launchDataflowJob( + gcsResourceManager, + spannerResourceManager, + spannerMetadataResourceManager, + subscriptionName.toString(), + null, + null, + null, + null, + customTransformation); + } + } + } + + /** + * Cleanup dataflow job and all the resources and resource managers. + * + * @throws IOException + */ + @AfterClass + public static void cleanUp() throws IOException { + for (SpannerToSourceDbCustomTransformationIT instance : testInstances) { + instance.tearDownBase(); + } + ResourceManagerUtils.cleanResources( + spannerResourceManager, + jdbcResourceManager, + spannerMetadataResourceManager, + gcsResourceManager, + pubsubResourceManager); + } + + @Test + public void spannerToSourceDbWithCustomTransformation() throws InterruptedException { + assertThatPipeline(jobInfo).isRunning(); + // Write row in Spanner + writeRowInSpanner(); + // Assert events on Mysql + assertRowInMySQL(); + } + + private void writeRowInSpanner() { + Mutation m = + Mutation.newInsertOrUpdateBuilder("Users1").set("id").to(1).set("name").to("AA BB").build(); + spannerResourceManager.write(m); + m = + Mutation.newInsertOrUpdateBuilder("AllDatatypeTransformation") + .set("varchar_column") + .to("example2") + .set("bigint_column") + .to(1000) + .set("binary_column") + .to(Value.bytes(ByteArray.copyFrom("bin_column"))) + .set("bit_column") + .to(Value.bytes(ByteArray.copyFrom("1"))) + .set("blob_column") + .to(Value.bytes(ByteArray.copyFrom("blob_column"))) + .set("bool_column") + .to(Value.bool(Boolean.TRUE)) + .set("date_column") + .to(Value.date(Date.fromYearMonthDay(2024, 01, 01))) + .set("datetime_column") + .to(Value.timestamp(Timestamp.parseTimestamp("2024-01-01T12:34:56Z"))) + .set("decimal_column") + .to(new BigDecimal("99999.99")) + .set("double_column") + .to(123456.123) + .set("enum_column") + .to("1") + .set("float_column") + .to(12345.67) + .set("int_column") + .to(100) + .set("text_column") + .to("Sample text for entry 2") + .set("time_column") + .to("14:30:00") + .set("timestamp_column") + .to(Value.timestamp(Timestamp.parseTimestamp("2024-01-01T12:34:56Z"))) + .set("tinyint_column") + .to(2) + .set("year_column") + .to("2024") + .build(); + spannerResourceManager.write(m); + m = + Mutation.newUpdateBuilder("AllDatatypeTransformation") + .set("varchar_column") + .to("example2") + .set("bigint_column") + .to(1000) + .set("binary_column") + .to(Value.bytes(ByteArray.copyFrom("bin_column"))) + .set("bit_column") + .to(Value.bytes(ByteArray.copyFrom("1"))) + .set("blob_column") + .to(Value.bytes(ByteArray.copyFrom("blob_column"))) + .set("bool_column") + .to(Value.bool(Boolean.TRUE)) + .set("date_column") + .to(Value.date(Date.fromYearMonthDay(2024, 01, 01))) + .set("datetime_column") + .to(Value.timestamp(Timestamp.parseTimestamp("2024-01-01T12:34:56Z"))) + .set("decimal_column") + .to(new BigDecimal("99999.99")) + .set("double_column") + .to(123456.123) + .set("enum_column") + .to("1") + .set("float_column") + .to(12345.67) + .set("int_column") + .to(100) + .set("text_column") + .to("Sample text for entry 2") + .set("time_column") + .to("14:30:00") + .set("timestamp_column") + .to(Value.timestamp(Timestamp.parseTimestamp("2024-01-01T12:34:56Z"))) + .set("tinyint_column") + .to(2) + .set("year_column") + .to("2024") + .build(); + spannerResourceManager.write(m); + m = Mutation.delete("AllDatatypeTransformation", Key.of("example2")); + spannerResourceManager.write(m); + m = + Mutation.newInsertBuilder("AllDatatypeTransformation") + .set("varchar_column") + .to("example1") + .set("bigint_column") + .to(1000) + .set("binary_column") + .to(Value.bytes(ByteArray.copyFrom("examplebinary1"))) + .set("bit_column") + .to(Value.bytes(ByteArray.copyFrom("1"))) + .set("blob_column") + .to(Value.bytes(ByteArray.copyFrom("exampleblob1"))) + .set("bool_column") + .to(Value.bool(Boolean.TRUE)) + .set("date_column") + .to(Value.date(Date.fromYearMonthDay(2024, 01, 01))) + .set("datetime_column") + .to(Timestamp.parseTimestamp("2024-01-01T12:34:56Z")) + .set("decimal_column") + .to(new BigDecimal("99999.99")) + .set("double_column") + .to(123456.123) + .set("enum_column") + .to("1") + .set("float_column") + .to(12345.67) + .set("int_column") + .to(100) + .set("text_column") + .to("Sample text for entry 1") + .set("time_column") + .to("14:30:00") + .set("timestamp_column") + .to(Timestamp.parseTimestamp("2024-01-01T12:34:56Z")) + .set("tinyint_column") + .to(1) + .set("year_column") + .to("2024") + .build(); + spannerResourceManager.write(m); + m = + Mutation.newInsertBuilder("AllDatatypeTransformation") + .set("varchar_column") + .to("example") + .set("bigint_column") + .to(12345) + .set("binary_column") + .to(Value.bytes(ByteArray.copyFrom("Some binary data"))) + .set("bit_column") + .to(Value.bytes(ByteArray.copyFrom("1"))) + .set("blob_column") + .to(Value.bytes(ByteArray.copyFrom("Some blob data"))) + .set("bool_column") + .to(Value.bool(Boolean.TRUE)) + .set("date_column") + .to(Value.date(Date.fromYearMonthDay(2024, 01, 01))) + .set("datetime_column") + .to(Value.timestamp(Timestamp.parseTimestamp("2024-01-01T12:34:56Z"))) + .set("decimal_column") + .to(new BigDecimal("12345.67")) + .set("double_column") + .to(123.456) + .set("enum_column") + .to("1") + .set("float_column") + .to(123.45) + .set("int_column") + .to(123) + .set("text_column") + .to("Sample text") + .set("time_column") + .to("14:30:00") + .set("timestamp_column") + .to(Value.timestamp(Timestamp.parseTimestamp("2024-01-01T12:34:56Z"))) + .set("tinyint_column") + .to(1) + .set("year_column") + .to("2024") + .build(); + spannerResourceManager.write(m); + } + + private void assertRowInMySQL() { + PipelineOperator.Result result = + pipelineOperator() + .waitForCondition( + createConfig(jobInfo, Duration.ofMinutes(10)), + () -> jdbcResourceManager.getRowCount(TABLE) == 1); + assertThatResult(result).meetsConditions(); + + result = + pipelineOperator() + .waitForCondition( + createConfig(jobInfo, Duration.ofMinutes(10)), + () -> jdbcResourceManager.getRowCount(TABLE2) == 2); + assertThatResult(result).meetsConditions(); + + List> rows = jdbcResourceManager.readTable(TABLE); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).get("id")).isEqualTo(1); + assertThat(rows.get(0).get("first_name")).isEqualTo("AA"); + assertThat(rows.get(0).get("last_name")).isEqualTo("BB"); + + rows = + jdbcResourceManager.runSQLQuery( + String.format("select * from %s order by %s", TABLE2, "varchar_column")); + assertThat(rows).hasSize(2); + assertThat(rows.get(1).get("varchar_column")).isEqualTo("example2"); + assertThat(rows.get(1).get("bigint_column")).isEqualTo(1000); + assertThat(rows.get(1).get("binary_column")) + .isEqualTo("bin_column".getBytes(StandardCharsets.UTF_8)); + assertThat(rows.get(1).get("bit_column")).isEqualTo("1".getBytes(StandardCharsets.UTF_8)); + assertThat(rows.get(1).get("blob_column")) + .isEqualTo("blob_column".getBytes(StandardCharsets.UTF_8)); + assertThat(rows.get(1).get("bool_column")).isEqualTo(true); + assertThat(rows.get(1).get("date_column")).isEqualTo(java.sql.Date.valueOf("2024-01-01")); + assertThat(rows.get(1).get("datetime_column")) + .isEqualTo(java.time.LocalDateTime.of(2024, 1, 1, 12, 34, 56)); + assertThat(rows.get(1).get("decimal_column")).isEqualTo(new BigDecimal("99999.99")); + assertThat(rows.get(1).get("double_column")).isEqualTo(123456.123); + assertThat(rows.get(1).get("enum_column")).isEqualTo("1"); + assertThat(rows.get(1).get("float_column")).isEqualTo(12345.67f); + assertThat(rows.get(1).get("int_column")).isEqualTo(100); + assertThat(rows.get(1).get("text_column")).isEqualTo("Sample text for entry 2"); + assertThat(rows.get(1).get("time_column")).isEqualTo(java.sql.Time.valueOf("14:30:00")); + assertThat(rows.get(1).get("timestamp_column")) + .isEqualTo(java.sql.Timestamp.valueOf("2024-01-01 12:34:56.0")); + assertThat(rows.get(1).get("tinyint_column")).isEqualTo(2); + assertThat(rows.get(1).get("year_column")).isEqualTo(java.sql.Date.valueOf("2024-01-01")); + + assertThat(rows.get(0).get("varchar_column")).isEqualTo("example"); + assertThat(rows.get(0).get("bigint_column")).isEqualTo(12346); + assertThat(rows.get(0).get("binary_column")) + .isEqualTo("binary_column_appended".getBytes(StandardCharsets.UTF_8)); + assertThat(rows.get(0).get("bit_column")).isEqualTo("5".getBytes(StandardCharsets.UTF_8)); + assertThat(rows.get(0).get("blob_column")) + .isEqualTo("blob_column_appended".getBytes(StandardCharsets.UTF_8)); + assertThat(rows.get(0).get("bool_column")).isEqualTo(false); + assertThat(rows.get(0).get("date_column")).isEqualTo(java.sql.Date.valueOf("2024-01-02")); + assertThat(rows.get(0).get("datetime_column")) + .isEqualTo(java.time.LocalDateTime.of(2024, 1, 1, 12, 34, 55)); + assertThat(rows.get(0).get("decimal_column")).isEqualTo(new BigDecimal("12344.67")); + assertThat(rows.get(0).get("double_column")).isEqualTo(124.456); + assertThat(rows.get(0).get("enum_column")).isEqualTo("3"); + assertThat(rows.get(0).get("float_column")).isEqualTo(124.45f); + assertThat(rows.get(0).get("int_column")).isEqualTo(124); + assertThat(rows.get(0).get("text_column")).isEqualTo("Sample text append"); + assertThat(rows.get(0).get("time_column")).isEqualTo(java.sql.Time.valueOf("14:40:00")); + assertThat(rows.get(0).get("timestamp_column")) + .isEqualTo(java.sql.Timestamp.valueOf("2024-01-01 12:34:55.0")); + assertThat(rows.get(0).get("tinyint_column")).isEqualTo(2); + assertThat(rows.get(0).get("year_column")).isEqualTo(java.sql.Date.valueOf("2025-01-01")); + + rows = + jdbcResourceManager.runSQLQuery( + String.format( + "select * from %s where %s like '%s'", TABLE2, "varchar_column", "example1")); + assertThat(rows).hasSize(0); + } +} diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbDatatypeIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbDatatypeIT.java index 710526fb5a..dbd023cdef 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbDatatypeIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbDatatypeIT.java @@ -117,6 +117,7 @@ public void setUp() throws IOException { null, null, null, + null, null); } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbIT.java index 0a97e21d00..7c3ad39760 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbIT.java @@ -113,6 +113,7 @@ public void setUp() throws IOException { null, null, null, + null, null); } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java index 781b4c4a2e..64d15895cd 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java @@ -18,6 +18,7 @@ import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; +import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; import com.google.common.io.Resources; import com.google.gson.Gson; import com.google.gson.JsonArray; @@ -123,7 +124,8 @@ public PipelineLauncher.LaunchInfo launchDataflowJob( String identifierSuffix, String shardingCustomJarPath, String shardingCustomClassName, - String sourceDbTimezoneOffset) + String sourceDbTimezoneOffset, + CustomTransformation customTransformation) throws IOException { // default parameters @@ -159,6 +161,12 @@ public PipelineLauncher.LaunchInfo launchDataflowJob( params.put("sourceDbTimezoneOffset", sourceDbTimezoneOffset); } + if (customTransformation != null) { + params.put( + "transformationJarPath", getGcsPath(customTransformation.jarPath(), gcsResourceManager)); + params.put("transformationClassName", customTransformation.classPath()); + } + // Construct template String jobName = PipelineUtils.createJobName("rrev-it" + testName); // /-DunifiedWorker=true when using runner v2 diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbInterleaveMultiShardIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbInterleaveMultiShardIT.java index 25a1b1b991..1f5acdc952 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbInterleaveMultiShardIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbInterleaveMultiShardIT.java @@ -123,6 +123,7 @@ public void setUp() throws IOException { null, null, null, + null, null); } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbTimezoneIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbTimezoneIT.java index c8c3ce5945..1ab2d78b49 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbTimezoneIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbTimezoneIT.java @@ -112,7 +112,8 @@ public void setUp() throws IOException { null, null, null, - "+10:00"); + "+10:00", + null); } } } diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/mysql-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/mysql-schema.sql new file mode 100644 index 0000000000..6e68b9af51 --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/mysql-schema.sql @@ -0,0 +1,27 @@ +CREATE TABLE Users1 ( + id INT NOT NULL, + first_name VARCHAR(25), + last_name VARCHAR(25), + PRIMARY KEY(id)); + +CREATE TABLE AllDatatypeTransformation ( + varchar_column VARCHAR(20) NOT NULL, + tinyint_column TINYINT, + text_column TEXT, + date_column DATE, + int_column INT, + bigint_column BIGINT, + float_column FLOAT(10,2), + double_column DOUBLE, + decimal_column DECIMAL(10,2), + datetime_column DATETIME, + timestamp_column TIMESTAMP, + time_column TIME, + year_column YEAR, + blob_column BLOB, + enum_column ENUM('1','2','3'), + bool_column TINYINT(1), + binary_column VARBINARY(150), + bit_column BIT(8), + PRIMARY KEY (varchar_column) +); \ No newline at end of file diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/session.json b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/session.json new file mode 100644 index 0000000000..e1a764ec4b --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/session.json @@ -0,0 +1,885 @@ +{ + "SessionName": "NewSession", + "EditorName": "", + "DatabaseType": "mysql", + "DatabaseName": "rr_write", + "Dialect": "google_standard_sql", + "Notes": null, + "Tags": null, + "SpSchema": { + "t113": { + "Name": "AllDatatypeTransformation", + "ColIds": [ + "c115", + "c116", + "c117", + "c118", + "c119", + "c120", + "c121", + "c122", + "c123", + "c124", + "c125", + "c126", + "c127", + "c128", + "c129", + "c130", + "c131", + "c132" + ], + "ShardIdColumn": "", + "ColDefs": { + "c115": { + "Name": "varchar_column", + "T": { + "Name": "STRING", + "Len": 20, + "IsArray": false + }, + "NotNull": true, + "Comment": "From: varchar_column varchar(20)", + "Id": "c115", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c116": { + "Name": "tinyint_column", + "T": { + "Name": "INT64", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: tinyint_column tinyint(3)", + "Id": "c116", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c117": { + "Name": "text_column", + "T": { + "Name": "STRING", + "Len": 9223372036854775807, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: text_column text(65535)", + "Id": "c117", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c118": { + "Name": "date_column", + "T": { + "Name": "DATE", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: date_column date", + "Id": "c118", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c119": { + "Name": "int_column", + "T": { + "Name": "INT64", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: int_column int(10)", + "Id": "c119", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c120": { + "Name": "bigint_column", + "T": { + "Name": "INT64", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: bigint_column bigint(19)", + "Id": "c120", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c121": { + "Name": "float_column", + "T": { + "Name": "FLOAT64", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: float_column float(10,2)", + "Id": "c121", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c122": { + "Name": "double_column", + "T": { + "Name": "FLOAT64", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: double_column double(22)", + "Id": "c122", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c123": { + "Name": "decimal_column", + "T": { + "Name": "NUMERIC", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: decimal_column decimal(10,2)", + "Id": "c123", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c124": { + "Name": "datetime_column", + "T": { + "Name": "TIMESTAMP", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: datetime_column datetime", + "Id": "c124", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c125": { + "Name": "timestamp_column", + "T": { + "Name": "TIMESTAMP", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: timestamp_column timestamp", + "Id": "c125", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c126": { + "Name": "time_column", + "T": { + "Name": "STRING", + "Len": 9223372036854775807, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: time_column time", + "Id": "c126", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c127": { + "Name": "year_column", + "T": { + "Name": "STRING", + "Len": 9223372036854775807, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: year_column year", + "Id": "c127", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c128": { + "Name": "blob_column", + "T": { + "Name": "BYTES", + "Len": 9223372036854775807, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: blob_column blob(65535)", + "Id": "c128", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c129": { + "Name": "enum_column", + "T": { + "Name": "STRING", + "Len": 9223372036854775807, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: enum_column enum(1)", + "Id": "c129", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c130": { + "Name": "bool_column", + "T": { + "Name": "BOOL", + "Len": 0, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: bool_column tinyint(1)", + "Id": "c130", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c131": { + "Name": "binary_column", + "T": { + "Name": "BYTES", + "Len": 9223372036854775807, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: binary_column binary(20)", + "Id": "c131", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c132": { + "Name": "bit_column", + "T": { + "Name": "BYTES", + "Len": 9223372036854775807, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: bit_column bit(7)", + "Id": "c132", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + } + }, + "PrimaryKeys": [ + { + "ColId": "c115", + "Desc": false, + "Order": 1 + } + ], + "ForeignKeys": null, + "Indexes": null, + "ParentId": "", + "Comment": "Spanner schema for source table AllDatatypeTransformation", + "Id": "t113" + }, + "t114": { + "Name": "Users1", + "ColIds": [ + "c133", + "c134" + ], + "ShardIdColumn": "", + "ColDefs": { + "c133": { + "Name": "id", + "T": { + "Name": "INT64", + "Len": 0, + "IsArray": false + }, + "NotNull": true, + "Comment": "From: id int(10)", + "Id": "c133", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c134": { + "Name": "name", + "T": { + "Name": "STRING", + "Len": 25, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: name varchar(25)", + "Id": "c134", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + } + }, + "PrimaryKeys": [ + { + "ColId": "c133", + "Desc": false, + "Order": 1 + } + ], + "ForeignKeys": null, + "Indexes": null, + "ParentId": "", + "Comment": "Spanner schema for source table Users", + "Id": "t114" + } + }, + "SyntheticPKeys": {}, + "SrcSchema": { + "t113": { + "Name": "AllDatatypeTransformation", + "Schema": "rr_write", + "ColIds": [ + "c115", + "c116", + "c117", + "c118", + "c119", + "c120", + "c121", + "c122", + "c123", + "c124", + "c125", + "c126", + "c127", + "c128", + "c129", + "c130", + "c131", + "c132" + ], + "ColDefs": { + "c115": { + "Name": "varchar_column", + "Type": { + "Name": "varchar", + "Mods": [ + 20 + ], + "ArrayBounds": null + }, + "NotNull": true, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c115" + }, + "c116": { + "Name": "tinyint_column", + "Type": { + "Name": "tinyint", + "Mods": [ + 3 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c116" + }, + "c117": { + "Name": "text_column", + "Type": { + "Name": "text", + "Mods": [ + 65535 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c117" + }, + "c118": { + "Name": "date_column", + "Type": { + "Name": "date", + "Mods": null, + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c118" + }, + "c119": { + "Name": "int_column", + "Type": { + "Name": "int", + "Mods": [ + 10 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c119" + }, + "c120": { + "Name": "bigint_column", + "Type": { + "Name": "bigint", + "Mods": [ + 19 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c120" + }, + "c121": { + "Name": "float_column", + "Type": { + "Name": "float", + "Mods": [ + 10, + 2 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c121" + }, + "c122": { + "Name": "double_column", + "Type": { + "Name": "double", + "Mods": [ + 22 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c122" + }, + "c123": { + "Name": "decimal_column", + "Type": { + "Name": "decimal", + "Mods": [ + 10, + 2 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c123" + }, + "c124": { + "Name": "datetime_column", + "Type": { + "Name": "datetime", + "Mods": null, + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c124" + }, + "c125": { + "Name": "timestamp_column", + "Type": { + "Name": "timestamp", + "Mods": null, + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c125" + }, + "c126": { + "Name": "time_column", + "Type": { + "Name": "time", + "Mods": null, + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c126" + }, + "c127": { + "Name": "year_column", + "Type": { + "Name": "year", + "Mods": null, + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c127" + }, + "c128": { + "Name": "blob_column", + "Type": { + "Name": "blob", + "Mods": [ + 65535 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c128" + }, + "c129": { + "Name": "enum_column", + "Type": { + "Name": "enum", + "Mods": [ + 1 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c129" + }, + "c130": { + "Name": "bool_column", + "Type": { + "Name": "tinyint", + "Mods": [ + 1 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c130" + }, + "c131": { + "Name": "binary_column", + "Type": { + "Name": "binary", + "Mods": [ + 150 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c131" + }, + "c132": { + "Name": "bit_column", + "Type": { + "Name": "bit", + "Mods": [ + 20 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c132" + } + }, + "PrimaryKeys": [ + { + "ColId": "c115", + "Desc": false, + "Order": 1 + } + ], + "ForeignKeys": null, + "Indexes": null, + "Id": "t113" + }, + "t114": { + "Name": "Users1", + "Schema": "rr_write", + "ColIds": [ + "c133", + "c134", + "c135" + ], + "ColDefs": { + "c133": { + "Name": "id", + "Type": { + "Name": "int", + "Mods": [ + 10 + ], + "ArrayBounds": null + }, + "NotNull": true, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c133" + }, + "c134": { + "Name": "first_name", + "Type": { + "Name": "varchar", + "Mods": [ + 25 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c134" + }, + "c135": { + "Name": "last_name", + "Type": { + "Name": "varchar", + "Mods": [ + 25 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c135" + } + }, + "PrimaryKeys": [ + { + "ColId": "c133", + "Desc": false, + "Order": 1 + } + ], + "ForeignKeys": null, + "Indexes": null, + "Id": "t114" + } + }, + "SchemaIssues": { + "t113": { + "ColumnLevelIssues": { + "c116": [ + 14 + ], + "c119": [ + 14 + ], + "c121": [ + 14 + ], + "c124": [ + 13 + ], + "c126": [ + 15 + ], + "c127": [ + 15 + ] + }, + "TableLevelIssues": null + }, + "t114": { + "ColumnLevelIssues": { + "c133": [ + 14 + ] + }, + "TableLevelIssues": null + } + }, + "Location": {}, + "TimezoneOffset": "+00:00", + "SpDialect": "google_standard_sql", + "UniquePKey": {}, + "Rules": [], + "IsSharded": false, + "SpRegion": "", + "ResourceValidation": false, + "UI": false +} \ No newline at end of file diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/spanner-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/spanner-schema.sql new file mode 100644 index 0000000000..a5a1f125ff --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/spanner-schema.sql @@ -0,0 +1,31 @@ +CREATE TABLE IF NOT EXISTS Users1 ( + id INT64 NOT NULL, + name STRING(25), +) PRIMARY KEY(id); + +CREATE TABLE AllDatatypeTransformation ( + varchar_column STRING(20) NOT NULL, + tinyint_column INT64, + text_column STRING(MAX), + date_column DATE, + int_column INT64, + bigint_column INT64, + float_column FLOAT64, + double_column FLOAT64, + decimal_column NUMERIC, + datetime_column TIMESTAMP, + timestamp_column TIMESTAMP, + time_column STRING(MAX), + year_column STRING(MAX), + blob_column BYTES(MAX), + enum_column STRING(MAX), + bool_column BOOL, + binary_column BYTES(MAX), + bit_column BYTES(MAX), +) PRIMARY KEY (varchar_column); + +CREATE CHANGE STREAM allstream + FOR ALL OPTIONS ( + value_capture_type = 'NEW_ROW', + retention_period = '7d' +); \ No newline at end of file