Skip to content

Commit

Permalink
Merge branch 'master' into improve-httpclient1204
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Dec 4, 2024
2 parents c080ae8 + 2f26c8f commit 28af67b
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import org.apache.flink.util.Preconditions;

import java.util.UUID;
import java.util.regex.Pattern;

/** Generator label for stream load. */
public class LabelGenerator {
// doris default label regex
private static final String LABEL_REGEX = "^[-_A-Za-z0-9:]{1,128}$";
private static final Pattern LABEL_PATTERN = Pattern.compile(LABEL_REGEX);
private String labelPrefix;
private boolean enable2PC;
private String tableIdentifier;
Expand Down Expand Up @@ -50,11 +54,33 @@ public LabelGenerator(String labelPrefix, boolean enable2PC, int subtaskId) {
public String generateTableLabel(long chkId) {
Preconditions.checkState(tableIdentifier != null);
String label = String.format("%s_%s_%s_%s", labelPrefix, tableIdentifier, subtaskId, chkId);
return enable2PC ? label : label + "_" + UUID.randomUUID();

if (!enable2PC) {
label = label + "_" + UUID.randomUUID();
}

if (LABEL_PATTERN.matcher(label).matches()) {
// The unicode table name or length exceeds the limit
return label;
}

if (enable2PC) {
// In 2pc, replace uuid with the table name. This will cause some txns to fail to be
// aborted when aborting.
// Later, the label needs to be stored in the state and aborted through label
return String.format("%s_%s_%s_%s", labelPrefix, UUID.randomUUID(), subtaskId, chkId);
} else {
return String.format("%s_%s_%s_%s", labelPrefix, subtaskId, chkId, UUID.randomUUID());
}
}

public String generateBatchLabel(String table) {
return String.format("%s_%s_%s", labelPrefix, table, UUID.randomUUID());
String uuid = UUID.randomUUID().toString();
String label = String.format("%s_%s_%s", labelPrefix, table, uuid);
if (!LABEL_PATTERN.matcher(label).matches()) {
return labelPrefix + "_" + uuid;
}
return label;
}

public String generateCopyBatchLabel(String table, long chkId, int fileNum) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.writer;

import org.junit.Assert;
import org.junit.Test;

public class TestLabelGenerator {

private static String UUID_REGEX_WITHOUT_LINE =
"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";

@Test
public void generateTableLabelTest() {
LabelGenerator labelGenerator = new LabelGenerator("test001", true, "db.table", 0);
String label = labelGenerator.generateTableLabel(1);
Assert.assertEquals("test001_db_table_0_1", label);

// mock label length more than 128
labelGenerator =
new LabelGenerator(
"test001",
false,
"db.tabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletable",
0);
label = labelGenerator.generateTableLabel(1);
Assert.assertTrue(label.matches("test001_0_1_" + UUID_REGEX_WITHOUT_LINE));

// mock table name chinese
labelGenerator = new LabelGenerator("test001", false, "数据库.数据表", 0);
label = labelGenerator.generateTableLabel(1);
Assert.assertTrue(label.matches("test001_0_1_" + UUID_REGEX_WITHOUT_LINE));

// mock table name chinese and 2pc
labelGenerator = new LabelGenerator("test001", true, "数据库.数据表", 0);
label = labelGenerator.generateTableLabel(1);
Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE + "_0_1"));
}

@Test
public void generateBatchLabelTest() {
LabelGenerator labelGenerator = new LabelGenerator("test001", false);
String label = labelGenerator.generateBatchLabel("table");
Assert.assertTrue(label.matches("test001_table_" + UUID_REGEX_WITHOUT_LINE));

// mock label length more than 128
labelGenerator = new LabelGenerator("test001", false);
label =
labelGenerator.generateBatchLabel(
"tabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletable");
Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE));

// mock table name chinese
labelGenerator = new LabelGenerator("test001", false);
label = labelGenerator.generateBatchLabel("数据库.数据表");
Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE));
}
}

0 comments on commit 28af67b

Please sign in to comment.