
Commit

writer metric
wg1026688210 committed Nov 12, 2023
1 parent 99e28b1 commit c934cac
Showing 31 changed files with 533 additions and 43 deletions.
@@ -34,6 +34,7 @@
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -74,6 +75,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;

private WriterMetrics writerMetrics;

public AppendOnlyWriter(
FileIO fileIO,
IOManager ioManager,
@@ -89,7 +92,8 @@ public AppendOnlyWriter(
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
FieldStatsCollector.Factory[] statsCollectors) {
FieldStatsCollector.Factory[] statsCollectors,
WriterMetrics writerMetrics) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -114,10 +118,12 @@ public AppendOnlyWriter(
compactBefore.addAll(increment.compactIncrement().compactBefore());
compactAfter.addAll(increment.compactIncrement().compactAfter());
}
this.writerMetrics = writerMetrics;
}

@Override
public void write(InternalRow rowData) throws Exception {
long start = System.currentTimeMillis();
Preconditions.checkArgument(
rowData.getRowKind() == RowKind.INSERT,
"Append-only writer can only accept insert row kind, but current row kind is: %s",
@@ -133,6 +139,11 @@ public void write(InternalRow rowData) throws Exception {
throw new RuntimeException("Mem table is too small to hold a single element.");
}
}

if (writerMetrics != null) {
writerMetrics.incWriteRecordNum();
writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start);
}
}

@Override
@@ -152,20 +163,29 @@ public Collection<DataFileMeta> dataFiles() {

@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
long start = System.currentTimeMillis();
flush(false, false);
trySyncLatestCompaction(waitCompaction || forceCompact);
return drainIncrement();
CommitIncrement increment = drainIncrement();
if (writerMetrics != null) {
writerMetrics.updatePrepareCommitCostMS(System.currentTimeMillis() - start);
}
return increment;
}

private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction)
throws Exception {
long start = System.currentTimeMillis();
List<DataFileMeta> flushedFiles = sinkWriter.flush();

// add new generated files
flushedFiles.forEach(compactManager::addNewFile);
trySyncLatestCompaction(waitForLatestCompaction);
compactManager.triggerCompaction(forcedFullCompaction);
newFiles.addAll(flushedFiles);
if (writerMetrics != null) {
writerMetrics.updateBufferFlushCostMS(System.currentTimeMillis() - start);
}
}

@Override
@@ -204,13 +224,15 @@ private RowDataRollingFileWriter createRollingRowWriter() {

private void trySyncLatestCompaction(boolean blocking)
throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
compactManager
.getCompactionResult(blocking)
.ifPresent(
result -> {
compactBefore.addAll(result.before());
compactAfter.addAll(result.after());
});
if (writerMetrics != null) {
writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start);
}
}

private CommitIncrement drainIncrement() {
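The hunks above time write(), the buffer flush, prepareCommit(), and the compaction sync with System.currentTimeMillis() and forward the readings to WriterMetrics, guarded so that a writer without metrics keeps working. The WriterMetrics class is added by this commit in a file not shown in this excerpt; a minimal, self-contained sketch of the call surface the writers depend on might look like the following (the class name, field layout, and plain-field storage are assumptions for illustration; the real class presumably registers these values with the table's metric group):

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in mirroring only the methods called by AppendOnlyWriter/MergeTreeWriter.
public class WriterMetricsSketch {
    private final AtomicLong writeRecordCount = new AtomicLong(); // rows accepted by write()
    private volatile long writeCostMs;              // latency of the last write() call
    private volatile long bufferFlushCostMs;        // latency of the last buffer flush
    private volatile long prepareCommitCostMs;      // latency of the last prepareCommit()
    private volatile long syncCompactionCostMs;     // latency of the last compaction sync

    public void incWriteRecordNum() { writeRecordCount.incrementAndGet(); }
    public void updateWriteCostMS(long ms) { writeCostMs = ms; }
    public void updateBufferFlushCostMS(long ms) { bufferFlushCostMs = ms; }
    public void updatePrepareCommitCostMS(long ms) { prepareCommitCostMs = ms; }
    public void updateSyncLastestCompactionCostMS(long ms) { syncCompactionCostMs = ms; }
}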
@@ -37,9 +37,13 @@ public class MemoryPoolFactory {

private Iterable<MemoryOwner> owners;

private final long totalBufferSize;
private long bufferPreemptCount;

public MemoryPoolFactory(MemorySegmentPool innerPool) {
this.innerPool = innerPool;
this.totalPages = innerPool.freePages();
this.totalBufferSize = (long) totalPages * innerPool.pageSize();
}

public MemoryPoolFactory addOwners(Iterable<MemoryOwner> newOwners) {
@@ -81,12 +85,38 @@ private void preemptMemory(MemoryOwner owner) {
if (max != null) {
try {
max.flushMemory();
++bufferPreemptCount;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

public BufferStat bufferStat() {
return new BufferStat();
}

/** Statistics of the shared write buffer, exposed for metrics. */
public class BufferStat {
public long bufferPreemptCount() {
return bufferPreemptCount;
}

public long usedBufferSize() {
long usedBufferSize = 0L;
if (owners != null) {
for (MemoryOwner owner : owners) {
usedBufferSize += owner.memoryOccupancy();
}
}
return usedBufferSize;
}

public long totalBufferSize() {
return totalBufferSize;
}
}

private class OwnerMemoryPool implements MemorySegmentPool {

private final MemoryOwner owner;
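MemoryPoolFactory now counts how often one owner's buffer is preempted (flushed to make room for another owner) and exposes a BufferStat view over the preemption count, the used buffer size summed from each owner's memoryOccupancy(), and the total buffer size captured as pages times page size at construction. The write paths later call registerMemoryPoolMetric(writerMetrics), whose body is not part of this excerpt; a self-contained sketch of polling these readings as gauges, with made-up metric names, could be:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

import org.apache.paimon.memory.MemoryPoolFactory;

// Hypothetical helper: collects the three BufferStat readings as lazy suppliers that a
// metric backend could register as gauges. The metric names are invented for the example.
class BufferStatGauges {
    static Map<String, LongSupplier> gauges(MemoryPoolFactory factory) {
        MemoryPoolFactory.BufferStat stat = factory.bufferStat();
        Map<String, LongSupplier> gauges = new LinkedHashMap<>();
        gauges.put("bufferPreemptCount", stat::bufferPreemptCount);    // flushes forced on other owners
        gauges.put("usedWriteBufferSizeByte", stat::usedBufferSize);   // sum of owners' memoryOccupancy()
        gauges.put("totalWriteBufferSizeByte", stat::totalBufferSize); // freePages() * pageSize() at construction
        return gauges;
    }
}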
@@ -33,6 +33,7 @@
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.RecordWriter;
@@ -74,6 +75,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private long newSequenceNumber;
private WriteBuffer writeBuffer;

private WriterMetrics writerMetrics;

public MergeTreeWriter(
boolean writeBufferSpillable,
int sortMaxFan,
@@ -85,7 +88,8 @@ public MergeTreeWriter(
KeyValueFileWriterFactory writerFactory,
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@Nullable CommitIncrement increment) {
@Nullable CommitIncrement increment,
WriterMetrics writerMetrics) {
this.writeBufferSpillable = writeBufferSpillable;
this.sortMaxFan = sortMaxFan;
this.ioManager = ioManager;
@@ -114,6 +118,7 @@ public MergeTreeWriter(
compactAfter.addAll(increment.compactIncrement().compactAfter());
compactChangelog.addAll(increment.compactIncrement().changelogFiles());
}
this.writerMetrics = writerMetrics;
}

private long newSequenceNumber() {
@@ -139,10 +144,12 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {

@Override
public void write(KeyValue kv) throws Exception {
long start = System.currentTimeMillis();
long sequenceNumber =
kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE
? newSequenceNumber()
: kv.sequenceNumber();

boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
flushWriteBuffer(false, false);
@@ -151,6 +158,11 @@ public void write(KeyValue kv) throws Exception {
throw new RuntimeException("Mem table is too small to hold a single element.");
}
}

if (writerMetrics != null) {
writerMetrics.incWriteRecordNum();
writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start);
}
}

@Override
@@ -183,6 +195,7 @@ public void flushMemory() throws Exception {

private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)
throws Exception {
long start = System.currentTimeMillis();
if (writeBuffer.size() > 0) {
if (compactManager.shouldWaitForLatestCompaction()) {
waitForLatestCompaction = true;
@@ -222,16 +235,26 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)

trySyncLatestCompaction(waitForLatestCompaction);
compactManager.triggerCompaction(forcedFullCompaction);
if (writerMetrics != null) {
writerMetrics.updateBufferFlushCostMS(System.currentTimeMillis() - start);
}
}

@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
long start = System.currentTimeMillis();
flushWriteBuffer(waitCompaction, false);
trySyncLatestCompaction(
waitCompaction
|| commitForceCompact
|| compactManager.shouldWaitForPreparingCheckpoint());
return drainIncrement();

CommitIncrement increment = drainIncrement();

if (writerMetrics != null) {
writerMetrics.updatePrepareCommitCostMS(System.currentTimeMillis() - start);
}
return increment;
}

@Override
@@ -281,8 +304,10 @@ private void updateCompactResult(CompactResult result) {
}

private void trySyncLatestCompaction(boolean blocking) throws Exception {
long start = System.currentTimeMillis();
Optional<CompactResult> result = compactManager.getCompactionResult(blocking);
result.ifPresent(this::updateCompactResult);
if (writerMetrics != null) {
writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start);
}
}

@Override
@@ -27,10 +27,18 @@ public abstract class MetricRegistry {
private static final String KEY_TABLE = "table";
private static final String KEY_PARTITION = "partition";
private static final String KEY_BUCKET = "bucket";
private static final String COMMIT_USER = "commit_user";

public MetricGroup tableMetricGroup(String groupName, String tableName) {
return tableMetricGroup(groupName, tableName, null);
}

public MetricGroup tableMetricGroup(String groupName, String tableName, String commitUser) {
Map<String, String> variables = new LinkedHashMap<>();
variables.put(KEY_TABLE, tableName);
if (commitUser != null) {
variables.put(COMMIT_USER, commitUser);
}

return createMetricGroup(groupName, variables);
}
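With the new overload, a table metric group can additionally be tagged with the committing user, while the existing two-argument form keeps its behaviour by delegating with a null commit user. A sketch of the two call shapes (the group names and the table/user values below are made-up examples):

import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegistry;

// Illustrative only: shows which variables each overload attaches to the created group.
class TableMetricGroupExample {
    static void create(MetricRegistry registry) {
        // Tagged with table=orders and commit_user=<user id>:
        MetricGroup writerGroup = registry.tableMetricGroup("writer", "orders", "commit-user-1");
        // Two-argument form delegates with commitUser == null, so only table=orders is set:
        MetricGroup commitGroup = registry.tableMetricGroup("commit", "orders");
    }
}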
@@ -31,6 +31,7 @@
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
@@ -77,7 +78,10 @@ public abstract class AbstractFileStoreWrite<T>
private boolean closeCompactExecutorWhenLeaving = true;
private boolean ignorePreviousFiles = false;
protected boolean isStreamingMode = false;
private MetricRegistry metricRegistry = null;
private MetricRegistry metricRegistry;

protected WriterMetrics writerMetrics;

private final String tableName;
private final FileStorePathFactory pathFactory;

@@ -369,6 +373,14 @@ public CompactionMetrics getCompactionMetrics(BinaryRow partition, int bucket) {
return null;
}

@Nullable
public WriterMetrics getWriterMetrics() {
if (metricRegistry != null && writerMetrics == null) {
writerMetrics = new WriterMetrics(metricRegistry, tableName, commitUser);
}
return writerMetrics;
}

private String getPartitionString(FileStorePathFactory pathFactory, BinaryRow partition) {
String partitionStr =
pathFactory.getPartitionString(partition).replace(Path.SEPARATOR, "_");
@@ -31,6 +31,7 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.table.BucketMode;
@@ -124,6 +125,9 @@ protected RecordWriter<InternalRow> createWriter(
targetFileSize,
compactRewriter(partition, bucket),
getCompactionMetrics(partition, bucket));
WriterMetrics writerMetrics = getWriterMetrics();
registerMemoryPoolMetric(writerMetrics);

return new AppendOnlyWriter(
fileIO,
ioManager,
@@ -139,7 +143,8 @@ protected RecordWriter<InternalRow> createWriter(
useWriteBuffer,
spillable,
fileCompression,
statsCollectors);
statsCollectors,
writerMetrics);
}

public AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -25,6 +25,7 @@
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.utils.RecordWriter;
@@ -125,4 +126,6 @@ List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)

/** With metrics to measure compaction. */
FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);

/** Writer metrics measuring write, flush, and commit cost; may be null if no metric registry is set. */
WriterMetrics getWriterMetrics();
}
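getWriterMetrics() is exposed on the write interface and, in AbstractFileStoreWrite, is created lazily from the registered MetricRegistry, the table name, and the commit user; until withMetricRegistry(...) has been called it returns null and the writers simply skip their metric updates. A sketch of the expected wiring order, using only methods visible in this diff (the helper class itself is hypothetical):

import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.metrics.WriterMetrics;

// Hypothetical wiring helper: attach the registry before writers are created so that
// createWriter(...) can hand a non-null WriterMetrics to each writer.
class WriterMetricsWiring {
    static WriterMetrics enable(FileStoreWrite<?> write, MetricRegistry registry) {
        write.withMetricRegistry(registry); // also enables compaction metrics
        return write.getWriterMetrics();    // lazily instantiated on first call; null without a registry
    }
}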
@@ -51,6 +51,7 @@
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
@@ -170,6 +171,10 @@ protected MergeTreeWriter createWriter(
compactExecutor,
levels,
getCompactionMetrics(partition, bucket));

WriterMetrics writerMetrics = getWriterMetrics();
registerMemoryPoolMetric(writerMetrics);

return new MergeTreeWriter(
bufferSpillable(),
options.localSortMaxNumFileHandles(),
@@ -181,7 +186,8 @@ protected MergeTreeWriter createWriter(
writerFactory,
options.commitForceCompact(),
options.changelogProducer(),
restoreIncrement);
restoreIncrement,
writerMetrics);
}

@VisibleForTesting
