Skip to content

Commit

Permalink
add memory buffer preempt count
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Oct 6, 2023
1 parent b76e76e commit e4d60b8
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class MemoryPoolFactory {

private Iterable<MemoryOwner> owners;

private long memoryPreemptCount;

public MemoryPoolFactory(MemorySegmentPool innerPool) {
this.innerPool = innerPool;
this.totalPages = innerPool.freePages();
Expand All @@ -57,6 +59,10 @@ public void notifyNewOwner(MemoryOwner owner) {
owner.setMemoryPool(createSubPool(owner));
}

public long getMemoryPreemptCount() {
return memoryPreemptCount;
}

@VisibleForTesting
public Iterable<MemoryOwner> memoryOwners() {
return owners;
Expand All @@ -81,6 +87,7 @@ private void preemptMemory(MemoryOwner owner) {
if (max != null) {
try {
max.flushMemory();
memoryPreemptCount++;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public abstract class AbstractFileStoreWrite<T>
private boolean ignorePreviousFiles = false;
protected boolean isStreamingMode = false;

private MemoryPoolFactory memoryPoolFactory;

protected AbstractFileStoreWrite(
String commitUser,
SnapshotManager snapshotManager,
Expand All @@ -95,9 +97,18 @@ public FileStoreWrite<T> withIOManager(IOManager ioManager) {

@Override
public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
this.memoryPoolFactory = memoryPoolFactory;
return this;
}

public long getMemoryPreemptCount() {
if (memoryPoolFactory != null) {
return this.memoryPoolFactory.getMemoryPreemptCount();
} else {
return -1L;
}
}

@Override
public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
this.ignorePreviousFiles = ignorePreviousFiles;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public MemoryFileStoreWrite(

@Override
public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
super.withMemoryPoolFactory(memoryPoolFactory);
this.writeBufferPool = memoryPoolFactory.addOwners(this::memoryOwners);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*/
@Public
public interface TableWrite extends AutoCloseable {
long getMemoryPreemptCount();

/** With {@link IOManager}, this is needed if 'write-buffer-spillable' is set to true. */
TableWrite withIOManager(IOManager ioManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public TableWriteImpl<T> isStreamingMode(boolean isStreamingMode) {
return this;
}

@Override
public long getMemoryPreemptCount() {
return write.getMemoryPreemptCount();
}

@Override
public TableWriteImpl<T> withIOManager(IOManager ioManager) {
write.withIOManager(ioManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public interface StoreSinkWrite {

SinkRecord write(InternalRow rowData) throws Exception;

long getMemoryPreemptCount();

SinkRecord toLogRecord(SinkRecord record);

void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public SinkRecord write(InternalRow rowData) throws Exception {
return write.writeAndReturn(rowData);
}

@Override
public long getMemoryPreemptCount() {
return write.getMemoryPreemptCount();
}

@Override
public SinkRecord toLogRecord(SinkRecord record) {
return write.toLogRecord(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ void initStateAndWriter(
state = new StoreSinkWriteState(context, stateFilter);

write = storeSinkWriteProvider.provide(table, commitUser, state, ioManager, memoryPool);
addWriterMetric();
}

void addWriterMetric() {
getMetricGroup().gauge("writer_preempt_count", () -> write.getMemoryPreemptCount());
}

protected abstract boolean containLogSystem();
Expand Down

0 comments on commit e4d60b8

Please sign in to comment.