From 743d0351367870aa107b05110d7f57281ea35335 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 6 Feb 2024 11:36:29 +0800 Subject: [PATCH] fix log print --- .../apache/doris/flink/sink/writer/DorisStreamLoad.java | 8 +++++--- .../org/apache/doris/flink/sink/writer/DorisWriter.java | 4 +--- .../doris/flink/sink/writer/TestDorisStreamLoad.java | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 604eb5c81..5e2a697dd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -85,6 +85,7 @@ public class DorisStreamLoad implements Serializable { private final CloseableHttpClient httpClient; private final ExecutorService executorService; private boolean loadBatchFirstRecord; + private volatile String currentLabel; public DorisStreamLoad( String hostPort, @@ -246,9 +247,9 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw throw new StreamLoadException("stream load error: " + response.getStatusLine().toString()); } - public RespContent stopLoad(String label) throws IOException { + public RespContent stopLoad() throws IOException { recordStream.endInput(); - LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort); + LOG.info("table {} stream load stopped for {} on host {}", table, currentLabel, hostPort); Preconditions.checkState(pendingLoadFuture != null); try { return handlePreCommitResponse(pendingLoadFuture.get()); @@ -268,6 +269,7 @@ public void startLoad(String label, boolean isResume) throws IOException { HttpPutBuilder putBuilder = new HttpPutBuilder(); recordStream.startInput(isResume); LOG.info("table {} stream load started for {} on host {}", table, label, hostPort); + this.currentLabel = label; try { InputStreamEntity entity = new InputStreamEntity(recordStream); putBuilder @@ -284,7 +286,7 @@ public void startLoad(String label, boolean isResume) throws IOException { pendingLoadFuture = executorService.submit( () -> { - LOG.info("table {} start execute load", table); + LOG.info("table {} start execute load for label {}", table, label); return httpClient.execute(putBuilder.build()); }); } catch (Exception e) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index c6f81245e..54facc704 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -233,9 +233,7 @@ public Collection prepareCommit() throws IOException, Interrup continue; } DorisStreamLoad dorisStreamLoad = streamLoader.getValue(); - LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier); - String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); - RespContent respContent = dorisStreamLoad.stopLoad(currentLabel); + RespContent respContent = dorisStreamLoad.stopLoad(); // refresh metrics if (sinkMetricsMap.containsKey(tableIdentifier)) { DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java index fab2fcd06..5ead3e859 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java @@ -120,7 +120,7 @@ public void testWriteOneRecordInCsv() throws Exception { httpClient); dorisStreamLoad.startLoad("1", false); dorisStreamLoad.writeRecord(writeBuffer); - dorisStreamLoad.stopLoad("label"); + dorisStreamLoad.stopLoad(); byte[] buff = new byte[4]; int n = dorisStreamLoad.getRecordStream().read(buff); dorisStreamLoad.getRecordStream().read(new byte[4]); @@ -147,7 +147,7 @@ public void testWriteTwoRecordInCsv() throws Exception { dorisStreamLoad.startLoad("1", false); dorisStreamLoad.writeRecord(writeBuffer); dorisStreamLoad.writeRecord(writeBuffer); - dorisStreamLoad.stopLoad("label"); + dorisStreamLoad.stopLoad(); byte[] buff = new byte[9]; int n = dorisStreamLoad.getRecordStream().read(buff); int ret = dorisStreamLoad.getRecordStream().read(new byte[9]); @@ -179,7 +179,7 @@ public void testWriteTwoRecordInJson() throws Exception { dorisStreamLoad.startLoad("1", false); dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8)); dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8)); - dorisStreamLoad.stopLoad("label"); + dorisStreamLoad.stopLoad(); byte[] buff = new byte[expectBuffer.length]; int n = dorisStreamLoad.getRecordStream().read(buff);