diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java index 9881ec5f0..be976eff1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java @@ -139,10 +139,11 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) { open(); } TException ex = null; + TScanOpenResult result = null; for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to openScanner {}.", attempt, routing); try { - TScanOpenResult result = client.openScanner(openParams); + result = client.openScanner(openParams); if (result == null) { logger.warn("Open scanner result from {} is null.", routing); continue; @@ -155,12 +156,28 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) { result.getStatus().getErrorMsgs()); continue; } + logger.info( + "OpenScanner success for Doris BE '{}' with contextId '{}' for tablets '{}'.", + routing, + result.getContextId(), + openParams.tablet_ids); return result; } catch (TException e) { logger.warn("Open scanner from {} failed.", routing, e); ex = e; } } + if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) { + logger.error( + ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, + routing, + result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + throw new DorisInternalException( + routing.toString(), + result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + } logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); throw new ConnectedFailedException(routing.toString(), ex); } @@ -244,7 +261,10 @@ public void closeScanner(TScanCloseParams closeParams) { logger.warn("Close scanner from {} failed.", routing, e); } } - logger.info("CloseScanner to Doris BE '{}' success.", routing); + logger.info( + "CloseScanner to Doris BE '{}' success for contextId {} ", + routing, + closeParams.getContextId()); close(); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java index dd57a991a..9f2bd07dd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java @@ -144,4 +144,20 @@ public String toString() { + '\'' + '}'; } + + public String toStringWithoutPlan() { + return "PartitionDefinition{" + + "database='" + + database + + '\'' + + ", table='" + + table + + '\'' + + ", beAddress='" + + beAddress + + '\'' + + ", tabletIds=" + + tabletIds + + '}'; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java index 7948244ca..57a56f53f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java @@ -45,9 +45,9 @@ public Optional getNext(@Nullable String hostname) { } @Override - public void addSplits(Collection splits) { - LOG.info("Adding splits: {}", splits); - splits.addAll(splits); + public void addSplits(Collection newSplits) { + LOG.info("Adding splits: {}", newSplits); + splits.addAll(newSplits); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java index e55e1775f..2db4f7984 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java @@ -182,6 +182,11 @@ public void run() { } catch (InterruptedException e) { throw new DorisRuntimeException(e); } + } else { + LOG.info( + "Async scan finished , tablets: {}, offset: {}", + partition.getTabletIds(), + offset); } } } finally { @@ -245,6 +250,11 @@ public boolean hasNext() { eos.set(nextResult.isEos()); if (!eos.get()) { rowBatch = new RowBatch(nextResult, schema).readArrow(); + } else { + LOG.info( + "Scan finished, tablets: {}, offset: {}", + partition.getTabletIds(), + offset); } } hasNext = !eos.get(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java index 9e4539349..9e1d091eb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java @@ -34,7 +34,7 @@ public static ValueReader createReader( DorisReadOptions readOptions, Logger logger) throws DorisException { - logger.info("create reader for partition: {}", partition); + logger.info("create reader for partition: {}", partition.toStringWithoutPlan()); if (readOptions.getUseFlightSql()) { return new DorisFlightValueReader( partition, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java index 9620c5754..9abe15f2c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java @@ -39,4 +39,15 @@ public DorisTableInputSplit(int splitNumber, PartitionDefinition partition) { public int getSplitNumber() { return splitNumber; } + + @Override + public String toString() { + return String.format( + "DorisTableInputSplit: %s.%s,id=%s,be=%s,tablets=%s", + partition.getDatabase(), + partition.getTable(), + splitNumber, + partition.getBeAddress(), + partition.getTabletIds()); + } }