[flink] fix compaction multiple times
leaves12138 committed Dec 12, 2023
1 parent 5893dcc commit 629bbe4
Showing 2 changed files with 187 additions and 17 deletions.
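
Previously, StoreCompactOperator.processElement() called write.compact() for every incoming record, so a bucket that appeared in several records was compacted several times within one checkpoint. This commit defers compaction to prepareCommit(): processElement() now only records each (partition, bucket) pair in a LinkedHashSet, and prepareCommit() compacts each recorded pair exactly once before committing.

Below is a minimal, self-contained sketch of the deduplication idea (illustrative only, not Paimon code; java.util.Map.Entry stands in for Paimon's Pair):

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashSet;
import java.util.Map.Entry;
import java.util.Set;

public class DedupSketch {
    public static void main(String[] args) {
        // Repeated (partition, bucket) requests collapse in a LinkedHashSet,
        // and distinct pairs keep their first-arrival order.
        Set<Entry<String, Integer>> waitToCompact = new LinkedHashSet<>();
        int[] buckets = {0, 0, 1, 1, 2}; // five records, three distinct buckets
        for (int bucket : buckets) {
            waitToCompact.add(new SimpleImmutableEntry<>("partition-0", bucket));
        }
        // "Compact" once per distinct pair, then reset for the next checkpoint.
        waitToCompact.forEach(
                pb -> System.out.println("compact " + pb.getKey() + " bucket " + pb.getValue()));
        waitToCompact.clear(); // prints three lines: buckets 0, 1, 2
    }
}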
StoreCompactOperator.java
@@ -19,20 +19,25 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 
 import java.io.IOException;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 
@@ -52,6 +57,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committable>
     private transient StoreSinkWriteState state;
     private transient StoreSinkWrite write;
     private transient DataFileMetaSerializer dataFileMetaSerializer;
+    private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
 
     public StoreCompactOperator(
             FileStoreTable table,
@@ -77,30 +83,39 @@ public void initializeState(StateInitializationContext context) throws Exception
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, initialCommitUser);
 
-        state =
-                new StoreSinkWriteState(
-                        context,
-                        (tableName, partition, bucket) ->
-                                ChannelComputer.select(
-                                                partition,
-                                                bucket,
-                                                getRuntimeContext().getNumberOfParallelSubtasks())
-                                        == getRuntimeContext().getIndexOfThisSubtask());
+        initStateAndWriter(
+                context,
+                (tableName, partition, bucket) ->
+                        ChannelComputer.select(
+                                        partition,
+                                        bucket,
+                                        getRuntimeContext().getNumberOfParallelSubtasks())
+                                == getRuntimeContext().getIndexOfThisSubtask(),
+                getContainingTask().getEnvironment().getIOManager(),
+                commitUser);
     }
 
+    @VisibleForTesting
+    void initStateAndWriter(
+            StateInitializationContext context,
+            StoreSinkWriteState.StateValueFilter stateFilter,
+            IOManager ioManager,
+            String commitUser)
+            throws Exception {
+        // State and writer initialization live in this method so that tests can construct a
+        // writer without having to build a full runtime context.
+        state = new StoreSinkWriteState(context, stateFilter);
 
         write =
                 storeSinkWriteProvider.provide(
-                        table,
-                        commitUser,
-                        state,
-                        getContainingTask().getEnvironment().getIOManager(),
-                        memoryPool,
-                        getMetricGroup());
+                        table, commitUser, state, ioManager, memoryPool, getMetricGroup());
     }

     @Override
     public void open() throws Exception {
         super.open();
         dataFileMetaSerializer = new DataFileMetaSerializer();
+        waitToCompact = new LinkedHashSet<>();
     }
 
     @Override
@@ -115,19 +130,31 @@ public void processElement(StreamRecord<RowData> element) throws Exception {

         if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
-            write.compact(partition, bucket, false);
         } else {
             Preconditions.checkArgument(
                     files.isEmpty(),
                     "Batch compact job does not concern what files are compacted. "
                             + "They only need to know what buckets are compacted.");
-            write.compact(partition, bucket, true);
         }
+
+        waitToCompact.add(Pair.of(partition, bucket));
     }

     @Override
     protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
             throws IOException {
+
+        try {
+            for (Pair<BinaryRow, Integer> partitionBucket : waitToCompact) {
+                write.compact(
+                        partitionBucket.getKey(),
+                        partitionBucket.getRight(),
+                        !write.streamingMode());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Exception happens while executing compaction.", e);
+        }
+        waitToCompact.clear();
         return write.prepareCommit(waitCompaction, checkpointId);
     }
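Because waitToCompact is a LinkedHashSet, repeated (partition, bucket) pairs collapse into one entry and distinct buckets are compacted in first-arrival order; clearing the set at the end of prepareCommit scopes the deduplication to a single checkpoint.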

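The second file adds a regression test: it feeds the operator duplicate (partition, bucket) records through a stubbed StoreSinkWrite that counts compact() calls, and asserts that compaction runs only once per distinct bucket.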
StoreCompactOperatorTest.java (new file)
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.SinkRecord;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.List;

/** Test for {@link StoreCompactOperator}. */
public class StoreCompactOperatorTest extends TableTestBase {

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
        createTableDefault();

        CompactRememberStoreWrite compactRememberStoreWrite =
                new CompactRememberStoreWrite(streamingMode);
        StoreCompactOperator storeCompactOperator =
                new StoreCompactOperator(
                        (FileStoreTable) getTableDefault(),
                        (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
                                compactRememberStoreWrite,
                        "10086");
        storeCompactOperator.open();
        StateInitializationContextImpl context =
                new StateInitializationContextImpl(
                        null,
                        new MockOperatorStateStore() {
                            @Override
                            public <S> ListState<S> getUnionListState(
                                    ListStateDescriptor<S> stateDescriptor) throws Exception {
                                return getListState(stateDescriptor);
                            }
                        },
                        null,
                        null,
                        null);
        storeCompactOperator.initStateAndWriter(
                context, (a, b, c) -> true, new IOManagerAsync(), "123");

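        // Five records spread across three distinct buckets (0, 0, 1, 1, 2):
        // compaction is deferred and deduplicated, so compact() should run once per bucket.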
        storeCompactOperator.processElement(new StreamRecord<>(data(0)));
        storeCompactOperator.processElement(new StreamRecord<>(data(0)));
        storeCompactOperator.processElement(new StreamRecord<>(data(1)));
        storeCompactOperator.processElement(new StreamRecord<>(data(1)));
        storeCompactOperator.processElement(new StreamRecord<>(data(2)));
        storeCompactOperator.prepareCommit(true, 1);

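        // Three distinct (partition, bucket) pairs -> exactly three compactions.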
        Assertions.assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
    }

    private RowData data(int bucket) {
        GenericRow genericRow =
                GenericRow.of(
                        0L,
                        BinaryRow.EMPTY_ROW.toBytes(),
                        bucket,
                        new byte[] {0x00, 0x00, 0x00, 0x00});
        return new FlinkRowData(genericRow);
    }

    /** A stub StoreSinkWrite that counts compact() invocations instead of compacting. */
    private static class CompactRememberStoreWrite implements StoreSinkWrite {

        private final boolean streamingMode;
        private int compactTime = 0; // number of compact() invocations

        public CompactRememberStoreWrite(boolean streamingMode) {
            this.streamingMode = streamingMode;
        }

        @Override
        public SinkRecord write(InternalRow rowData) {
            return null;
        }

        @Override
        public SinkRecord toLogRecord(SinkRecord record) {
            return null;
        }

        @Override
        public void compact(BinaryRow partition, int bucket, boolean fullCompaction) {
            compactTime++;
        }

        @Override
        public void notifyNewFiles(
                long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {}

        @Override
        public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) {
            return null;
        }

        @Override
        public void snapshotState() {}

        @Override
        public boolean streamingMode() {
            return streamingMode;
        }

        @Override
        public void close() {}

        @Override
        public void replace(FileStoreTable newTable) {}
    }
}
