diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index 94605105b..6600dd07e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -296,7 +296,8 @@ public void fillOriginSchema(String tableName, JsonNode columns) { } } - private void buildFieldSchema(Map filedSchemaMap, JsonNode column) { + @VisibleForTesting + public void buildFieldSchema(Map filedSchemaMap, JsonNode column) { String fieldName = column.get("name").asText(); String dorisTypeName = buildDorisTypeName(column); String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression")); @@ -315,7 +316,7 @@ public String buildDorisTypeName(JsonNode column) { } private String handleDefaultValue(String defaultValue) { - if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) { + if (defaultValue == null) { return null; } if (defaultValue.equals("1970-01-01 00:00:00")) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index 68239618c..51ee93159 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -542,6 +542,44 @@ private Map buildDatetimeFieldSchemaMap() { return filedSchemaMap; } + @Test + public void buildFieldSchemaTest() { + Map result = new HashMap<>(); + String columnInfo = + "{\"name\":\"order_ts\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":\"中文注释\",\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]}\n"; + schemaChange.setSourceConnector("mysql"); + JsonNode columns = null; + try { + columns = objectMapper.readTree(columnInfo); + } catch (IOException e) { + e.printStackTrace(); + } + schemaChange.buildFieldSchema(result, columns); + Assert.assertTrue(result.containsKey("order_ts")); + FieldSchema fieldSchema = result.get("order_ts"); + Assert.assertEquals(fieldSchema.getName().toLowerCase(), "order_ts"); + Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "datetimev2(0)"); + Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), "current_timestamp"); + Assert.assertEquals(fieldSchema.getComment(), "中文注释"); + + columnInfo = + "{\"name\":\"other_no\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":23,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":\"comment\",\"hasDefaultValue\":true,\"defaultValueExpression\":\"\",\"enumValues\":[]}\n"; + schemaChange.setSourceConnector("mysql"); + columns = null; + try { + columns = objectMapper.readTree(columnInfo); + } catch (IOException e) { + e.printStackTrace(); + } + schemaChange.buildFieldSchema(result, columns); + Assert.assertTrue(result.containsKey("other_no")); + fieldSchema = result.get("other_no"); + Assert.assertEquals(fieldSchema.getName().toLowerCase(), "other_no"); + Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "varchar(150)"); + Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), ""); + Assert.assertEquals(fieldSchema.getComment(), "comment"); + } + @After public void after() { mockRestService.close();