Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][bug] Write Spillable with memory preemption has bug : preempt memory will cause writer flush memory to disk which should not happen #2940

Merged
merged 3 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ public boolean isCompacting() {
return compactManager.isCompacting();
}

private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction)
throws Exception {
@VisibleForTesting
void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception {
long start = System.currentTimeMillis();
List<DataFileMeta> flushedFiles = sinkWriter.flush();

Expand Down Expand Up @@ -279,7 +279,10 @@ public long memoryOccupancy() {

@Override
public void flushMemory() throws Exception {
flush(false, false);
boolean success = sinkWriter.flushMemory();
if (!success) {
flush(false, false);
}
}

@VisibleForTesting
Expand All @@ -303,6 +306,8 @@ private interface SinkWriter {

List<DataFileMeta> flush() throws IOException;

boolean flushMemory() throws IOException;

long memoryOccupancy();

void close();
Expand Down Expand Up @@ -340,6 +345,11 @@ public List<DataFileMeta> flush() throws IOException {
return flushedFiles;
}

@Override
public boolean flushMemory() throws IOException {
return false;
}

@Override
public long memoryOccupancy() {
return 0;
Expand Down Expand Up @@ -430,5 +440,10 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
public boolean bufferSpillableWriter() {
return spillable;
}

@Override
public boolean flushMemory() throws IOException {
return writeBuffer.flushMemory();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ public void reset() {
addCompleted = false;
}

@Override
public boolean flushMemory() throws IOException {
spill();
return true;
}

@Override
public boolean put(InternalRow row) throws IOException {
checkState(!addCompleted, "This buffer has add completed.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public void reset() {
}
}

@Override
public boolean flushMemory() throws IOException {
return false;
}

private void returnToSegmentPool() {
pool.returnAll(this.recordBufferSegments);
this.recordBufferSegments.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface RowBuffer {

void reset();

boolean flushMemory() throws IOException;

RowBufferIterator newIterator();

/** Iterator to fetch record from buffer. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.DataType;
Expand All @@ -54,6 +55,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -335,6 +337,60 @@ public void testExternalBufferWorks() throws Exception {
writer.close();
}

@Test
public void testSpillWorksAndMoreSmallFilesGenerated() throws Exception {
List<AppendOnlyWriter> writers = new ArrayList<>();
HeapMemorySegmentPool heapMemorySegmentPool = new HeapMemorySegmentPool(2501024L, 1024);
MemoryPoolFactory memoryPoolFactory = new MemoryPoolFactory(heapMemorySegmentPool);
for (int i = 0; i < 1000; i++) {
AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
memoryPoolFactory.addOwners(Arrays.asList(writer));
memoryPoolFactory.notifyNewOwner(writer);
writers.add(writer);
}

char[] s = new char[1024];
Arrays.fill(s, 'a');

for (AppendOnlyWriter writer : writers) {
writer.write(row(0, String.valueOf("a"), PART));
}

for (AppendOnlyWriter writer : writers) {
writer.write(row(0, String.valueOf(s), PART));
}

for (int j = 0; j < 100; j++) {
for (AppendOnlyWriter writer : writers) {
writer.write(row(j, String.valueOf(s), PART));
writer.write(row(j, String.valueOf(s), PART));
writer.write(row(j, String.valueOf(s), PART));
writer.write(row(j, String.valueOf(s), PART));
writer.write(row(j, String.valueOf(s), PART));
}
}

writers.forEach(
writer -> {
try {
List<DataFileMeta> fileMetas =
writer.prepareCommit(false).newFilesIncrement().newFiles();
assertThat(fileMetas.size()).isEqualTo(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

writers.forEach(
writer -> {
try {
writer.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

@Test
public void testNoBuffer() throws Exception {
AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE);
Expand Down Expand Up @@ -363,7 +419,7 @@ public void testMultipleFlush() throws Exception {
writer.write(row(j, String.valueOf(s), PART));
}

writer.flushMemory();
writer.flush(false, false);
Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L);
Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0);
Assertions.assertThat(writer.getNewFiles().size()).isGreaterThan(0);
Expand All @@ -374,7 +430,7 @@ public void testMultipleFlush() throws Exception {
for (int j = 0; j < 100; j++) {
writer.write(row(j, String.valueOf(s), PART));
}
writer.flushMemory();
writer.flush(false, false);

Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L);
Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0);
Expand Down
Loading