Skip to content

Commit

Permalink
fixed e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Feb 20, 2024
1 parent 97f1ae6 commit 97cdaf2
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,42 @@ public static void startContainers() {
public void testActionRunResult() throws Exception {
Map<String, String> sqlserverConfig = getBasicSqlServerConfig();
sqlserverConfig.put("database-name", "shard_database");
sqlserverConfig.put(
"schema-name",
ThreadLocalRandom.current().nextBoolean()
? ".*shard_.*"
: "shard_1|shard_2|shard_3|x_shard_1");
MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED;
SqlServerSyncDatabaseAction action =
syncDatabaseActionBuilder(sqlserverConfig)
.withTableConfig(getBasicTableConfig())
.mergeShards(false)
.withMode(mode.configString())
.includingTables("t.+|s.+")
.excludingTables("ta|sa|ta2")
.excludingTables("ta|sa")
.build();
runActionWithDefaultEnv(action);

assertExactlyExistTables(
"shard_database_dbo_t11",
"shard_database_dbo_t2",
"shard_database_dbo_t3",
"shard_database_dbo_taa",
"shard_database_dbo_s2");
"shard_database_shard_1_t11",
"shard_database_shard_1_t2",
"shard_database_shard_1_t3",
"shard_database_shard_1_taa",
"shard_database_shard_1_s2",
"shard_database_shard_2_t1",
"shard_database_shard_2_t22",
"shard_database_shard_2_t3",
"shard_database_shard_2_tb",
"shard_database_x_shard_1_t1");

try (Statement statement = getStatement()) {
// ensure the job steps into incremental phase
statement.executeUpdate("USE shard_database");
statement.executeUpdate("INSERT INTO t2 VALUES (1, 'A')");
statement.executeUpdate("INSERT INTO shard_1.t2 VALUES (1, 'A')");
}
waitForResult(
Collections.singletonList("+I[1, A]"),
getFileStoreTable("shard_database_dbo_t2"),
getFileStoreTable("shard_database_shard_1_t2"),
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(100)},
new String[] {"k", "name"}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,50 @@ CREATE DATABASE shard_database;
USE shard_database;
EXEC sys.sp_cdc_enable_db;

CREATE TABLE t1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
CREATE TABLE t11 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE t2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
CREATE TABLE taa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE s1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
CREATE TABLE s2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE sa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
CREATE TABLE m (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't11', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't2', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't3', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'ta', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'taa', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 's1', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 's2', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'sa', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'm', @role_name = NULL, @supports_net_changes = 0;
CREATE SCHEMA shard_1;

CREATE TABLE shard_1.t1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
CREATE TABLE shard_1.t11 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE shard_1.t2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE shard_1.t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE shard_1.ta (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
CREATE TABLE shard_1.taa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE shard_1.s1 (k INT, name VARCHAR(100)); -- ignored because of pk absence
CREATE TABLE shard_1.s2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE shard_1.sa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
CREATE TABLE shard_1.m (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 't11', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 't2', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 't3', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 'ta', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 'taa', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 's1', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 's2', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 'sa', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 'm', @role_name = NULL, @supports_net_changes = 0;


CREATE SCHEMA shard_2;
CREATE TABLE shard_2.t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE shard_2.t2 (k INT, name VARCHAR(100)); -- ignored because of pk absence
CREATE TABLE shard_2.t22 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE shard_2.t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
CREATE TABLE shard_2.tb (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured

EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 't2', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 't22', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 't3', @role_name = NULL, @supports_net_changes = 0;
EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 'tb', @role_name = NULL, @supports_net_changes = 0;

CREATE SCHEMA x_shard_1;

CREATE TABLE x_shard_1.t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured
EXEC sys.sp_cdc_enable_table @source_schema = 'x_shard_1', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0;


CREATE SCHEMA ignored;

CREATE TABLE ignored.t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored
EXEC sys.sp_cdc_enable_table @source_schema = 'ignored', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0;

0 comments on commit 97cdaf2

Please sign in to comment.