diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java index 9881ec5f0..319fda82a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java @@ -134,7 +134,11 @@ private void close() { * @throws ConnectedFailedException throw if cannot connect to Doris BE */ public TScanOpenResult openScanner(TScanOpenParams openParams) { - logger.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams); + logger.info( + "OpenScanner to '{}', table is '{}', tablets is '{}'", + routing, + openParams.table, + openParams.tablet_ids); if (!isConnected) { open(); } @@ -161,7 +165,10 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) { ex = e; } } - logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); + logger.error( + "Connect to doris {} failed, to open scanner for tablet {}", + routing, + openParams.tablet_ids); throw new ConnectedFailedException(routing.toString(), ex); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java index dd57a991a..9f2bd07dd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java @@ -144,4 +144,20 @@ public String toString() { + '\'' + '}'; } + + public String toStringWithoutPlan() { + return "PartitionDefinition{" + + "database='" + + database + + '\'' + + ", table='" + + table + + '\'' + + ", beAddress='" + + beAddress + + '\'' + + ", tabletIds=" + + tabletIds + + '}'; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java index e55e1775f..2db4f7984 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java @@ -182,6 +182,11 @@ public void run() { } catch (InterruptedException e) { throw new DorisRuntimeException(e); } + } else { + LOG.info( + "Async scan finished , tablets: {}, offset: {}", + partition.getTabletIds(), + offset); } } } finally { @@ -245,6 +250,11 @@ public boolean hasNext() { eos.set(nextResult.isEos()); if (!eos.get()) { rowBatch = new RowBatch(nextResult, schema).readArrow(); + } else { + LOG.info( + "Scan finished, tablets: {}, offset: {}", + partition.getTabletIds(), + offset); } } hasNext = !eos.get(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java index 9e4539349..9e1d091eb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java @@ -34,7 +34,7 @@ public static ValueReader createReader( DorisReadOptions readOptions, Logger logger) throws DorisException { - logger.info("create reader for partition: {}", partition); + logger.info("create reader for partition: {}", partition.toStringWithoutPlan()); if (readOptions.getUseFlightSql()) { return new DorisFlightValueReader( partition,