Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Feb 20, 2024
1 parent 6c237bf commit 0ecab68
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@ public String toPaimonTableName() {
// the Paimon table name should be compound of origin database name and table name
// together to avoid name conflict
if (StringUtils.isBlank(schemaName)) {
return identifier.getDatabaseName() + "_" + identifier.getObjectName();
return String.format("%s_%s", identifier.getDatabaseName(), identifier.getObjectName());
} else {
return identifier.getDatabaseName()
+ "_"
+ schemaName
+ "_"
+ identifier.getObjectName();
return String.format(
"%s_%s_%s",
identifier.getDatabaseName(), schemaName, identifier.getObjectName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -133,10 +132,10 @@ public static JdbcSchemasInfo getSqlServerTableInfos(
}

public static List<String> databaseList(List<JdbcSchemasInfo.JdbcSchemaInfo> identifiers) {
Set<JdbcSchemasInfo.JdbcSchemaInfo> identifiersSet = new HashSet<>(identifiers);
return identifiersSet.stream()
.map(jdbcSchemaInfo -> jdbcSchemaInfo.identifier().getDatabaseName())
.collect(Collectors.toList());
return new ArrayList<>(
identifiers.stream()
.map(jdbcSchemaInfo -> jdbcSchemaInfo.identifier().getDatabaseName())
.collect(Collectors.toSet()));
}

public static String tableList(
Expand All @@ -156,7 +155,7 @@ public static String tableList(
private static String dividedModeTableList(List<Pair<Identifier, String>> monitoredTables) {
// In DIVIDED mode, we only concern about existed tables
return monitoredTables.stream()
.map(t -> t.getValue() + "\\." + t.getKey().getObjectName())
.map(t -> t.getRight() + "\\." + t.getLeft().getObjectName())
.collect(Collectors.joining("|"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private boolean isSchemaBytes(String schemaType) {

protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> data) {
return new RichCdcMultiplexRecord(
databaseName + "_" + schemaName,
String.format("%s_%s", databaseName, schemaName),
currentTable,
new LinkedHashMap<>(0),
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class SqlServerSyncDatabaseAction extends SyncDatabaseActionBase {
private boolean ignoreIncompatible = false;
private TypeMapping typeMapping = TypeMapping.defaultMapping();

private final List<Pair<Identifier, String>> includedTables = new ArrayList<>();
private final List<Pair<Identifier, String>> monitoredTables = new ArrayList<>();
private final List<Pair<Identifier, String>> excludedTables = new ArrayList<>();

public SqlServerSyncDatabaseAction(
Expand Down Expand Up @@ -125,15 +125,15 @@ protected void beforeBuildingSourceSink() throws Exception {

if (shouldMonitorTable(table.schema(), fromSqlServer, errMsg)) {
tables.add(table);
setTables(schemaMappings, tableInfo.identifiers(), includedTables);
setTables(schemaMappings, tableInfo.identifiers(), monitoredTables);
} else {
setTables(schemaMappings, tableInfo.identifiers(), excludedTables);
}
} catch (Catalog.TableNotExistException e) {
catalog.createTable(identifier, fromSqlServer, false);
table = (FileStoreTable) catalog.getTable(identifier);
tables.add(table);
setTables(schemaMappings, tableInfo.identifiers(), includedTables);
setTables(schemaMappings, tableInfo.identifiers(), monitoredTables);
}
}
}
Expand Down Expand Up @@ -162,13 +162,13 @@ private static Map<String, Set<String>> getSchemaMapping(JdbcSchemasInfo jdbcSch
continue;
}
String fullName = jdbcSchemaInfo.identifier().getFullName();
Set<String> existsSchemas = schemaMapping.get(fullName);
if (existsSchemas == null) {
if (!schemaMapping.containsKey(fullName)) {
Set<String> schemaNames = new HashSet<>();
schemaNames.add(jdbcSchemaInfo.schemaName());
schemaMapping.put(fullName, schemaNames);
} else {
existsSchemas.add(fullName);
Set<String> existsSchemas = schemaMapping.get(fullName);
existsSchemas.add(jdbcSchemaInfo.schemaName());
}
}
return schemaMapping;
Expand All @@ -195,7 +195,7 @@ protected Object buildSource() {
mode,
cdcSourceConfig.get(SqlServerSourceOptions.SCHEMA_NAME),
includingTables,
includedTables,
monitoredTables,
excludedTables));
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down

0 comments on commit 0ecab68

Please sign in to comment.