From e4d60b8732f24b955976a672b6987ec70dcb73ef Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Fri, 6 Oct 2023 20:52:07 +0800 Subject: [PATCH] add memory buffer preempt count --- .../org/apache/paimon/memory/MemoryPoolFactory.java | 7 +++++++ .../paimon/operation/AbstractFileStoreWrite.java | 11 +++++++++++ .../apache/paimon/operation/MemoryFileStoreWrite.java | 1 + .../java/org/apache/paimon/table/sink/TableWrite.java | 1 + .../org/apache/paimon/table/sink/TableWriteImpl.java | 5 +++++ .../org/apache/paimon/flink/sink/StoreSinkWrite.java | 2 ++ .../apache/paimon/flink/sink/StoreSinkWriteImpl.java | 5 +++++ .../apache/paimon/flink/sink/TableWriteOperator.java | 5 +++++ 8 files changed, 37 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java index 8770348e4157..3faeac761390 100644 --- a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java @@ -37,6 +37,8 @@ public class MemoryPoolFactory { private Iterable owners; + private long memoryPreemptCount; + public MemoryPoolFactory(MemorySegmentPool innerPool) { this.innerPool = innerPool; this.totalPages = innerPool.freePages(); @@ -57,6 +59,10 @@ public void notifyNewOwner(MemoryOwner owner) { owner.setMemoryPool(createSubPool(owner)); } + public long getMemoryPreemptCount() { + return memoryPreemptCount; + } + @VisibleForTesting public Iterable memoryOwners() { return owners; @@ -81,6 +87,7 @@ private void preemptMemory(MemoryOwner owner) { if (max != null) { try { max.flushMemory(); + memoryPreemptCount++; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index ad2fbb720d97..c8d35c0dec25 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -74,6 +74,8 @@ public abstract class AbstractFileStoreWrite private boolean ignorePreviousFiles = false; protected boolean isStreamingMode = false; + private MemoryPoolFactory memoryPoolFactory; + protected AbstractFileStoreWrite( String commitUser, SnapshotManager snapshotManager, @@ -95,9 +97,18 @@ public FileStoreWrite withIOManager(IOManager ioManager) { @Override public FileStoreWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) { + this.memoryPoolFactory = memoryPoolFactory; return this; } + public long getMemoryPreemptCount() { + if (memoryPoolFactory != null) { + return this.memoryPoolFactory.getMemoryPreemptCount(); + } else { + return -1L; + } + } + @Override public void withIgnorePreviousFiles(boolean ignorePreviousFiles) { this.ignorePreviousFiles = ignorePreviousFiles; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index 84c00756c265..0d3efb1ceda9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -68,6 +68,7 @@ public MemoryFileStoreWrite( @Override public FileStoreWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) { + super.withMemoryPoolFactory(memoryPoolFactory); this.writeBufferPool = memoryPoolFactory.addOwners(this::memoryOwners); return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java index 57bfbd190624..aea6112ede28 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java @@ -32,6 +32,7 @@ */ @Public public interface TableWrite extends AutoCloseable { + long getMemoryPreemptCount(); /** With {@link IOManager}, this is needed if 'write-buffer-spillable' is set to true. */ TableWrite withIOManager(IOManager ioManager); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index d3fe25007f07..7820a6cb7380 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -70,6 +70,11 @@ public TableWriteImpl isStreamingMode(boolean isStreamingMode) { return this; } + @Override + public long getMemoryPreemptCount() { + return write.getMemoryPreemptCount(); + } + @Override public TableWriteImpl withIOManager(IOManager ioManager) { write.withIOManager(ioManager); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java index f7acabb812d9..d04684149aaa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java @@ -40,6 +40,8 @@ public interface StoreSinkWrite { SinkRecord write(InternalRow rowData) throws Exception; + long getMemoryPreemptCount(); + SinkRecord toLogRecord(SinkRecord record); void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index d1baa5e85911..75b293a66cad 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -159,6 +159,11 @@ public SinkRecord write(InternalRow rowData) throws Exception { return write.writeAndReturn(rowData); } + @Override + public long getMemoryPreemptCount() { + return write.getMemoryPreemptCount(); + } + @Override public SinkRecord toLogRecord(SinkRecord record) { return write.toLogRecord(record); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index 860116081d98..da71b8db9b0b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -92,6 +92,11 @@ void initStateAndWriter( state = new StoreSinkWriteState(context, stateFilter); write = storeSinkWriteProvider.provide(table, commitUser, state, ioManager, memoryPool); + addWriterMetric(); + } + + void addWriterMetric() { + getMetricGroup().gauge("writer_preempt_count", () -> write.getMemoryPreemptCount()); } protected abstract boolean containLogSystem();