Skip to content

Commit

Permalink
Merge pull request #1323 from shreyakhajanchi:integration-test
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 621159743
  • Loading branch information
cloud-teleport committed Apr 2, 2024
2 parents c9f1c66 + f8653ef commit 655c523
Show file tree
Hide file tree
Showing 14 changed files with 1,233 additions and 328 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,11 @@ protected LaunchInfo launchDataflowJob(
Map<String, String> jobParameters)
throws IOException {

gcsClient.uploadArtifact(
gcsPathPrefix + "/session.json", Resources.getResource(sessionFileResourceName).getPath());
if (sessionFileResourceName != null) {
gcsClient.uploadArtifact(
gcsPathPrefix + "/session.json",
Resources.getResource(sessionFileResourceName).getPath());
}

if (transformationContextFileResourceName != null) {
gcsClient.uploadArtifact(
Expand Down Expand Up @@ -197,14 +200,17 @@ protected LaunchInfo launchDataflowJob(
put("databaseId", spannerResourceManager.getDatabaseId());
put("projectId", PROJECT);
put("deadLetterQueueDirectory", getGcsPath(gcsPathPrefix + "/dlq/"));
put("sessionFilePath", getGcsPath(gcsPathPrefix + "/session.json"));
put("gcsPubSubSubscription", subscription.toString());
put("dlqGcsPubSubSubscription", dlqSubscription.toString());
put("datastreamSourceType", "mysql");
put("inputFileFormat", "avro");
}
};

if (sessionFileResourceName != null) {
params.put("sessionFilePath", getGcsPath(gcsPathPrefix + "/session.json"));
}

if (transformationContextFileResourceName != null) {
params.put(
"transformationContextFilePath",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class DataStreamToSpannerSimpleIT extends DataStreamToSpannerITBase {

private static final String TABLE1 = "Users";
private static final String TABLE2 = "Movie";
private static final String TABLE3 = "Category";

private static final String SESSION_FILE_RESOURCE =
"DataStreamToSpannerSimpleIT/mysql-session.json";
Expand Down Expand Up @@ -237,6 +238,59 @@ public void interleavedAndFKAndIndexTest() {
assertArticlesTable();
}

@Test
public void migrationTestWithRenameAndDropColumn() {
// Construct a ChainedConditionCheck with 4 stages.
// 1. Send initial wave of events
// 2. Wait on Spanner to have events
ChainedConditionCheck conditionCheck =
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo,
TABLE3,
"backfill.jsonl",
"DataStreamToSpannerSimpleIT/mysql-backfill-Category.jsonl"),
SpannerRowsCheck.builder(spannerResourceManager, TABLE3)
.setMinRows(2)
.setMaxRows(2)
.build()))
.build();

// Wait for conditions
PipelineOperator.Result result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck);

// Assert Conditions
assertThatResult(result).meetsConditions();

assertCategoryTableBackfillContents();

conditionCheck =
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo,
TABLE3,
"cdc1.jsonl",
"DataStreamToSpannerSimpleIT/mysql-cdc-Category.jsonl"),
SpannerRowsCheck.builder(spannerResourceManager, TABLE3)
.setMinRows(3)
.setMaxRows(3)
.build()))
.build();

result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck);

// Assert Conditions
assertThatResult(result).meetsConditions();

assertCategoryTableCdcContents();
}

