From 97cdaf2756a8b1950e251356cfb393d1fd487458 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Tue, 20 Feb 2024 17:24:52 +0800 Subject: [PATCH] fixed e2e --- .../SqlServerSyncDatabaseTableListITCase.java | 26 ++++--- .../sqlserver/tablelist_test_setup.sql | 67 +++++++++++++------ 2 files changed, 65 insertions(+), 28 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/sqlserver/SqlServerSyncDatabaseTableListITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/sqlserver/SqlServerSyncDatabaseTableListITCase.java index fd8dcb237ac8f..d332af64ded42 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/sqlserver/SqlServerSyncDatabaseTableListITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/sqlserver/SqlServerSyncDatabaseTableListITCase.java @@ -49,6 +49,11 @@ public static void startContainers() { public void testActionRunResult() throws Exception { Map sqlserverConfig = getBasicSqlServerConfig(); sqlserverConfig.put("database-name", "shard_database"); + sqlserverConfig.put( + "schema-name", + ThreadLocalRandom.current().nextBoolean() + ? ".*shard_.*" + : "shard_1|shard_2|shard_3|x_shard_1"); MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED; SqlServerSyncDatabaseAction action = syncDatabaseActionBuilder(sqlserverConfig) @@ -56,25 +61,30 @@ public void testActionRunResult() throws Exception { .mergeShards(false) .withMode(mode.configString()) .includingTables("t.+|s.+") - .excludingTables("ta|sa|ta2") + .excludingTables("ta|sa") .build(); runActionWithDefaultEnv(action); assertExactlyExistTables( - "shard_database_dbo_t11", - "shard_database_dbo_t2", - "shard_database_dbo_t3", - "shard_database_dbo_taa", - "shard_database_dbo_s2"); + "shard_database_shard_1_t11", + "shard_database_shard_1_t2", + "shard_database_shard_1_t3", + "shard_database_shard_1_taa", + "shard_database_shard_1_s2", + "shard_database_shard_2_t1", + "shard_database_shard_2_t22", + "shard_database_shard_2_t3", + "shard_database_shard_2_tb", + "shard_database_x_shard_1_t1"); try (Statement statement = getStatement()) { // ensure the job steps into incremental phase statement.executeUpdate("USE shard_database"); - statement.executeUpdate("INSERT INTO t2 VALUES (1, 'A')"); + statement.executeUpdate("INSERT INTO shard_1.t2 VALUES (1, 'A')"); } waitForResult( Collections.singletonList("+I[1, A]"), - getFileStoreTable("shard_database_dbo_t2"), + getFileStoreTable("shard_database_shard_1_t2"), RowType.of( new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(100)}, new String[] {"k", "name"}), diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/sqlserver/tablelist_test_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/sqlserver/tablelist_test_setup.sql index bdca67b13fd15..dca76b392c3aa 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/sqlserver/tablelist_test_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/sqlserver/tablelist_test_setup.sql @@ -26,23 +26,50 @@ CREATE DATABASE shard_database; USE shard_database; EXEC sys.sp_cdc_enable_db; -CREATE TABLE t1 (k INT, name VARCHAR(100)); -- ignored because of pk absence -CREATE TABLE t11 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured -CREATE TABLE t2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured -CREATE TABLE t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured -CREATE TABLE ta (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored -CREATE TABLE taa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured -CREATE TABLE s1 (k INT, name VARCHAR(100)); -- ignored because of pk absence -CREATE TABLE s2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured -CREATE TABLE sa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored -CREATE TABLE m (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0; -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't11', @role_name = NULL, @supports_net_changes = 0; -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't2', @role_name = NULL, @supports_net_changes = 0; -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't3', @role_name = NULL, @supports_net_changes = 0; -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'ta', @role_name = NULL, @supports_net_changes = 0; -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'taa', @role_name = NULL, @supports_net_changes = 0; -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 's1', @role_name = NULL, @supports_net_changes = 0; -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 's2', @role_name = NULL, @supports_net_changes = 0; -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'sa', @role_name = NULL, @supports_net_changes = 0; -EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'm', @role_name = NULL, @supports_net_changes = 0; +CREATE SCHEMA shard_1; + +CREATE TABLE shard_1.t1 (k INT, name VARCHAR(100)); -- ignored because of pk absence +CREATE TABLE shard_1.t11 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured +CREATE TABLE shard_1.t2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured +CREATE TABLE shard_1.t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured +CREATE TABLE shard_1.ta (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored +CREATE TABLE shard_1.taa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured +CREATE TABLE shard_1.s1 (k INT, name VARCHAR(100)); -- ignored because of pk absence +CREATE TABLE shard_1.s2 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured +CREATE TABLE shard_1.sa (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored +CREATE TABLE shard_1.m (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 't11', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 't2', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 't3', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 'ta', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 'taa', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 's1', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 's2', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 'sa', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_1', @source_name = 'm', @role_name = NULL, @supports_net_changes = 0; + + +CREATE SCHEMA shard_2; +CREATE TABLE shard_2.t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured +CREATE TABLE shard_2.t2 (k INT, name VARCHAR(100)); -- ignored because of pk absence +CREATE TABLE shard_2.t22 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured +CREATE TABLE shard_2.t3 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured +CREATE TABLE shard_2.tb (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured + +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 't2', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 't22', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 't3', @role_name = NULL, @supports_net_changes = 0; +EXEC sys.sp_cdc_enable_table @source_schema = 'shard_2', @source_name = 'tb', @role_name = NULL, @supports_net_changes = 0; + +CREATE SCHEMA x_shard_1; + +CREATE TABLE x_shard_1.t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- captured +EXEC sys.sp_cdc_enable_table @source_schema = 'x_shard_1', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0; + + +CREATE SCHEMA ignored; + +CREATE TABLE ignored.t1 (k INT, name VARCHAR(100), PRIMARY KEY (k)); -- ignored +EXEC sys.sp_cdc_enable_table @source_schema = 'ignored', @source_name = 't1', @role_name = NULL, @supports_net_changes = 0;