diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index de0a28c33417..b73be6941e54 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -289,6 +289,9 @@ public void sync() throws Exception { @Override public void withInsertOnly(boolean insertOnly) { + if (this.isInsertOnly == insertOnly) { + return; + } if (insertOnly && writeBuffer != null && writeBuffer.size() > 0) { throw new IllegalStateException( "Insert-only can only be set before any record is received."); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index bdaf7bc327be..709c2be63c85 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -65,6 +65,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite { @Nullable private final MetricGroup metricGroup; + @Nullable private Boolean lastInsertOnly; + public StoreSinkWriteImpl( FileStoreTable table, String commitUser, @@ -154,15 +156,21 @@ private TableWriteImpl newTableWrite(FileStoreTable table) { } if (memoryPoolFactory != null) { - return tableWrite.withMemoryPoolFactory(memoryPoolFactory); + tableWrite.withMemoryPoolFactory(memoryPoolFactory); } else { - return tableWrite.withMemoryPool( + tableWrite.withMemoryPool( memoryPool != null ? memoryPool : new HeapMemorySegmentPool( table.coreOptions().writeBufferSize(), table.coreOptions().pageSize())); } + + if (lastInsertOnly != null) { + tableWrite.withInsertOnly(lastInsertOnly); + } + + return tableWrite; } public void withCompactExecutor(ExecutorService compactExecutor) { @@ -171,6 +179,7 @@ public void withCompactExecutor(ExecutorService compactExecutor) { @Override public void withInsertOnly(boolean insertOnly) { + this.lastInsertOnly = insertOnly; write.withInsertOnly(insertOnly); }