Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Feb 18, 2024
1 parent 9a62197 commit 62538dd
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public enum CdcMetadataProcessor {
SyncJobHandler.SourceType.SQLSERVER,
new CdcMetadataConverter.DatabaseNameConverter(),
new CdcMetadataConverter.TableNameConverter(),
new CdcMetadataConverter.SchemaNameConverter());
new CdcMetadataConverter.SchemaNameConverter(),
new CdcMetadataConverter.OpTsConverter());

private final SyncJobHandler.SourceType sourceType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.flink.action.cdc.postgres.PostgresRecordParser;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
import org.apache.paimon.flink.action.cdc.sqlserver.SqlServerRecordParser;
import org.apache.paimon.flink.action.cdc.sqlserver.SqlServerSourceOptions;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;

import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
Expand All @@ -47,6 +48,7 @@
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.POSTGRES_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SQLSERVER_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkOneRequiredOption;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkRequiredOptions;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand Down Expand Up @@ -139,6 +141,26 @@ public void checkRequiredOption() {
+ "use postgres_sync_table instead.");
}
break;
case SQLSERVER:
checkRequiredOptions(
cdcSourceConfig,
SQLSERVER_CONF,
SqlServerSourceOptions.HOSTNAME,
SqlServerSourceOptions.USERNAME,
SqlServerSourceOptions.PASSWORD,
SqlServerSourceOptions.DATABASE_NAME);
if (isTableSync) {
checkRequiredOptions(
cdcSourceConfig, SQLSERVER_CONF, SqlServerSourceOptions.TABLE_NAME);
} else {
checkArgument(
!cdcSourceConfig.contains(SqlServerSourceOptions.TABLE_NAME),
SqlServerSourceOptions.TABLE_NAME.key()
+ " cannot be set for sqlserver_sync_database. "
+ "If you want to sync several SQLServer tables into one Paimon table, "
+ "use sqlserver_sync_table instead.");
}
break;
case KAFKA:
checkRequiredOptions(
cdcSourceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
import static org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils.toPaimonTypeVisitor;
import static org.apache.paimon.flink.action.cdc.sqlserver.SqlServerSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.paimon.flink.action.cdc.sqlserver.SqlServerTypeUtils.toPaimonTypeVisitor;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Utility class for SqlServer action. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/** Factory to create {@link SqlServerSyncDatabaseAction}. */
public class SqlServerSyncDatabaseActionFactory implements ActionFactory {

public static final String IDENTIFIER = "sqlserver-sync-database";
public static final String IDENTIFIER = "sqlserver_sync_database";

@Override
public String identifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/** Factory to create {@link SqlServerSyncTableAction}. */
public class SqlServerSyncTableActionFactory extends SyncTableActionFactoryBase {

public static final String IDENTIFIER = "sqlserver-sync-table";
public static final String IDENTIFIER = "sqlserver_sync_table";

@Override
public String identifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.action.cdc.sqlserver;

import org.apache.paimon.flink.action.cdc.JdbcToPaimonTypeVisitor;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
Expand Down Expand Up @@ -151,4 +152,23 @@ public static DataType toPaimonDataType(
String.format("Don't support SQLServer type '%s' yet.", type));
}
}

public static JdbcToPaimonTypeVisitor toPaimonTypeVisitor() {
return SqlServerTypeUtils.SqlServerToPaimonTypeVisitor.INSTANCE;
}

private static class SqlServerToPaimonTypeVisitor implements JdbcToPaimonTypeVisitor {

private static final SqlServerTypeUtils.SqlServerToPaimonTypeVisitor INSTANCE =
new SqlServerTypeUtils.SqlServerToPaimonTypeVisitor();

@Override
public DataType visit(
String type,
@Nullable Integer length,
@Nullable Integer scale,
TypeMapping typeMapping) {
return toPaimonDataType(type, length, scale, typeMapping);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncDatabaseActionFactory
org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncTableActionFactory
org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncDatabaseActionFactory
org.apache.paimon.flink.action.cdc.postgres.PostgresSyncTableActionFactory
org.apache.paimon.flink.action.cdc.sqlserver.SqlServerSyncDatabaseActionFactory
org.apache.paimon.flink.action.cdc.sqlserver.SqlServerSyncTableActionFactory

Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ private <T> String getConfKey(Class<T> clazz) {
return "--" + CdcActionCommonUtils.PULSAR_CONF;
case "PostgresSyncTableAction":
return "--" + CdcActionCommonUtils.POSTGRES_CONF;
case "SqlServerSyncTableAction":
case "SqlServerSyncDatabaseAction":
return "--" + CdcActionCommonUtils.SQLSERVER_CONF;
default:
throw new UnsupportedOperationException(
"Unknown sync action: " + clazz.getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public void testSpecifiedSqlServerTable() {
assertThatThrownBy(action::run)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"table-name cannot be set for sqlserver-sync-database. "
"table-name cannot be set for sqlserver_sync_database. "
+ "If you want to sync several SqlServer tables into one Paimon table, "
+ "use sqlserver-sync-table instead.");
+ "use sqlserver_sync_table instead.");
}

@Test
Expand Down Expand Up @@ -297,8 +297,8 @@ public void testIgnoreCase() throws Exception {
FileStoreTable table = getFileStoreTable("t");
assertThat(JsonSerdeUtil.toFlatJson(table.schema().fields()))
.isEqualTo(
"[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\",\"description\":\"\"},"
+ "{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\",\"description\":\"\"}]");
"[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\"}]");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -72,9 +71,9 @@ public void testSyncSqlServerTable() throws Exception {
runActionWithDefaultEnv(action);

checkTableSchema(
"[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"\"},"
+ "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"\"},"
+ "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"\"}]");
"[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"}]");

try (Statement statement = getStatement()) {
testSyncSqlServerTableImpl(statement);
Expand Down Expand Up @@ -262,7 +261,7 @@ public void testInvalidPrimaryKey() throws Exception {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Specified primary key 'pk' does not exist in source tables or computed columns."));
"Specified primary key 'pk' does not exist in source tables or computed columns [pt, _id, v1]."));
}

@Test
Expand Down Expand Up @@ -483,56 +482,6 @@ public void testCatalogAndTableConfig() {
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}

@Test
@Timeout(60)
public void testSyncShardsMultipleSchemas() throws Exception {
Map<String, String> sqlServerConfig = getBasicSqlServerConfig();

// test table list
ThreadLocalRandom random = ThreadLocalRandom.current();
String schemaPattern = random.nextBoolean() ? "shard_.+" : "shard_1|shard_2";
String tblPattern = random.nextBoolean() ? "t.+" : "t1|t2";

sqlServerConfig.put("database-name", "shard_schema");
sqlServerConfig.put("schema-name", schemaPattern);
sqlServerConfig.put("table-name", tblPattern);

SqlServerSyncTableAction action =
syncTableActionBuilder(sqlServerConfig)
.withPartitionKeys("pt")
.withPrimaryKeys("pk", "pt")
.withComputedColumnArgs("pt=substring(_date,5)")
.build();
runActionWithDefaultEnv(action);

try (Statement statement = getStatement()) {
statement.executeUpdate("USE shard_schema");
statement.executeUpdate("INSERT INTO shard_1.t1 VALUES (1, '2023-07-30')");
statement.executeUpdate("INSERT INTO shard_1.t2 VALUES (2, '2023-07-30')");
statement.executeUpdate("INSERT INTO shard_2.t1 VALUES (3, '2023-07-31')");
statement.executeUpdate("INSERT INTO shard_2.t1 VALUES (4, '2023-07-31')");
}

FileStoreTable table = getFileStoreTable();
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(),
DataTypes.VARCHAR(10),
DataTypes.STRING().notNull()
},
new String[] {"pk", "_date", "pt"});
waitForResult(
Arrays.asList(
"+I[1, 2023-07-30, 07-30]",
"+I[2, 2023-07-30, 07-30]",
"+I[3, 2023-07-31, 07-31]",
"+I[4, 2023-07-31, 07-31]"),
table,
rowType,
Arrays.asList("pk", "pt"));
}

private FileStoreTable getFileStoreTable() throws Exception {
return getFileStoreTable(tableName);
}
Expand Down

0 comments on commit 62538dd

Please sign in to comment.