Skip to content

Commit

Permalink
add abort log
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Apr 9, 2024
1 parent 7a763e0 commit 987b979
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public void abortPreCommit(String labelPrefix, long chkID) throws Exception {
// Currently, it can only be aborted based on txnid, so we must
// first request a streamload based on the label to get the txnid.
String label = labelGenerator.generateTableLabel(startChkID);
LOG.info("start a check label {} to load.", label);
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(loadUrlStr)
.baseAuth(user, passwd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -133,6 +135,7 @@ public void initializeLoad(Collection<DorisWriterState> state) {
private void abortLingeringTransactions(Collection<DorisWriterState> recoveredStates)
throws Exception {
List<String> alreadyAborts = new ArrayList<>();
Set<String> alreadyAbortTables = new HashSet<>();
// abort label in state
for (DorisWriterState state : recoveredStates) {
// Todo: When the sink parallelism is reduced,
Expand All @@ -152,6 +155,7 @@ private void abortLingeringTransactions(Collection<DorisWriterState> recoveredSt
DorisStreamLoad streamLoader = getStreamLoader(key);
streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId);
alreadyAborts.add(state.getLabelPrefix());
alreadyAbortTables.add(key);
}

// TODO: In a multi-table scenario, if do not restore from checkpoint,
Expand All @@ -162,6 +166,19 @@ private void abortLingeringTransactions(Collection<DorisWriterState> recoveredSt
// abort current labelPrefix
DorisStreamLoad streamLoader = getStreamLoader(dorisOptions.getTableIdentifier());
streamLoader.abortPreCommit(labelPrefix, curCheckpointId);
alreadyAbortTables.add(dorisOptions.getTableIdentifier());
}

LOG.info(
"doris stream loader size is {}, already abort table size is {}",
dorisStreamLoadMap.size(),
alreadyAbortTables.size());
if (dorisStreamLoadMap.size() != alreadyAbortTables.size()) {
Set<String> diff = new HashSet<>(dorisStreamLoadMap.keySet());
diff.removeAll(alreadyAbortTables);
LOG.warn(
"there are some tables in dorisStreamLoadMap, but they are not aborted: {}",
diff);
}
}

Expand Down

0 comments on commit 987b979

Please sign in to comment.