diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java index 64c27b711..bc037a8a5 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java @@ -167,21 +167,23 @@ public void testExtractDDL() throws IOException { @Test public void testExtractDDLListMultipleColumns() throws IOException { - String sql0 = "ALTER TABLE test.t1 ADD COLUMN c2 INT"; - String sql1 = "ALTER TABLE test.t1 ADD COLUMN c555 VARCHAR(400)"; - String sql2 = "ALTER TABLE test.t1 ADD COLUMN c666 INT DEFAULT '100'"; - String sql3 = "ALTER TABLE test.t1 ADD COLUMN c4 BIGINT DEFAULT '555'"; - String sql4 = "ALTER TABLE test.t1 ADD COLUMN c199 INT"; - String sql5 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'"; - String sql6 = "ALTER TABLE test.t1 DROP COLUMN name"; - String sql7 = "ALTER TABLE test.t1 DROP COLUMN test_time"; - String sql8 = "ALTER TABLE test.t1 DROP COLUMN c1"; - String sql9 = "ALTER TABLE test.t1 DROP COLUMN cc"; - List srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3, sql4, sql5, sql6, sql7, sql8, sql9); + String sql0 = "ALTER TABLE test.t1 ADD COLUMN id INT DEFAULT '10000'"; + String sql1 = "ALTER TABLE test.t1 ADD COLUMN c199 INT"; + String sql2 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'"; + String sql3 = "ALTER TABLE test.t1 DROP COLUMN c13"; + List srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3); + + Map originFiledSchemaMap = new LinkedHashMap<>(); + originFiledSchemaMap.put("c2", new FieldSchema()); + originFiledSchemaMap.put("c555", new FieldSchema()); + originFiledSchemaMap.put("c666", new FieldSchema()); + originFiledSchemaMap.put("c4", new FieldSchema()); + originFiledSchemaMap.put("c13", new FieldSchema()); String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691033764,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23464,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop c11, drop column c3, add c12 int default 100\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c12\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; JsonNode recordRoot = objectMapper.readTree(record); + serializer.setOriginFieldSchemaMap(originFiledSchemaMap); List ddlSQLList = serializer.extractDDLList(recordRoot); for (int i = 0; i < ddlSQLList.size(); i++) { String srcSQL = srcSqlList.get(i);