Skip to content

Commit

Permalink
fixed e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Feb 20, 2024
1 parent 0ecab68 commit 97f1ae6
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public static String combinedModeTableList(
}

public static SqlServerSourceBuilder.SqlServerIncrementalSource<String> buildSqlServerSource(
Configuration sqlServerSourceConfig, List<String> databaseList, String tableList) {
Configuration sqlServerSourceConfig, String tableList) {
validateSqlServerConfig(sqlServerSourceConfig);

Map<String, Object> converterConfigs = new HashMap<>();
Expand All @@ -196,7 +196,9 @@ public static SqlServerSourceBuilder.SqlServerIncrementalSource<String> buildSql
.port(sqlServerSourceConfig.get(SqlServerSourceOptions.PORT))
.username(sqlServerSourceConfig.get(SqlServerSourceOptions.USERNAME))
.password(sqlServerSourceConfig.get(SqlServerSourceOptions.PASSWORD))
.databaseList(databaseList.toArray(new String[0]))
.databaseList(
sqlServerSourceConfig.getString(
SqlServerSourceOptions.DATABASE_NAME))
.tableList(tableList)
.debeziumProperties(getDebeziumProperties(sqlServerSourceConfig.toMap()))
.startupOptions(getStartupOptions(sqlServerSourceConfig))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo;
import org.apache.paimon.schema.Schema;
Expand All @@ -49,16 +48,13 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.schemaCompatible;

/** An {@link Action} which synchronize the whole SqlServer database into one Paimon database. */
public class SqlServerSyncDatabaseAction extends SyncDatabaseActionBase {
private static final Logger LOG = LoggerFactory.getLogger(SqlServerSyncDatabaseAction.class);

private Map<String, String> tableConfig = new HashMap<>();
private boolean ignoreIncompatible = false;
private TypeMapping typeMapping = TypeMapping.defaultMapping();

private final List<Pair<Identifier, String>> monitoredTables = new ArrayList<>();
private final List<Pair<Identifier, String>> excludedTables = new ArrayList<>();

Expand All @@ -73,6 +69,7 @@ public SqlServerSyncDatabaseAction(
catalogConfig,
sqlServerConfig,
SyncJobHandler.SourceType.SQLSERVER);
this.mode = DIVIDED;
}

public SqlServerSyncDatabaseAction ignoreIncompatible(boolean ignoreIncompatible) {
Expand Down Expand Up @@ -176,21 +173,9 @@ private static Map<String, Set<String>> getSchemaMapping(JdbcSchemasInfo jdbcSch

@Override
protected Object buildSource() {
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
try {
JdbcSchemasInfo sqlServerSchemasInfo =
SqlServerActionUtils.getSqlServerTableInfos(
cdcSourceConfig,
tableName ->
shouldMonitorTable(
tableName, includingPattern, excludingPattern),
excludedTables,
typeMapping);
return SqlServerActionUtils.buildSqlServerSource(
cdcSourceConfig,
SqlServerActionUtils.databaseList(sqlServerSchemasInfo.pkTables()),
SqlServerActionUtils.tableList(
mode,
cdcSourceConfig.get(SqlServerSourceOptions.SCHEMA_NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {

action.withTableConfig(optionalConfigMap(params, TABLE_CONF));

action.ignoreIncompatible(Boolean.parseBoolean(params.get("ignore-incompatible")))
action.ignoreIncompatible(Boolean.parseBoolean(params.get("ignore_incompatible")))
.mergeShards(
!params.has("merge-shards")
|| Boolean.parseBoolean(params.get("merge-shards")))
!params.has("merge_shards")
|| Boolean.parseBoolean(params.get("merge_shards")))
.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
.includingTables(params.get(INCLUDING_TABLES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ protected SqlServerSourceBuilder.SqlServerIncrementalSource<String> buildSource(
sqlServerSchemasInfo.pkTables().stream()
.map(i -> i.schemaName() + "." + i.identifier().getObjectName())
.collect(Collectors.joining("|"));
return buildSqlServerSource(
cdcSourceConfig,
SqlServerActionUtils.databaseList(sqlServerSchemasInfo.pkTables()),
tableList);
return buildSqlServerSource(cdcSourceConfig, tableList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ protected static void start() {
}

protected static Statement getStatement() throws SQLException {
System.out.println("jdbc url = " + MSSQL_SERVER_CONTAINER.getJdbcUrl());
System.out.println("jdbc username = " + MSSQL_SERVER_CONTAINER.getUsername());
System.out.println("jdbc password = " + MSSQL_SERVER_CONTAINER.getPassword());
System.out.println("SQLServer jdbc url: " + MSSQL_SERVER_CONTAINER.getJdbcUrl());
Connection conn =
DriverManager.getConnection(
MSSQL_SERVER_CONTAINER.getJdbcUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ public void testSyncMultipleShards() throws Exception {
sqlServerConfig.put(
"schema-name",
ThreadLocalRandom.current().nextBoolean() ? "schema_.*" : "schema_1|schema_2");

MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED;
SqlServerSyncDatabaseAction action =
syncDatabaseActionBuilder(sqlServerConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -729,12 +728,4 @@ public void testCatalogAndTableConfig() {
private FileStoreTable getFileStoreTable() throws Exception {
return getFileStoreTable(tableName);
}

private void resetEnableCdcTable(Statement statement, String schema, String tableName)
throws SQLException {
statement.executeUpdate(
String.format(
"EXEC sys.sp_cdc_enable_table @source_schema = '%s', @source_name = '%s', @role_name = NULL, @supports_net_changes = 0, @capture_instance = '%s'",
schema, tableName, String.format("%s_%s", schema, tableName)));
}
}

0 comments on commit 97f1ae6

Please sign in to comment.