Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce drop-deleted-record-count compaction metric. #4487

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface RecordReader<T> extends Closeable {
@Nullable
RecordIterator<T> readBatch() throws IOException;

/** Return the number of records that skipped by this reader. */
default long skippedRecordCount() {
return 0L;
}

/** Closes the reader and should release all resources. */
@Override
void close() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class CompactResult {

@Nullable private CompactDeletionFile deletionFile;

private long dropDeletedRecordCount;

public CompactResult() {
this(Collections.emptyList(), Collections.emptyList());
}
Expand Down Expand Up @@ -75,10 +77,19 @@ public CompactDeletionFile deletionFile() {
return deletionFile;
}

public void setDropDeletedRecordCount(long dropDeletedRecordCount) {
this.dropDeletedRecordCount = dropDeletedRecordCount;
}

public long getDropDeletedRecordCount() {
return dropDeletedRecordCount;
}

public void merge(CompactResult that) {
before.addAll(that.before);
after.addAll(that.after);
changelog.addAll(that.changelog);
dropDeletedRecordCount += that.dropDeletedRecordCount;

if (deletionFile != null || that.deletionFile != null) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public CompactResult call() throws Exception {
.map(DataFileMeta::fileSize)
.reduce(Long::sum)
.orElse(0L));
reportDropDeletedRecordCount(result.getDropDeletedRecordCount());
}
},
LOG);
Expand All @@ -78,6 +79,12 @@ public CompactResult call() throws Exception {
}
}

private void reportDropDeletedRecordCount(long dropDeletedRecordCount) {
if (metricsReporter != null) {
metricsReporter.reportDropDeletedRecordCount(dropDeletedRecordCount);
}
}

private void decreaseCompactionsQueuedCount() {
if (metricsReporter != null) {
metricsReporter.decreaseCompactionsQueuedCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*/
public class DropDeleteReader implements RecordReader<KeyValue> {

private long dropDeleteRecordCount = 0L;
private final RecordReader<KeyValue> reader;

public DropDeleteReader(RecordReader<KeyValue> reader) {
Expand All @@ -58,6 +59,7 @@ public KeyValue next() throws IOException {
if (kv.isAdd()) {
return kv;
}
++dropDeleteRecordCount;
}
}

Expand All @@ -68,6 +70,11 @@ public void releaseBatch() {
};
}

@Override
public long skippedRecordCount() {
return dropDeleteRecordCount;
}

@Override
public void close() throws IOException {
reader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private CompactResult rewriteOrProduceChangelog(
RollingFileWriter<KeyValue, DataFileMeta> compactFileWriter = null;
RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;
Exception collectedExceptions = null;
long dropDeletedRecordCount = 0L;

try {
iterator =
Expand All @@ -143,10 +144,12 @@ private CompactResult rewriteOrProduceChangelog(
while (iterator.hasNext()) {
ChangelogResult result = iterator.next();
KeyValue keyValue = result.result();
if (compactFileWriter != null
&& keyValue != null
&& (!dropDelete || keyValue.isAdd())) {
compactFileWriter.write(keyValue);
if (compactFileWriter != null && keyValue != null) {
if ((!dropDelete || keyValue.isAdd())) {
compactFileWriter.write(keyValue);
} else {
++dropDeletedRecordCount;
}
}
if (produceChangelog) {
for (KeyValue kv : result.changelogs()) {
Expand Down Expand Up @@ -190,7 +193,11 @@ private CompactResult rewriteOrProduceChangelog(
changelogFileWriter != null
? changelogFileWriter.result()
: Collections.emptyList();
return new CompactResult(before, after, changelogFiles);
CompactResult compactResult = new CompactResult(before, after, changelogFiles);
if (dropDelete) {
compactResult.setDropDeletedRecordCount(dropDeletedRecordCount);
}
return compactResult;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ protected CompactResult rewriteCompaction(

List<DataFileMeta> before = extractFilesFromSections(sections);
notifyRewriteCompactBefore(before);
return new CompactResult(before, writer.result());
CompactResult compactResult = new CompactResult(before, writer.result());
if (dropDelete && reader != null) {
compactResult.setDropDeletedRecordCount(reader.skippedRecordCount());
}
return compactResult;
}

protected <T> RecordReader<T> readerForMergeTree(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void rewrite(List<List<SortedRun>> candidate, CompactResult toUpdate) th
}
if (candidate.size() == 1) {
List<SortedRun> section = candidate.get(0);
if (section.size() == 0) {
if (section.isEmpty()) {
return;
} else if (section.size() == 1) {
for (DataFileMeta file : section.get(0).files()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class CompactionMetrics {
public static final String AVG_COMPACTION_TIME = "avgCompactionTime";
public static final String COMPACTION_COMPLETED_COUNT = "compactionCompletedCount";
public static final String COMPACTION_QUEUED_COUNT = "compactionQueuedCount";
public static final String COMPACTION_DROP_DELETED_RECORD_COUNT =
"compactionDropDeletedRecordCount";
public static final String MAX_COMPACTION_INPUT_SIZE = "maxCompactionInputSize";
public static final String MAX_COMPACTION_OUTPUT_SIZE = "maxCompactionOutputSize";
public static final String AVG_COMPACTION_INPUT_SIZE = "avgCompactionInputSize";
Expand All @@ -55,8 +57,9 @@ public class CompactionMetrics {
private final Map<PartitionAndBucket, ReporterImpl> reporters;
private final Map<Long, CompactTimer> compactTimers;
private final Queue<Long> compactionTimes;
private Counter compactionsCompletedCounter;
private Counter compactionsQueuedCounter;
private Counter compactionCompletedCounter;
private Counter compactionQueuedCounter;
private Counter compactionDropDeletedRecordCounter;

public CompactionMetrics(MetricRegistry registry, String tableName) {
this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
Expand Down Expand Up @@ -91,8 +94,10 @@ private void registerGenericCompactionMetrics() {
AVG_COMPACTION_TIME, () -> getCompactionTimeStream().average().orElse(0.0));
metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum());

compactionsCompletedCounter = metricGroup.counter(COMPACTION_COMPLETED_COUNT);
compactionsQueuedCounter = metricGroup.counter(COMPACTION_QUEUED_COUNT);
compactionCompletedCounter = metricGroup.counter(COMPACTION_COMPLETED_COUNT);
compactionQueuedCounter = metricGroup.counter(COMPACTION_QUEUED_COUNT);
compactionDropDeletedRecordCounter =
metricGroup.counter(COMPACTION_DROP_DELETED_RECORD_COUNT);
}

private LongStream getLevel0FileCountStream() {
Expand Down Expand Up @@ -135,6 +140,8 @@ public interface Reporter {

void decreaseCompactionsQueuedCount();

void reportDropDeletedRecordCount(long dropDeletedRecordCount);

void reportCompactionInputSize(long bytes);

void reportCompactionOutputSize(long bytes);
Expand Down Expand Up @@ -188,17 +195,22 @@ public void reportLevel0FileCount(long count) {

@Override
public void increaseCompactionsCompletedCount() {
compactionsCompletedCounter.inc();
compactionCompletedCounter.inc();
}

@Override
public void increaseCompactionsQueuedCount() {
compactionsQueuedCounter.inc();
compactionQueuedCounter.inc();
}

@Override
public void decreaseCompactionsQueuedCount() {
compactionsQueuedCounter.dec();
compactionQueuedCounter.dec();
}

@Override
public void reportDropDeletedRecordCount(long dropDeletedRecordCount) {
compactionDropDeletedRecordCounter.inc(dropDeletedRecordCount);
}

@Override
Expand Down
Loading