From f7cf0bfcee205df4569703c3cb3d32ecee186f85 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 5 Nov 2024 11:09:00 +0800 Subject: [PATCH] fix batch mode Stuck --- .../sink/batch/DorisBatchStreamLoad.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 42b832076..3cfda6041 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -198,6 +198,7 @@ public synchronized void writeRecord(String database, String table, byte[] recor lock.lock(); try { while (currentCacheBytes.get() >= maxBlockedBytes) { + checkFlushException(); LOG.info( "Cache full, waiting for flush, currentBytes: {}, maxBlockedBytes: {}", currentCacheBytes.get(), @@ -486,11 +487,22 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { putBuilder.setLabel(label + "_" + retry); reason = respContent.getMessage(); } else { - String errMsg = - String.format( - "stream load error: %s, see more in %s", - respContent.getMessage(), - respContent.getErrorURL()); + String errMsg = null; + if (StringUtils.isBlank(respContent.getMessage()) + && StringUtils.isBlank(respContent.getErrorURL())) { + // sometimes stream load will not return message + errMsg = + String.format( + "stream load error, response is %s", + loadResult); + throw new DorisBatchLoadException(errMsg); + } else { + errMsg = + String.format( + "stream load error: %s, see more in %s", + respContent.getMessage(), + respContent.getErrorURL()); + } throw new DorisBatchLoadException(errMsg); } }