diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 5e2a697dd..d049a2bb9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -154,20 +154,20 @@ public Future getPendingLoadFuture() { } /** - * try to discard pending transactions with labels beginning with labelSuffix. + * try to discard pending transactions with labels beginning with labelPrefix. * - * @param labelSuffix the suffix of the stream load. + * @param labelPrefix the prefix of the stream load. * @param chkID checkpoint id of task. * @throws Exception */ - public void abortPreCommit(String labelSuffix, long chkID) throws Exception { + public void abortPreCommit(String labelPrefix, long chkID) throws Exception { long startChkID = chkID; - LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID); + LOG.info("abort for labelPrefix {}. start chkId {}.", labelPrefix, chkID); while (true) { try { - // TODO: According to label abort txn. Currently, it can only be aborted based on - // txnid, - // so we must first request a streamload based on the label to get the txnid. + // TODO: According to label abort txn. + // Currently, it can only be aborted based on txnid, so we must + // first request a streamload based on the label to get the txnid. String label = labelGenerator.generateTableLabel(startChkID); HttpPutBuilder builder = new HttpPutBuilder(); builder.setUrl(loadUrlStr) @@ -214,7 +214,7 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { throw e; } } - LOG.info("abort for labelSuffix {} finished", labelSuffix); + LOG.info("abort for labelPrefix {} finished", labelPrefix); } /** @@ -315,14 +315,18 @@ public void abortTransaction(long txnID) throws Exception { ObjectMapper mapper = new ObjectMapper(); String loadResult = EntityUtils.toString(response.getEntity()); + LOG.info("abort Result {}", loadResult); Map res = mapper.readValue(loadResult, new TypeReference>() {}); if (!SUCCESS.equals(res.get("status"))) { - if (ResponseUtil.isCommitted(res.get("msg"))) { + String msg = res.get("msg"); + if (msg != null && ResponseUtil.isCommitted(msg)) { throw new DorisException( "try abort committed transaction, " + "do you recover from old savepoint?"); } - LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnID, res.get("msg")); + + LOG.error("Fail to abort transaction. txnId: {}, error: {}", txnID, msg); + throw new DorisException("Fail to abort transaction, " + loadResult); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java index 5ead3e859..1352a261d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java @@ -20,6 +20,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.sink.HttpTestUtil; import org.apache.doris.flink.sink.OptionUtils; import org.apache.http.client.methods.CloseableHttpResponse; @@ -88,7 +89,7 @@ public void testAbortTransaction() throws Exception { dorisStreamLoad.abortTransaction(anyLong()); } - @Test + @Test(expected = DorisException.class) public void testAbortTransactionFailed() throws Exception { CloseableHttpClient httpClient = mock(CloseableHttpClient.class); CloseableHttpResponse abortFailedResponse =