Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Nov 21, 2023
1 parent ca0b6be commit 0ebfa85
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ public void sync() throws Exception {

@Override
public void close() throws Exception {
if (writerMetrics != null) {
writerMetrics.close();
}
// cancel compaction so that it does not block job cancelling
compactManager.cancelCompaction();
sync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ private void trySyncLatestCompaction(boolean blocking) throws Exception {

@Override
public void close() throws Exception {
if (writerMetrics != null) {
writerMetrics.close();
}
// cancel compaction so that it does not block job cancelling
compactManager.cancelCompaction();
sync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.WriterBufferMetric;
import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
Expand All @@ -55,7 +54,6 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/**
* Base {@link FileStoreWrite} implementation.
Expand All @@ -82,11 +80,9 @@ public abstract class AbstractFileStoreWrite<T>
protected boolean isStreamingMode = false;
private MetricRegistry metricRegistry = null;

private final String tableName;
protected final String tableName;
private final FileStorePathFactory pathFactory;

protected WriterBufferMetric writerBufferMetric;

protected AbstractFileStoreWrite(
String commitUser,
SnapshotManager snapshotManager,
Expand Down Expand Up @@ -115,8 +111,6 @@ public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFacto
return this;
}

abstract Supplier<MemoryPoolFactory> writeBufferPoolSupplier();

@Override
public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
this.ignorePreviousFiles = ignorePreviousFiles;
Expand Down Expand Up @@ -365,11 +359,6 @@ public void isStreamingMode(boolean isStreamingMode) {
@Override
public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier = writeBufferPoolSupplier();
if (metricRegistry != null) {
writerBufferMetric =
new WriterBufferMetric(memoryPoolFactorySupplier, metricRegistry, tableName);
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.WriterBufferMetric;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
Expand All @@ -37,7 +39,6 @@

import java.util.Iterator;
import java.util.Map;
import java.util.function.Supplier;

import static org.apache.paimon.CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE;

Expand All @@ -54,6 +55,8 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T>
protected final CacheManager cacheManager;
private MemoryPoolFactory writeBufferPool;

private WriterBufferMetric writerBufferMetric;

public MemoryFileStoreWrite(
String commitUser,
SnapshotManager snapshotManager,
Expand Down Expand Up @@ -116,7 +119,25 @@ protected void notifyNewWriter(RecordWriter<T> writer) {
writeBufferPool.notifyNewOwner((MemoryOwner) writer);
}

public Supplier<MemoryPoolFactory> writeBufferPoolSupplier() {
return () -> writeBufferPool;
@Override
public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
super.withMetricRegistry(metricRegistry);
registerWriterBufferMetric(metricRegistry);
return this;
}

private void registerWriterBufferMetric(MetricRegistry metricRegistry) {
if (metricRegistry != null) {
writerBufferMetric =
new WriterBufferMetric(() -> writeBufferPool, metricRegistry, tableName);
}
}

@Override
public void close() throws Exception {
super.close();
if (this.writerBufferMetric != null) {
this.writerBufferMetric.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ public class WriterBufferMetric {
private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte";
private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte";

private MetricGroup metricGroup;

public WriterBufferMetric(
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
MetricRegistry metricRegistry,
String tableName) {
MetricGroup metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
metricGroup.gauge(
BUFFER_PREEMPT_COUNT,
() ->
Expand All @@ -59,4 +61,8 @@ private long getMetricValue(
MemoryPoolFactory memoryPoolFactory = memoryPoolFactorySupplier.get();
return memoryPoolFactory == null ? -1 : function.apply(memoryPoolFactory);
}

public void close() {
this.metricGroup.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,14 @@ public class WriterMetrics {

private final Histogram prepareCommitCostMillis;

private MetricGroup metricGroup;

public WriterMetrics(MetricRegistry registry, String tableName, String parition, int bucket) {
MetricGroup metricGroup =
registry.bucketMetricGroup(GROUP_NAME, tableName, parition, bucket);
metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName, parition, bucket);
writeRecordNumCounter = metricGroup.counter(WRITE_RECORD_NUM);

// cost
bufferFlushCostMillis = metricGroup.histogram(FLUSH_COST_MILLIS, WINDOW_SAMPLE_SIZE);

// prepareCommittime
prepareCommitCostMillis =
metricGroup.histogram(PREPARE_COMMIT_COST_MILLIS, WINDOW_SAMPLE_SIZE);
}
Expand All @@ -65,4 +64,8 @@ public void updateBufferFlushCostMillis(long bufferFlushCost) {
public void updatePrepareCommitCostMillis(long cost) {
this.prepareCommitCostMillis.update(cost);
}

public void close() {
metricGroup.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
Expand Down Expand Up @@ -92,10 +91,9 @@ void initStateAndWriter(
// runtime context, we can test to construct a writer here
state = new StoreSinkWriteState(context, stateFilter);

OperatorMetricGroup metricGroup = getMetricGroup();
write =
storeSinkWriteProvider.provide(
table, commitUser, state, ioManager, memoryPool, metricGroup);
table, commitUser, state, ioManager, memoryPool, getMetricGroup());
}

protected abstract boolean containLogSystem();
Expand Down

0 comments on commit 0ebfa85

Please sign in to comment.