From c70a3d16558579360b68c1665adb4b12e98b728f Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 24 Oct 2024 11:28:47 +0800 Subject: [PATCH] add log --- .../doris/flink/backend/BackendClient.java | 35 +++++++++++++------ .../source/assigners/SimpleSplitAssigner.java | 6 ++-- .../flink/table/DorisTableInputSplit.java | 11 ++++++ 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java index 319fda82a..be976eff1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java @@ -134,19 +134,16 @@ private void close() { * @throws ConnectedFailedException throw if cannot connect to Doris BE */ public TScanOpenResult openScanner(TScanOpenParams openParams) { - logger.info( - "OpenScanner to '{}', table is '{}', tablets is '{}'", - routing, - openParams.table, - openParams.tablet_ids); + logger.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams); if (!isConnected) { open(); } TException ex = null; + TScanOpenResult result = null; for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to openScanner {}.", attempt, routing); try { - TScanOpenResult result = client.openScanner(openParams); + result = client.openScanner(openParams); if (result == null) { logger.warn("Open scanner result from {} is null.", routing); continue; @@ -159,16 +156,29 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) { result.getStatus().getErrorMsgs()); continue; } + logger.info( + "OpenScanner success for Doris BE '{}' with contextId '{}' for tablets '{}'.", + routing, + result.getContextId(), + openParams.tablet_ids); return result; } catch (TException e) { logger.warn("Open scanner from {} failed.", routing, e); ex = e; } } - logger.error( - "Connect to doris {} failed, to open scanner for tablet {}", - routing, - openParams.tablet_ids); + if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) { + logger.error( + ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, + routing, + result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + throw new DorisInternalException( + routing.toString(), + result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + } + logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); throw new ConnectedFailedException(routing.toString(), ex); } @@ -251,7 +261,10 @@ public void closeScanner(TScanCloseParams closeParams) { logger.warn("Close scanner from {} failed.", routing, e); } } - logger.info("CloseScanner to Doris BE '{}' success.", routing); + logger.info( + "CloseScanner to Doris BE '{}' success for contextId {} ", + routing, + closeParams.getContextId()); close(); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java index 7948244ca..57a56f53f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java @@ -45,9 +45,9 @@ public Optional getNext(@Nullable String hostname) { } @Override - public void addSplits(Collection splits) { - LOG.info("Adding splits: {}", splits); - splits.addAll(splits); + public void addSplits(Collection newSplits) { + LOG.info("Adding splits: {}", newSplits); + splits.addAll(newSplits); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java index 9620c5754..9abe15f2c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java @@ -39,4 +39,15 @@ public DorisTableInputSplit(int splitNumber, PartitionDefinition partition) { public int getSplitNumber() { return splitNumber; } + + @Override + public String toString() { + return String.format( + "DorisTableInputSplit: %s.%s,id=%s,be=%s,tablets=%s", + partition.getDatabase(), + partition.getTable(), + splitNumber, + partition.getBeAddress(), + partition.getTabletIds()); + } }