Skip to content

Commit

Permalink
Introduce compactionDropDeletedRecordCount metric.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuyaogai committed Nov 9, 2024
1 parent ddf10d4 commit 7dae1ad
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface RecordReader<T> extends Closeable {
@Nullable
RecordIterator<T> readBatch() throws IOException;

/** Return the number of records that skipped by this reader. */
default long skippedRecordCount() {
return 0L;
}

/** Closes the reader and should release all resources. */
@Override
void close() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class CompactResult {

@Nullable private CompactDeletionFile deletionFile;

private long dropDeletedRecordCount;

public CompactResult() {
this(Collections.emptyList(), Collections.emptyList());
}
Expand Down Expand Up @@ -75,10 +77,19 @@ public CompactDeletionFile deletionFile() {
return deletionFile;
}

public void setDropDeletedRecordCount(long dropDeletedRecordCount) {
this.dropDeletedRecordCount = dropDeletedRecordCount;
}

public long getDropDeletedRecordCount() {
return dropDeletedRecordCount;
}

public void merge(CompactResult that) {
before.addAll(that.before);
after.addAll(that.after);
changelog.addAll(that.changelog);
dropDeletedRecordCount += that.dropDeletedRecordCount;

if (deletionFile != null || that.deletionFile != null) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public CompactResult call() throws Exception {
.map(DataFileMeta::fileSize)
.reduce(Long::sum)
.orElse(0L));
metricsReporter.reportDropDeletedRecordCount(
result.getDropDeletedRecordCount());
}
},
LOG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*/
public class DropDeleteReader implements RecordReader<KeyValue> {

private long dropDeletedRecordCount = 0L;
private final RecordReader<KeyValue> reader;

public DropDeleteReader(RecordReader<KeyValue> reader) {
Expand All @@ -58,6 +59,7 @@ public KeyValue next() throws IOException {
if (kv.isAdd()) {
return kv;
}
++dropDeletedRecordCount;
}
}

Expand All @@ -68,6 +70,11 @@ public void releaseBatch() {
};
}

@Override
public long skippedRecordCount() {
return dropDeletedRecordCount;
}

@Override
public void close() throws IOException {
reader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private CompactResult rewriteOrProduceChangelog(
RollingFileWriter<KeyValue, DataFileMeta> compactFileWriter = null;
RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;
Exception collectedExceptions = null;
long dropDeletedRecordCount = 0L;

try {
iterator =
Expand All @@ -143,10 +144,12 @@ private CompactResult rewriteOrProduceChangelog(
while (iterator.hasNext()) {
ChangelogResult result = iterator.next();
KeyValue keyValue = result.result();
if (compactFileWriter != null
&& keyValue != null
&& (!dropDelete || keyValue.isAdd())) {
compactFileWriter.write(keyValue);
if (compactFileWriter != null && keyValue != null) {
if ((!dropDelete || keyValue.isAdd())) {
compactFileWriter.write(keyValue);
} else {
++dropDeletedRecordCount;
}
}
if (produceChangelog) {
for (KeyValue kv : result.changelogs()) {
Expand Down Expand Up @@ -190,7 +193,11 @@ private CompactResult rewriteOrProduceChangelog(
changelogFileWriter != null
? changelogFileWriter.result()
: Collections.emptyList();
return new CompactResult(before, after, changelogFiles);
CompactResult compactResult = new CompactResult(before, after, changelogFiles);
if (dropDelete) {
compactResult.setDropDeletedRecordCount(dropDeletedRecordCount);
}
return compactResult;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ protected CompactResult rewriteCompaction(

List<DataFileMeta> before = extractFilesFromSections(sections);
notifyRewriteCompactBefore(before);
return new CompactResult(before, writer.result());
CompactResult compactResult = new CompactResult(before, writer.result());
if (dropDelete && reader != null) {
compactResult.setDropDeletedRecordCount(reader.skippedRecordCount());
}
return compactResult;
}

protected <T> RecordReader<T> readerForMergeTree(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void rewrite(List<List<SortedRun>> candidate, CompactResult toUpdate) th
}
if (candidate.size() == 1) {
List<SortedRun> section = candidate.get(0);
if (section.size() == 0) {
if (section.isEmpty()) {
return;
} else if (section.size() == 1) {
for (DataFileMeta file : section.get(0).files()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class CompactionMetrics {
public static final String AVG_COMPACTION_TIME = "avgCompactionTime";
public static final String COMPACTION_COMPLETED_COUNT = "compactionCompletedCount";
public static final String COMPACTION_QUEUED_COUNT = "compactionQueuedCount";
public static final String COMPACTION_DROP_DELETED_RECORD_COUNT =
"compactionDropDeletedRecordCount";
public static final String MAX_COMPACTION_INPUT_SIZE = "maxCompactionInputSize";
public static final String MAX_COMPACTION_OUTPUT_SIZE = "maxCompactionOutputSize";
public static final String AVG_COMPACTION_INPUT_SIZE = "avgCompactionInputSize";
Expand All @@ -55,8 +57,9 @@ public class CompactionMetrics {
private final Map<PartitionAndBucket, ReporterImpl> reporters;
private final Map<Long, CompactTimer> compactTimers;
private final Queue<Long> compactionTimes;
private Counter compactionsCompletedCounter;
private Counter compactionsQueuedCounter;
private Counter compactionCompletedCounter;
private Counter compactionQueuedCounter;
private Counter compactionDropDeletedRecordCounter;

public CompactionMetrics(MetricRegistry registry, String tableName) {
this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
Expand Down Expand Up @@ -91,8 +94,10 @@ private void registerGenericCompactionMetrics() {
AVG_COMPACTION_TIME, () -> getCompactionTimeStream().average().orElse(0.0));
metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum());

compactionsCompletedCounter = metricGroup.counter(COMPACTION_COMPLETED_COUNT);
compactionsQueuedCounter = metricGroup.counter(COMPACTION_QUEUED_COUNT);
compactionCompletedCounter = metricGroup.counter(COMPACTION_COMPLETED_COUNT);
compactionQueuedCounter = metricGroup.counter(COMPACTION_QUEUED_COUNT);
compactionDropDeletedRecordCounter =
metricGroup.counter(COMPACTION_DROP_DELETED_RECORD_COUNT);
}

private LongStream getLevel0FileCountStream() {
Expand Down Expand Up @@ -135,6 +140,8 @@ public interface Reporter {

void decreaseCompactionsQueuedCount();

void reportDropDeletedRecordCount(long dropDeletedRecordCount);

void reportCompactionInputSize(long bytes);

void reportCompactionOutputSize(long bytes);
Expand Down Expand Up @@ -188,17 +195,22 @@ public void reportLevel0FileCount(long count) {

@Override
public void increaseCompactionsCompletedCount() {
compactionsCompletedCounter.inc();
compactionCompletedCounter.inc();
}

@Override
public void increaseCompactionsQueuedCount() {
compactionsQueuedCounter.inc();
compactionQueuedCounter.inc();
}

@Override
public void decreaseCompactionsQueuedCount() {
compactionsQueuedCounter.dec();
compactionQueuedCounter.dec();
}

@Override
public void reportDropDeletedRecordCount(long dropDeletedRecordCount) {
compactionDropDeletedRecordCounter.inc(dropDeletedRecordCount);
}

@Override
Expand Down

0 comments on commit 7dae1ad

Please sign in to comment.