private void assertUsersTableContents() {
List<Map<String, Object>> events = new ArrayList<>();

Expand Down Expand Up @@ -299,6 +353,47 @@ private void assertMovieTableContents() {
Assert.assertEquals(931.512, numericVals.get(1).getBigDecimal(0).doubleValue(), 0.001);
}

private void assertCategoryTableBackfillContents() {
List<Map<String, Object>> events = new ArrayList<>();

Map<String, Object> row1 = new HashMap<>();
row1.put("category_id", 1);
row1.put("full_name", "xyz");

Map<String, Object> row2 = new HashMap<>();
row2.put("category_id", 2);
row2.put("full_name", "abc");

events.add(row1);
events.add(row2);

SpannerAsserts.assertThatStructs(spannerResourceManager.runQuery("select * from Category"))
.hasRecordsUnorderedCaseInsensitiveColumns(events);
}

private void assertCategoryTableCdcContents() {
List<Map<String, Object>> events = new ArrayList<>();

Map<String, Object> row1 = new HashMap<>();
row1.put("category_id", 2);
row1.put("full_name", "abc1");

Map<String, Object> row2 = new HashMap<>();
row2.put("category_id", 3);
row2.put("full_name", "def");

Map<String, Object> row3 = new HashMap<>();
row3.put("category_id", 4);
row3.put("full_name", "ghi");

events.add(row1);
events.add(row2);
events.add(row3);

SpannerAsserts.assertThatStructs(spannerResourceManager.runQuery("select * from Category"))
.hasRecordsUnorderedCaseInsensitiveColumns(events);
}

private void assertAuthorsTable() {
List<Map<String, Object>> events = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"uuid":"5deaf70b-2f02-486c-831a-b92400000000","read_timestamp":"2024-02-07T07:03:46.226Z","source_timestamp":"2024-02-07T07:02:44.000Z","object":"it_AllDatatypeColumns","read_method":"mysql-cdc-binlog","stream_name":"projects/545418958905/locations/us-central1/streams/int-test-shreya","schema_key":"0df5741bfd9144e7b58ce8cd018728d345ed6acd","sort_keys":[1707289364000,"mysql-bin.000029",9610730],"source_metadata":{"table":"AllDatatypeColumns","database":"it","primary_keys":["varchar_column"],"log_file":"mysql-bin.000029","log_position":9610730,"change_type":"INSERT","is_deleted":false},"payload":{"varchar_column":"value1","tinyint_column":10,"text_column":"746578745f646174615f310a","date_column":"2024-02-08T00:00:00.000Z","smallint_column":50,"mediumint_column":1000,"int_column":50000,"bigint_column":987654321,"float_column":45.67,"double_column":123.789,"decimal_column":456.12,"datetime_column":"2024-02-08T08:15:30.000Z","timestamp_column":"2024-02-08T08:15:30.000Z","time_column":29730000000,"year_column":2022,"char_column":"63686172310a","tinyblob_column":"74696e79626c6f625f646174615f31","tinytext_column":"74696e79746578745f646174615f310a","blob_column":"626c6f625f646174615f31","mediumblob_column":"6d656469756d626c6f625f646174615f31","mediumtext_column":"6d656469756d746578745f646174615f31","longblob_column":"6c6f6e67626c6f625f646174615f31","longtext_column":"6c6f6e67746578745f646174615f31","enum_column":"2","bool_column":0,"other_bool_column":1,"binary_column":"62696e6172795f31","varbinary_column":"76617262696e6172795f646174615f31","bit_column":102}}
{"uuid":"5deaf70b-2f02-486c-831a-b92400000001","read_timestamp":"2024-02-07T07:03:46.226Z","source_timestamp":"2024-02-07T07:02:55.000Z","object":"it_AllDatatypeColumns","read_method":"mysql-cdc-binlog","stream_name":"projects/545418958905/locations/us-central1/streams/int-test-shreya","schema_key":"0df5741bfd9144e7b58ce8cd018728d345ed6acd","sort_keys":[1707289375000,"mysql-bin.000029",9611373],"source_metadata":{"table":"AllDatatypeColumns","database":"it","primary_keys":["varchar_column"],"log_file":"mysql-bin.000029","log_position":9611373,"change_type":"INSERT","is_deleted":false},"payload":{"varchar_column":"value2","tinyint_column":5,"text_column":"746578745f646174615f320a","date_column":"2024-02-09T00:00:00.000Z","smallint_column":25,"mediumint_column":500,"int_column":25000,"bigint_column":987654,"float_column":12.34,"double_column":56.789,"decimal_column":123.45,"datetime_column":"2024-02-09T15:30:45.000Z","timestamp_column":"2024-02-09T15:30:45.000Z","time_column":55845000000,"year_column":2023,"char_column":"63686172320a","tinyblob_column":"74696e79626c6f625f646174615f32","tinytext_column":"74696e79746578745f646174615f320a","blob_column":"626c6f625f646174615f32","mediumblob_column":"6d656469756d626c6f625f646174615f32","mediumtext_column":"6d656469756d746578745f646174615f32","longblob_column":"6c6f6e67626c6f625f646174615f32","longtext_column":"6c6f6e67746578745f646174615f32","enum_column":"3","bool_column":1,"other_bool_column":0,"binary_column":"62696e6172795f32","varbinary_column":"76617262696e6172795f646174615f32","bit_column":25}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"uuid":"2a646066-40f2-44b7-bec0-61d100000000","read_timestamp":"2024-02-08T09:32:05.695Z","source_timestamp":"2024-02-08T09:30:59.000Z","object":"it_AllDatatypeColumns2","read_method":"mysql-cdc-binlog","stream_name":"projects/545418958905/locations/us-central1/streams/int-test-shreya","schema_key":"20f68e0ee69f7b864b730a1273b6f1b698ab4c28","sort_keys":[1707384659000,"mysql-bin.000029",9615213],"source_metadata":{"table":"AllDatatypeColumns2","database":"it","primary_keys":["varchar_column"],"log_file":"mysql-bin.000029","log_position":9615213,"change_type":"INSERT","is_deleted":false},"payload":{"varchar_column":"value1","tinyint_column":10,"text_column":"text1","date_column":"2024-02-08T00:00:00.000Z","smallint_column":50,"mediumint_column":1000,"int_column":50000,"bigint_column":987654321,"float_column":45.67,"double_column":123.789,"decimal_column":456.12,"datetime_column":"2024-02-08T08:15:30.000Z","timestamp_column":"2024-02-08T08:15:30.000Z","time_column":29730000000,"year_column":2022,"char_column":"char_1","tinyblob_column":"74696e79626c6f625f646174615f31","tinytext_column":"tinytext_data_1","blob_column":"626c6f625f646174615f31","mediumblob_column":"6d656469756d626c6f625f646174615f31","mediumtext_column":"mediumtext_data_1","longblob_column":"6c6f6e67626c6f625f646174615f31","longtext_column":"longtext_data_1","enum_column":"2","bool_column":0,"binary_column":"62696e6172795f646174615f3100000000000000","varbinary_column":"76617262696e6172795f646174615f31","bit_column":102}}
{"uuid":"2a646066-40f2-44b7-bec0-61d100000001","read_timestamp":"2024-02-08T09:32:05.695Z","source_timestamp":"2024-02-08T09:30:59.000Z","object":"it_AllDatatypeColumns2","read_method":"mysql-cdc-binlog","stream_name":"projects/545418958905/locations/us-central1/streams/int-test-shreya","schema_key":"20f68e0ee69f7b864b730a1273b6f1b698ab4c28","sort_keys":[1707384659000,"mysql-bin.000029",9615856],"source_metadata":{"table":"AllDatatypeColumns2","database":"it","primary_keys":["varchar_column"],"log_file":"mysql-bin.000029","log_position":9615856,"change_type":"INSERT","is_deleted":false},"payload":{"varchar_column":"value2","tinyint_column":5,"text_column":"text2","date_column":"2024-02-09T00:00:00.000Z","smallint_column":25,"mediumint_column":500,"int_column":25000,"bigint_column":987654,"float_column":12.34,"double_column":56.789,"decimal_column":123.45,"datetime_column":"2024-02-09T15:30:45.000Z","timestamp_column":"2024-02-09T15:30:45.000Z","time_column":55845000000,"year_column":2023,"char_column":"char_2","tinyblob_column":"74696e79626c6f625f646174615f32","tinytext_column":"tinytext_data_2","blob_column":"626c6f625f646174615f32","mediumblob_column":"6d656469756d626c6f625f646174615f32","mediumtext_column":"mediumtext_data_2","longblob_column":"6c6f6e67626c6f625f646174615f32","longtext_column":"longtext_data_2","enum_column":"3","bool_column":1,"binary_column":"62696e6172795f646174615f3200000000000000","varbinary_column":"76617262696e6172795f646174615f32","bit_column":25}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"uuid":"45c148ba-a57f-4882-879e-831700000001","read_timestamp":"2024-02-09T09:59:32.777Z","source_timestamp":"2024-02-09T09:59:19.000Z","object":"it_AllDatatypeColumns","read_method":"mysql-cdc-binlog","stream_name":"projects/545418958905/locations/us-central1/streams/int-test-shreya","schema_key":"0df5741bfd9144e7b58ce8cd018728d345ed6acd","sort_keys":[1707472759000,"mysql-bin.000029",9618920],"source_metadata":{"table":"AllDatatypeColumns","database":"it","primary_keys":["varchar_column"],"log_file":"mysql-bin.000029","log_position":9618920,"change_type":"DELETE","is_deleted":true},"payload":{"varchar_column":"value2","tinyint_column":5,"text_column":"746578745f646174615f320a","date_column":"2024-02-09T00:00:00.000Z","smallint_column":25,"mediumint_column":500,"int_column":25000,"bigint_column":987654,"float_column":12.34,"double_column":56.789,"decimal_column":123.45,"datetime_column":"2024-02-09T15:30:45.000Z","timestamp_column":"2024-02-09T15:30:45.000Z","time_column":55845000000,"year_column":2023,"char_column":"63686172320a","tinyblob_column":"74696e79626c6f625f646174615f32","tinytext_column":"74696e79746578745f646174615f320a","blob_column":"626c6f625f646174615f32","mediumblob_column":"6d656469756d626c6f625f646174615f32","mediumtext_column":"6d656469756d746578745f646174615f32","longblob_column":"6c6f6e67626c6f625f646174615f32","longtext_column":"6c6f6e67746578745f646174615f32","enum_column":"3","bool_column":1,"other_bool_column":0,"binary_column":"62696e6172795f32","varbinary_column":"76617262696e6172795f646174615f32","bit_column":25}}
{"uuid":"45c148ba-a57f-4882-879e-831700000000","read_timestamp":"2024-02-09T09:59:32.777Z","source_timestamp":"2024-02-09T09:58:59.000Z","object":"it_AllDatatypeColumns","read_method":"mysql-cdc-binlog","stream_name":"projects/545418958905/locations/us-central1/streams/int-test-shreya","schema_key":"0df5741bfd9144e7b58ce8cd018728d345ed6acd","sort_keys":[1707472739000,"mysql-bin.000029",9618285],"source_metadata":{"table":"AllDatatypeColumns","database":"it","primary_keys":["varchar_column"],"log_file":"mysql-bin.000029","log_position":9618285,"change_type":"UPDATE-INSERT","is_deleted":false},"payload":{"varchar_column":"value1","tinyint_column":15,"text_column":"746578745f646174615f310a","date_column":"2024-02-08T00:00:00.000Z","smallint_column":50,"mediumint_column":1000,"int_column":50000,"bigint_column":987654321,"float_column":45.67,"double_column":123.789,"decimal_column":456.12,"datetime_column":"2024-02-08T08:15:30.000Z","timestamp_column":"2024-02-08T08:15:30.000Z","time_column":29730000000,"year_column":2022,"char_column":"63686172310a","tinyblob_column":"74696e79626c6f625f646174615f31","tinytext_column":"74696e79746578745f646174615f310a","blob_column":"626c6f625f646174615f31","mediumblob_column":"6d656469756d626c6f625f646174615f31","mediumtext_column":"6d656469756d746578745f646174615f31","longblob_column":"6c6f6e67626c6f625f646174615f31","longtext_column":"6c6f6e67746578745f646174615f31","enum_column":"2","bool_column":0,"other_bool_column":1,"binary_column":"62696e6172795f31","varbinary_column":"76617262696e6172795f646174615f31","bit_column":102}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"uuid":"631f1c50-33fc-46ea-aa50-61a900000000","read_timestamp":"2024-02-09T05:09:12.430Z","source_timestamp":"2024-02-09T05:08:13.000Z","object":"it_AllDatatypeColumns2","read_method":"mysql-cdc-binlog","stream_name":"projects/545418958905/locations/us-central1/streams/int-test-shreya","schema_key":"20f68e0ee69f7b864b730a1273b6f1b698ab4c28","sort_keys":[1707455293000,"mysql-bin.000029",9616753],"source_metadata":{"table":"AllDatatypeColumns2","database":"it","primary_keys":["varchar_column"],"log_file":"mysql-bin.000029","log_position":9616753,"change_type":"UPDATE-INSERT","is_deleted":false},"payload":{"varchar_column":"value1","tinyint_column":15,"text_column":"text1","date_column":"2024-02-08T00:00:00.000Z","smallint_column":50,"mediumint_column":1000,"int_column":50000,"bigint_column":987654321,"float_column":45.67,"double_column":123.789,"decimal_column":456.12,"datetime_column":"2024-02-08T08:15:30.000Z","timestamp_column":"2024-02-08T08:15:30.000Z","time_column":29730000000,"year_column":2022,"char_column":"char_1","tinyblob_column":"74696e79626c6f625f646174615f31","tinytext_column":"tinytext_data_1","blob_column":"626c6f625f646174615f31","mediumblob_column":"6d656469756d626c6f625f646174615f31","mediumtext_column":"mediumtext_data_1","longblob_column":"6c6f6e67626c6f625f646174615f31","longtext_column":"longtext_data_1","enum_column":"2","bool_column":0,"binary_column":"62696e6172795f646174615f3100000000000000","varbinary_column":"76617262696e6172795f646174615f31","bit_column":102}}
{"uuid":"631f1c50-33fc-46ea-aa50-61a900000001","read_timestamp":"2024-02-09T05:09:12.430Z","source_timestamp":"2024-02-09T05:08:42.000Z","object":"it_AllDatatypeColumns2","read_method":"mysql-cdc-binlog","stream_name":"projects/545418958905/locations/us-central1/streams/int-test-shreya","schema_key":"20f68e0ee69f7b864b730a1273b6f1b698ab4c28","sort_keys":[1707455322000,"mysql-bin.000029",9617388],"source_metadata":{"table":"AllDatatypeColumns2","database":"it","primary_keys":["varchar_column"],"log_file":"mysql-bin.000029","log_position":9617388,"change_type":"DELETE","is_deleted":true},"payload":{"varchar_column":"value2","tinyint_column":5,"text_column":"text2","date_column":"2024-02-09T00:00:00.000Z","smallint_column":25,"mediumint_column":500,"int_column":25000,"bigint_column":987654,"float_column":12.34,"double_column":56.789,"decimal_column":123.45,"datetime_column":"2024-02-09T15:30:45.000Z","timestamp_column":"2024-02-09T15:30:45.000Z","time_column":55845000000,"year_column":2023,"char_column":"char_2","tinyblob_column":"74696e79626c6f625f646174615f32","tinytext_column":"tinytext_data_2","blob_column":"626c6f625f646174615f32","mediumblob_column":"6d656469756d626c6f625f646174615f32","mediumtext_column":"mediumtext_data_2","longblob_column":"6c6f6e67626c6f625f646174615f32","longtext_column":"longtext_data_2","enum_column":"3","bool_column":1,"binary_column":"62696e6172795f646174615f3200000000000000","varbinary_column":"76617262696e6172795f646174615f32","bit_column":25}}
Loading

0 comments on commit 655c523

Please sign in to comment.