Skip to content

Commit

Permalink
fix log print
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Feb 6, 2024
1 parent 99f2da9 commit 743d035
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class DorisStreamLoad implements Serializable {
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private boolean loadBatchFirstRecord;
private volatile String currentLabel;

public DorisStreamLoad(
String hostPort,
Expand Down Expand Up @@ -246,9 +247,9 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw
throw new StreamLoadException("stream load error: " + response.getStatusLine().toString());
}

public RespContent stopLoad(String label) throws IOException {
public RespContent stopLoad() throws IOException {
recordStream.endInput();
LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort);
LOG.info("table {} stream load stopped for {} on host {}", table, currentLabel, hostPort);
Preconditions.checkState(pendingLoadFuture != null);
try {
return handlePreCommitResponse(pendingLoadFuture.get());
Expand All @@ -268,6 +269,7 @@ public void startLoad(String label, boolean isResume) throws IOException {
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput(isResume);
LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
this.currentLabel = label;
try {
InputStreamEntity entity = new InputStreamEntity(recordStream);
putBuilder
Expand All @@ -284,7 +286,7 @@ public void startLoad(String label, boolean isResume) throws IOException {
pendingLoadFuture =
executorService.submit(
() -> {
LOG.info("table {} start execute load", table);
LOG.info("table {} start execute load for label {}", table, label);
return httpClient.execute(putBuilder.build());
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,7 @@ public Collection<DorisCommittable> prepareCommit() throws IOException, Interrup
continue;
}
DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
String currentLabel = labelGenerator.generateTableLabel(curCheckpointId);
RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
RespContent respContent = dorisStreamLoad.stopLoad();
// refresh metrics
if (sinkMetricsMap.containsKey(tableIdentifier)) {
DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void testWriteOneRecordInCsv() throws Exception {
httpClient);
dorisStreamLoad.startLoad("1", false);
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.stopLoad("label");
dorisStreamLoad.stopLoad();
byte[] buff = new byte[4];
int n = dorisStreamLoad.getRecordStream().read(buff);
dorisStreamLoad.getRecordStream().read(new byte[4]);
Expand All @@ -147,7 +147,7 @@ public void testWriteTwoRecordInCsv() throws Exception {
dorisStreamLoad.startLoad("1", false);
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.stopLoad("label");
dorisStreamLoad.stopLoad();
byte[] buff = new byte[9];
int n = dorisStreamLoad.getRecordStream().read(buff);
int ret = dorisStreamLoad.getRecordStream().read(new byte[9]);
Expand Down Expand Up @@ -179,7 +179,7 @@ public void testWriteTwoRecordInJson() throws Exception {
dorisStreamLoad.startLoad("1", false);
dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8));
dorisStreamLoad.stopLoad("label");
dorisStreamLoad.stopLoad();
byte[] buff = new byte[expectBuffer.length];
int n = dorisStreamLoad.getRecordStream().read(buff);

Expand Down

0 comments on commit 743d035

Please sign in to comment.