diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java index 135c27f25014..287d1e41d972 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java @@ -48,6 +48,11 @@ public interface RecordReader extends Closeable { @Nullable RecordIterator readBatch() throws IOException; + /** Returns the number of records skipped by this reader. */ + default long skippedRecordCount() { + return 0L; + } + /** Closes the reader and should release all resources. */ @Override void close() throws IOException; diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java index 08d7de5dab7f..a017e1711a8f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java @@ -35,6 +35,8 @@ public class CompactResult { @Nullable private CompactDeletionFile deletionFile; + private long dropDeletedRecordCount; + public CompactResult() { this(Collections.emptyList(), Collections.emptyList()); } @@ -75,10 +77,19 @@ public CompactDeletionFile deletionFile() { return deletionFile; } + public void setDropDeletedRecordCount(long dropDeletedRecordCount) { + this.dropDeletedRecordCount = dropDeletedRecordCount; + } + + public long getDropDeletedRecordCount() { + return dropDeletedRecordCount; + } + public void merge(CompactResult that) { before.addAll(that.before); after.addAll(that.after); changelog.addAll(that.changelog); + dropDeletedRecordCount += that.dropDeletedRecordCount; if (deletionFile != null || that.deletionFile != null) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java index c8da0f3ef2c5..9300f80927b4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java @@ -64,6 +64,8 @@ public CompactResult call() throws Exception { .map(DataFileMeta::fileSize) .reduce(Long::sum) .orElse(0L)); + metricsReporter.reportDropDeletedRecordCount( + result.getDropDeletedRecordCount()); } }, LOG); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java index 980a6a1c7e65..4faf8d33074a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java @@ -32,6 +32,7 @@ */ public class DropDeleteReader implements RecordReader { + private long dropDeletedRecordCount = 0L; private final RecordReader reader; public DropDeleteReader(RecordReader reader) { @@ -58,6 +59,7 @@ public KeyValue next() throws IOException { if (kv.isAdd()) { return kv; } + ++dropDeletedRecordCount; } } @@ -68,6 +70,11 @@ public void releaseBatch() { }; } + @Override + public long skippedRecordCount() { + return dropDeletedRecordCount; + } + @Override public void close() throws IOException { reader.close(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index 56c61d87435a..4f8cd1fadc77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -126,6 +126,7 @@ private CompactResult rewriteOrProduceChangelog( RollingFileWriter compactFileWriter = null; RollingFileWriter changelogFileWriter = null; 
Exception collectedExceptions = null; + long dropDeletedRecordCount = 0L; try { iterator = @@ -143,10 +144,12 @@ private CompactResult rewriteOrProduceChangelog( while (iterator.hasNext()) { ChangelogResult result = iterator.next(); KeyValue keyValue = result.result(); - if (compactFileWriter != null - && keyValue != null - && (!dropDelete || keyValue.isAdd())) { - compactFileWriter.write(keyValue); + if (compactFileWriter != null && keyValue != null) { + if ((!dropDelete || keyValue.isAdd())) { + compactFileWriter.write(keyValue); + } else { + ++dropDeletedRecordCount; + } } if (produceChangelog) { for (KeyValue kv : result.changelogs()) { @@ -190,7 +193,11 @@ private CompactResult rewriteOrProduceChangelog( changelogFileWriter != null ? changelogFileWriter.result() : Collections.emptyList(); - return new CompactResult(before, after, changelogFiles); + CompactResult compactResult = new CompactResult(before, after, changelogFiles); + if (dropDelete) { + compactResult.setDropDeletedRecordCount(dropDeletedRecordCount); + } + return compactResult; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java index df299fd84c52..bcdde388f33b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java @@ -104,7 +104,11 @@ protected CompactResult rewriteCompaction( List before = extractFilesFromSections(sections); notifyRewriteCompactBefore(before); - return new CompactResult(before, writer.result()); + CompactResult compactResult = new CompactResult(before, writer.result()); + if (dropDelete && reader != null) { + compactResult.setDropDeletedRecordCount(reader.skippedRecordCount()); + } + return compactResult; } protected RecordReader readerForMergeTree( diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java index a1b64072d817..d7ea21175578 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java @@ -139,7 +139,7 @@ private void rewrite(List> candidate, CompactResult toUpdate) th } if (candidate.size() == 1) { List section = candidate.get(0); - if (section.size() == 0) { + if (section.isEmpty()) { return; } else if (section.size() == 1) { for (DataFileMeta file : section.get(0).files()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java index a3074daebbde..93a3075d9653 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java @@ -44,6 +44,8 @@ public class CompactionMetrics { public static final String AVG_COMPACTION_TIME = "avgCompactionTime"; public static final String COMPACTION_COMPLETED_COUNT = "compactionCompletedCount"; public static final String COMPACTION_QUEUED_COUNT = "compactionQueuedCount"; + public static final String COMPACTION_DROP_DELETED_RECORD_COUNT = + "compactionDropDeletedRecordCount"; public static final String MAX_COMPACTION_INPUT_SIZE = "maxCompactionInputSize"; public static final String MAX_COMPACTION_OUTPUT_SIZE = "maxCompactionOutputSize"; public static final String AVG_COMPACTION_INPUT_SIZE = "avgCompactionInputSize"; @@ -55,8 +57,9 @@ public class CompactionMetrics { private final Map reporters; private final Map compactTimers; private final Queue compactionTimes; - private Counter compactionsCompletedCounter; - private Counter compactionsQueuedCounter; + private 
Counter compactionCompletedCounter; + private Counter compactionQueuedCounter; + private Counter compactionDropDeletedRecordCounter; public CompactionMetrics(MetricRegistry registry, String tableName) { this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName); @@ -91,8 +94,10 @@ private void registerGenericCompactionMetrics() { AVG_COMPACTION_TIME, () -> getCompactionTimeStream().average().orElse(0.0)); metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum()); - compactionsCompletedCounter = metricGroup.counter(COMPACTION_COMPLETED_COUNT); - compactionsQueuedCounter = metricGroup.counter(COMPACTION_QUEUED_COUNT); + compactionCompletedCounter = metricGroup.counter(COMPACTION_COMPLETED_COUNT); + compactionQueuedCounter = metricGroup.counter(COMPACTION_QUEUED_COUNT); + compactionDropDeletedRecordCounter = + metricGroup.counter(COMPACTION_DROP_DELETED_RECORD_COUNT); } private LongStream getLevel0FileCountStream() { @@ -135,6 +140,8 @@ public interface Reporter { void decreaseCompactionsQueuedCount(); + void reportDropDeletedRecordCount(long dropDeletedRecordCount); + void reportCompactionInputSize(long bytes); void reportCompactionOutputSize(long bytes); @@ -188,17 +195,22 @@ public void reportLevel0FileCount(long count) { @Override public void increaseCompactionsCompletedCount() { - compactionsCompletedCounter.inc(); + compactionCompletedCounter.inc(); } @Override public void increaseCompactionsQueuedCount() { - compactionsQueuedCounter.inc(); + compactionQueuedCounter.inc(); } @Override public void decreaseCompactionsQueuedCount() { - compactionsQueuedCounter.dec(); + compactionQueuedCounter.dec(); + } + + @Override + public void reportDropDeletedRecordCount(long dropDeletedRecordCount) { + compactionDropDeletedRecordCounter.inc(dropDeletedRecordCount); } @Override