diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 2af731eb3a7e..86f72fe36b49 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -19,20 +19,25 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 
 import java.io.IOException;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 
@@ -52,6 +57,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committable> {
     private transient StoreSinkWriteState state;
     private transient StoreSinkWrite write;
     private transient DataFileMetaSerializer dataFileMetaSerializer;
+    private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
 
     public StoreCompactOperator(
             FileStoreTable table,
@@ -77,30 +83,39 @@ public void initializeState(StateInitializationContext context) throws Exception
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, initialCommitUser);
 
-        state =
-                new StoreSinkWriteState(
-                        context,
-                        (tableName, partition, bucket) ->
-                                ChannelComputer.select(
-                                                partition,
-                                                bucket,
-                                                getRuntimeContext().getNumberOfParallelSubtasks())
-                                        == getRuntimeContext().getIndexOfThisSubtask());
+        initStateAndWriter(
+                context,
+                (tableName, partition, bucket) ->
+                        ChannelComputer.select(
+                                        partition,
+                                        bucket,
+                                        getRuntimeContext().getNumberOfParallelSubtasks())
+                                == getRuntimeContext().getIndexOfThisSubtask(),
+                getContainingTask().getEnvironment().getIOManager(),
+                commitUser);
+    }
+
+    @VisibleForTesting
+    void initStateAndWriter(
+            StateInitializationContext context,
+            StoreSinkWriteState.StateValueFilter stateFilter,
+            IOManager ioManager,
+            String commitUser)
+            throws Exception {
+        // State and writer initialization live in this method so that tests can
+        // construct a writer without having to build a full runtime context.
+        state = new StoreSinkWriteState(context, stateFilter);
 
         write =
                 storeSinkWriteProvider.provide(
-                        table,
-                        commitUser,
-                        state,
-                        getContainingTask().getEnvironment().getIOManager(),
-                        memoryPool,
-                        getMetricGroup());
+                        table, commitUser, state, ioManager, memoryPool, getMetricGroup());
     }
 
     @Override
     public void open() throws Exception {
         super.open();
         dataFileMetaSerializer = new DataFileMetaSerializer();
+        waitToCompact = new LinkedHashSet<>();
     }
 
     @Override
@@ -115,19 +130,31 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
 
         if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
-            write.compact(partition, bucket, false);
         } else {
             Preconditions.checkArgument(
                     files.isEmpty(),
                     "Batch compact job does not concern what files are compacted. "
                             + "They only need to know what buckets are compacted.");
-            write.compact(partition, bucket, true);
         }
+
+        waitToCompact.add(Pair.of(partition, bucket));
     }
 
     @Override
     protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
             throws IOException {
+
+        try {
+            for (Pair<BinaryRow, Integer> partitionBucket : waitToCompact) {
+                write.compact(
+                        partitionBucket.getKey(),
+                        partitionBucket.getRight(),
+                        !write.streamingMode());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Exception happens while executing compaction.", e);
+        }
+        waitToCompact.clear();
         return write.prepareCommit(waitCompaction, checkpointId);
     }
 
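The behavioral core of the patch above: processElement no longer calls write.compact once per incoming record, which could compact the same bucket several times within a single checkpoint. Instead it only records the (partition, bucket) pair in the waitToCompact set, and prepareCommit later issues exactly one compact call per distinct pair before committing. Below is a minimal standalone sketch of that deduplication idea, using plain JDK types only (String stands in for BinaryRow, Map.Entry for Paimon's Pair; the class name and values are illustrative, not part of the patch):

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;

    public class CompactDedupSketch {
        public static void main(String[] args) {
            // Buckets as they might arrive at processElement within one
            // checkpoint; the same bucket can show up many times.
            int[] buckets = {0, 0, 1, 1, 2};
            Set<Map.Entry<String, Integer>> waitToCompact = new LinkedHashSet<>();
            for (int bucket : buckets) {
                // Duplicate (partition, bucket) pairs are absorbed by the set.
                waitToCompact.add(new SimpleImmutableEntry<>("pt-0", bucket));
            }
            // prepareCommit: one compaction per distinct pair, in arrival order.
            for (Map.Entry<String, Integer> pb : waitToCompact) {
                System.out.println("compact " + pb.getKey() + " bucket " + pb.getValue());
            }
            waitToCompact.clear(); // reset for the next checkpoint
        }
    }

This prints three lines (buckets 0, 1, 2), mirroring the expectation in testCompactExactlyOnce below: five records spread over three buckets trigger exactly three compactions.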
" + "They only need to know what buckets are compacted."); - write.compact(partition, bucket, true); } + + waitToCompact.add(Pair.of(partition, bucket)); } @Override protected List prepareCommit(boolean waitCompaction, long checkpointId) throws IOException { + + try { + for (Pair partitionBucket : waitToCompact) { + write.compact( + partitionBucket.getKey(), + partitionBucket.getRight(), + !write.streamingMode()); + } + } catch (Exception e) { + throw new RuntimeException("Exception happens while executing compaction.", e); + } + waitToCompact.clear(); return write.prepareCommit(waitCompaction, checkpointId); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java new file mode 100644 index 000000000000..c65190fd4a24 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.FlinkRowData; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.sink.SinkRecord; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.List; + +/** Test for {@link StoreCompactOperator}. 
+public class StoreCompactOperatorTest extends TableTestBase {
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
+        createTableDefault();
+
+        CompactRememberStoreWrite compactRememberStoreWrite =
+                new CompactRememberStoreWrite(streamingMode);
+        StoreCompactOperator storeCompactOperator =
+                new StoreCompactOperator(
+                        (FileStoreTable) getTableDefault(),
+                        (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+                                compactRememberStoreWrite,
+                        "10086");
+        storeCompactOperator.open();
+        StateInitializationContextImpl context =
+                new StateInitializationContextImpl(
+                        null,
+                        new MockOperatorStateStore() {
+                            @Override
+                            public <S> ListState<S> getUnionListState(
+                                    ListStateDescriptor<S> stateDescriptor) throws Exception {
+                                return getListState(stateDescriptor);
+                            }
+                        },
+                        null,
+                        null,
+                        null);
+        storeCompactOperator.initStateAndWriter(
+                context, (a, b, c) -> true, new IOManagerAsync(), "123");
+
+        storeCompactOperator.processElement(new StreamRecord<>(data(0)));
+        storeCompactOperator.processElement(new StreamRecord<>(data(0)));
+        storeCompactOperator.processElement(new StreamRecord<>(data(1)));
+        storeCompactOperator.processElement(new StreamRecord<>(data(1)));
+        storeCompactOperator.processElement(new StreamRecord<>(data(2)));
+        storeCompactOperator.prepareCommit(true, 1);
+
+        Assertions.assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
+    }
+
+    private RowData data(int bucket) {
+        GenericRow genericRow =
+                GenericRow.of(
+                        0L,
+                        BinaryRow.EMPTY_ROW.toBytes(),
+                        bucket,
+                        new byte[] {0x00, 0x00, 0x00, 0x00});
+        return new FlinkRowData(genericRow);
+    }
+
+    private static class CompactRememberStoreWrite implements StoreSinkWrite {
+
+        private final boolean streamingMode;
+        private int compactTime = 0;
+
+        public CompactRememberStoreWrite(boolean streamingMode) {
+            this.streamingMode = streamingMode;
+        }
+
+        @Override
+        public SinkRecord write(InternalRow rowData) {
+            return null;
+        }
+
+        @Override
+        public SinkRecord toLogRecord(SinkRecord record) {
+            return null;
+        }
+
+        @Override
+        public void compact(BinaryRow partition, int bucket, boolean fullCompaction) {
+            compactTime++;
+        }
+
+        @Override
+        public void notifyNewFiles(
+                long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {}
+
+        @Override
+        public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) {
+            return null;
+        }
+
+        @Override
+        public void snapshotState() {}
+
+        @Override
+        public boolean streamingMode() {
+            return streamingMode;
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public void replace(FileStoreTable newTable) {}
+    }
+}
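A note on the choice of container in the operator: waitToCompact is a LinkedHashSet rather than a plain HashSet. LinkedHashSet iterates in first-insertion order, so prepareCommit compacts buckets in the order they first appeared within the checkpoint, whereas a HashSet would yield a hash-dependent order. A small illustration with plain JDK collections (the bucket ids are made up):

    import java.util.HashSet;
    import java.util.LinkedHashSet;
    import java.util.List;

    public class IterationOrderSketch {
        public static void main(String[] args) {
            List<Integer> arrivalOrder = List.of(5, 3, 5, 9, 3);
            // LinkedHashSet iterates in first-insertion order: [5, 3, 9]
            System.out.println(new LinkedHashSet<>(arrivalOrder));
            // HashSet order depends on the hash layout, not on arrival: [3, 5, 9] here
            System.out.println(new HashSet<>(arrivalOrder));
        }
    }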