From 0d35dff8311d5d5816009c425f84d97168475f99 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 2 Dec 2024 11:46:50 +0800 Subject: [PATCH] update --- .../flink/sink/writer/LabelGenerator.java | 22 +++++++++++-------- .../flink/sink/writer/TestLabelGenerator.java | 19 ++++++++++------ 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java index 7e58bb810..5207b3ca5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java @@ -54,22 +54,26 @@ public LabelGenerator(String labelPrefix, boolean enable2PC, int subtaskId) { public String generateTableLabel(long chkId) { Preconditions.checkState(tableIdentifier != null); String label = String.format("%s_%s_%s_%s", labelPrefix, tableIdentifier, subtaskId, chkId); - if (enable2PC) { + if (LABEL_PATTERN.matcher(label).matches()) { return label; + } + // The unicode table name or length exceeds the limit + String uuid = UUID.randomUUID().toString().replace("-", ""); + if (enable2PC) { + // In 2pc, replace uuid with the table name. This will cause some txns to fail to be + // aborted when aborting. + // Later, the label needs to be stored in the state and aborted through label + return String.format("%s_%s_%s_%s", labelPrefix, uuid, subtaskId, chkId); } else { - label = label + "_" + UUID.randomUUID(); - if (LABEL_PATTERN.matcher(label).matches()) { - return label; - } else { - return labelPrefix + "_" + subtaskId + "_" + chkId + "_" + UUID.randomUUID(); - } + return String.format("%s_%s_%s_%s", labelPrefix, subtaskId, chkId, uuid); } } public String generateBatchLabel(String table) { - String label = String.format("%s_%s_%s", labelPrefix, table, UUID.randomUUID()); + String uuid = UUID.randomUUID().toString().replace("-", ""); + String label = String.format("%s_%s_%s", labelPrefix, table, uuid); if (!LABEL_PATTERN.matcher(label).matches()) { - return labelPrefix + "_" + UUID.randomUUID(); + return labelPrefix + "_" + uuid; } return label; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java index 9b7a83739..92a9b897d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java @@ -22,8 +22,8 @@ public class TestLabelGenerator { - private static String UUID_REGEX = - "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; + private static String UUID_REGEX_WITHOUT_LINE = + "[0-9a-f]{8}[0-9a-f]{4}[0-9a-f]{4}[0-9a-f]{4}[0-9a-f]{12}"; @Test public void generateTableLabelTest() { @@ -39,30 +39,35 @@ public void generateTableLabelTest() { "db.tabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletable", 0); label = labelGenerator.generateTableLabel(1); - Assert.assertTrue(label.matches("test001_0_1_" + UUID_REGEX)); + Assert.assertTrue(label.matches("test001_0_1_" + UUID_REGEX_WITHOUT_LINE)); // mock table name chinese labelGenerator = new LabelGenerator("test001", false, "数据库.数据表", 0); label = labelGenerator.generateTableLabel(1); - Assert.assertTrue(label.matches("test001_0_1_" + UUID_REGEX)); + Assert.assertTrue(label.matches("test001_0_1_" + UUID_REGEX_WITHOUT_LINE)); + + // mock table name chinese and 2pc + labelGenerator = new LabelGenerator("test001", true, "数据库.数据表", 0); + label = labelGenerator.generateTableLabel(1); + Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE + "_0_1")); } @Test public void generateBatchLabelTest() { LabelGenerator labelGenerator = new LabelGenerator("test001", false); String label = labelGenerator.generateBatchLabel("table"); - Assert.assertTrue(label.matches("test001_table_" + UUID_REGEX)); + Assert.assertTrue(label.matches("test001_table_" + UUID_REGEX_WITHOUT_LINE)); // mock label length more than 128 labelGenerator = new LabelGenerator("test001", false); label = labelGenerator.generateBatchLabel( "tabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletable"); - Assert.assertTrue(label.matches("test001_" + UUID_REGEX)); + Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE)); // mock table name chinese labelGenerator = new LabelGenerator("test001", false); label = labelGenerator.generateBatchLabel("数据库.数据表"); - Assert.assertTrue(label.matches("test001_" + UUID_REGEX)); + Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE)); } }