From 4b61315599c0ccaca2b671c07083011d6b08ea9f Mon Sep 17 00:00:00 2001
From: LsomeYeah
Date: Fri, 18 Oct 2024 15:46:47 +0800
Subject: [PATCH] [flink] add coordinate and worker operator for small
 changelog files compaction

---
 docs/content/maintenance/write-performance.md |   8 +-
 .../flink_connector_configuration.html        |   6 +-
 .../paimon/flink/FlinkConnectorOptions.java   |  16 +-
 .../ChangelogCompactCoordinateOperator.java   | 177 ++++++++++++
 .../changelog/ChangelogCompactTask.java       | 255 ++++++++++++++++++
 .../ChangelogCompactTaskSerializer.java       | 113 ++++++++
 .../ChangelogCompactWorkerOperator.java       |  54 ++++
 .../changelog/ChangelogTaskTypeInfo.java      |  85 ++++++
 .../apache/paimon/flink/sink/FlinkSink.java   |  21 +-
 .../flink/PrimaryKeyFileStoreTableITCase.java | 104 ++++++-
 .../ChangelogCompactTaskSerializerTest.java   |  94 +++++++
 11 files changed, 899 insertions(+), 34 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
 create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java

diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md
index 4b3b5788b787..1456f79ede5b 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -167,9 +167,5 @@ If Flink's checkpoint interval is short (for example, 30 seconds) and the number
 each snapshot may produce lots of small changelog files.
 Too many files may put a burden on the distributed storage cluster.
 
-In order to compact small changelog files into large ones, you can set the table option `changelog.compact.parallelism`.
-This option will add a compact operator after the writer operator, which copies changelog files into large ones.
-If the parallelism becomes larger, file copying will become faster.
-However, the number of resulting files will also become larger.
-As file copying is fast in most storage system,
-we suggest that you start experimenting with `'changelog.compact.parallelism' = '1'` and increase the value if needed.
+In order to compact small changelog files into large ones, you can set the table option `changelog.precommit-compact = true` (default: `false`).
+When enabled, a compact coordinator operator and a compact worker operator are added after the writer operator; they copy the small changelog files produced by each snapshot into larger files.
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 030eb7691f8d..f7c078c55f74 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -27,10 +27,10 @@

-
changelog.compact.parallelism
+
changelog.precommit-compact
(none) - Integer - Compact several changelog files from the same partition into one file, in order to decrease the number of small files. This property sets the parallelism of the compact operator. More parallelism means faster file copy, however the number of resulting files will also become larger. + Boolean + If true, it will compact several changelog files from the same partition into larger ones, in order to decrease the number of small files.
end-input.watermark
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 66b4c376238f..0d806a5d5889 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -397,16 +397,14 @@ public class FlinkConnectorOptions {
                 .withDescription(
                         "Optional endInput watermark used in case of batch mode or bounded stream.");
 
-    public static final ConfigOption<Integer> CHANGELOG_COMPACT_PARALLELISM =
-            key("changelog.compact.parallelism")
-                    .intType()
-                    .noDefaultValue()
+    public static final ConfigOption<Boolean> CHANGELOG_PRECOMMIT_COMPACT =
+            key("changelog.precommit-compact")
+                    .booleanType()
+                    .defaultValue(false)
                     .withDescription(
-                            "Compact several changelog files from the same partition into one file, "
-                                    + "in order to decrease the number of small files. "
-                                    + "This property sets the parallelism of the compact operator. "
-                                    + "More parallelism means faster file copy, "
-                                    + "however the number of resulting files will also become larger.");
+                            "If true, it will add a changelog compact coordinator operator and a worker operator after the writer operator, "
+                                    + "in order to compact several changelog files from the same partition into large ones, "
+                                    + "which can decrease the number of small files.");
 
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
new file mode 100644
index 000000000000..eae5683df9b8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Coordinator operator for compacting changelog files. + * + *

{@link UnawareAppendTableCompactionCoordinator} calculates the file size of changelog files + * contained in all buckets within each partition from {@link Committable} message emitted from + * writer operator. And emit {@link ChangelogCompactTask} to {@link ChangelogCompactWorkerOperator}. + */ +public class ChangelogCompactCoordinateOperator + extends AbstractStreamOperator> + implements OneInputStreamOperator>, + BoundedOneInput { + private final FileStoreTable table; + + private transient long checkpointId; + private transient Map partitionChangelogs; + + public ChangelogCompactCoordinateOperator(FileStoreTable table) { + this.table = table; + } + + @Override + public void open() throws Exception { + super.open(); + + checkpointId = Long.MIN_VALUE; + partitionChangelogs = new HashMap<>(); + } + + public void processElement(StreamRecord record) { + Committable committable = record.getValue(); + checkpointId = Math.max(checkpointId, committable.checkpointId()); + if (committable.kind() != Committable.Kind.FILE) { + output.collect(new StreamRecord<>(Either.Left(record.getValue()))); + return; + } + + CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); + if (message.newFilesIncrement().changelogFiles().isEmpty() + && message.compactIncrement().changelogFiles().isEmpty()) { + output.collect(new StreamRecord<>(Either.Left(record.getValue()))); + return; + } + + BinaryRow partition = message.partition(); + Integer bucket = message.bucket(); + for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) { + partitionChangelogs + .computeIfAbsent(partition, k -> new PartitionChangelog()) + .addNewChangelogFile(bucket, meta); + PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); + if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) { + emitPartitionChanglogCompactTask(partition); + } + } + for (DataFileMeta meta : message.compactIncrement().changelogFiles()) { + partitionChangelogs + .computeIfAbsent(partition, k -> new PartitionChangelog()) + .addCompactChangelogFile(bucket, meta); + PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); + if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) { + emitPartitionChanglogCompactTask(partition); + } + } + + CommitMessageImpl newMessage = + new CommitMessageImpl( + message.partition(), + message.bucket(), + new DataIncrement( + message.newFilesIncrement().newFiles(), + message.newFilesIncrement().deletedFiles(), + Collections.emptyList()), + new CompactIncrement( + message.compactIncrement().compactBefore(), + message.compactIncrement().compactAfter(), + Collections.emptyList()), + message.indexIncrement()); + Committable newCommittable = + new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); + output.collect(new StreamRecord<>(Either.Left(newCommittable))); + } + + public void prepareSnapshotPreBarrier(long checkpointId) { + emitAllPartitionsChanglogCompactTask(); + } + + public void endInput() { + emitAllPartitionsChanglogCompactTask(); + } + + private void emitPartitionChanglogCompactTask(BinaryRow partition) { + PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); + output.collect( + new StreamRecord<>( + Either.Right( + new ChangelogCompactTask( + checkpointId, + partition, + partitionChangelog.newFileChangelogFiles, + partitionChangelog.compactChangelogFiles)))); + partitionChangelogs.remove(partition); + } + + private void 
emitAllPartitionsChanglogCompactTask() { + List partitions = new ArrayList<>(partitionChangelogs.keySet()); + for (BinaryRow partition : partitions) { + emitPartitionChanglogCompactTask(partition); + } + } + + private static class PartitionChangelog { + private long totalFileSize; + private final Map> newFileChangelogFiles; + private final Map> compactChangelogFiles; + + public long totalFileSize() { + return totalFileSize; + } + + public PartitionChangelog() { + totalFileSize = 0; + newFileChangelogFiles = new HashMap<>(); + compactChangelogFiles = new HashMap<>(); + } + + public void addNewChangelogFile(Integer bucket, DataFileMeta file) { + totalFileSize += file.fileSize(); + newFileChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file); + } + + public void addCompactChangelogFile(Integer bucket, DataFileMeta file) { + totalFileSize += file.fileSize(); + compactChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java new file mode 100644 index 000000000000..d75ccea9b45a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.Preconditions; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +/** + * {@link ChangelogCompactTask} to compact several changelog files from the same partition into one + * file, in order to reduce the number of small files. 
+ */ +public class ChangelogCompactTask implements Serializable { + private final long checkpointId; + private final BinaryRow partition; + private final Map> newFileChangelogFiles; + private final Map> compactChangelogFiles; + + private transient OutputStream outputStream; + private final transient List results = new ArrayList<>(); + + public ChangelogCompactTask( + long checkpointId, + BinaryRow partition, + Map> newFileChangelogFiles, + Map> compactChangelogFiles) { + this.checkpointId = checkpointId; + this.partition = partition; + this.newFileChangelogFiles = newFileChangelogFiles; + this.compactChangelogFiles = compactChangelogFiles; + } + + public long checkpointId() { + return checkpointId; + } + + public BinaryRow partition() { + return partition; + } + + public Map> newFileChangelogFiles() { + return newFileChangelogFiles; + } + + public Map> compactChangelogFiles() { + return compactChangelogFiles; + } + + public List doCompact(FileStoreTable table) throws Exception { + FileStorePathFactory pathFactory = table.store().pathFactory(); + + // copy all changelog files to a new big file + for (Map.Entry> entry : newFileChangelogFiles.entrySet()) { + Integer bucket = entry.getKey(); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, bucket); + for (DataFileMeta meta : entry.getValue()) { + copyFile(table, dataFilePathFactory.toPath(meta.fileName()), bucket, false, meta); + } + } + for (Map.Entry> entry : compactChangelogFiles.entrySet()) { + Integer bucket = entry.getKey(); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, bucket); + for (DataFileMeta meta : entry.getValue()) { + copyFile(table, dataFilePathFactory.toPath(meta.fileName()), bucket, true, meta); + } + } + outputStream.out.close(); + + return produceNewCommittables(table, pathFactory); + } + + private void copyFile( + FileStoreTable table, Path path, int bucket, boolean isCompactResult, DataFileMeta meta) + throws Exception { + if (outputStream == null) { + Path outputPath = + new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID()); + outputStream = + new OutputStream(outputPath, table.fileIO().newOutputStream(outputPath, false)); + } + long offset = outputStream.out.getPos(); + try (SeekableInputStream in = table.fileIO().newInputStream(path)) { + IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false); + } + table.fileIO().deleteQuietly(path); + results.add( + new Result( + bucket, isCompactResult, meta, offset, outputStream.out.getPos() - offset)); + } + + private List produceNewCommittables( + FileStoreTable table, FileStorePathFactory pathFactory) throws IOException { + Result baseResult = results.get(0); + Preconditions.checkArgument(baseResult.offset == 0); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, baseResult.bucket); + // see Java docs of `CompactedChangelogFormatReaderFactory` + String realName = + "compacted-changelog-" + + UUID.randomUUID() + + "$" + + baseResult.bucket + + "-" + + baseResult.length; + table.fileIO() + .rename( + outputStream.path, + dataFilePathFactory.toPath( + realName + + "." 
+ + CompactedChangelogReadOnlyFormat.getIdentifier( + baseResult.meta.fileFormat()))); + + List newCommittables = new ArrayList<>(); + + Map> bucketedResults = new HashMap<>(); + for (Result result : results) { + bucketedResults.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result); + } + + for (Map.Entry> entry : bucketedResults.entrySet()) { + List newFilesChangelog = new ArrayList<>(); + List compactChangelog = new ArrayList<>(); + for (Result result : entry.getValue()) { + // see Java docs of `CompactedChangelogFormatReaderFactory` + String name = + (result.offset == 0 + ? realName + : realName + "-" + result.offset + "-" + result.length) + + "." + + CompactedChangelogReadOnlyFormat.getIdentifier( + result.meta.fileFormat()); + if (result.isCompactResult) { + compactChangelog.add(result.meta.rename(name)); + } else { + newFilesChangelog.add(result.meta.rename(name)); + } + } + + CommitMessageImpl newMessage = + new CommitMessageImpl( + partition, + entry.getKey(), + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + newFilesChangelog), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + compactChangelog)); + newCommittables.add(new Committable(checkpointId, Committable.Kind.FILE, newMessage)); + } + return newCommittables; + } + + public int hashCode() { + return Objects.hash(checkpointId, partition, newFileChangelogFiles, compactChangelogFiles); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ChangelogCompactTask that = (ChangelogCompactTask) o; + return checkpointId == that.checkpointId + && Objects.equals(partition, that.partition) + && Objects.equals(newFileChangelogFiles, that.newFileChangelogFiles) + && Objects.equals(compactChangelogFiles, that.compactChangelogFiles); + } + + @Override + public String toString() { + return String.format( + "ChangelogCompactionTask {" + + "partition = %s, " + + "newFileChangelogFiles = %s, " + + "compactChangelogFiles = %s}", + partition, newFileChangelogFiles, compactChangelogFiles); + } + + private static class OutputStream { + + private final Path path; + private final PositionOutputStream out; + + private OutputStream(Path path, PositionOutputStream out) { + this.path = path; + this.out = out; + } + } + + private static class Result { + + private final int bucket; + private final boolean isCompactResult; + private final DataFileMeta meta; + private final long offset; + private final long length; + + private Result( + int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) { + this.bucket = bucket; + this.isCompactResult = isCompactResult; + this.meta = meta; + this.offset = offset; + this.length = length; + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java new file mode 100644 index 000000000000..34f9d035d8d2 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFileMetaSerializer; +import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataOutputView; +import org.apache.paimon.io.DataOutputViewStreamWrapper; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.CollectionUtil; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** Serializer for {@link ChangelogCompactTask}. */ +public class ChangelogCompactTaskSerializer + implements SimpleVersionedSerializer { + private static final int CURRENT_VERSION = 1; + + private final DataFileMetaSerializer dataFileSerializer; + + public ChangelogCompactTaskSerializer() { + this.dataFileSerializer = new DataFileMetaSerializer(); + } + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(ChangelogCompactTask obj) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + serialize(obj, view); + return out.toByteArray(); + } + + @Override + public ChangelogCompactTask deserialize(int version, byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + return deserialize(version, view); + } + + private void serialize(ChangelogCompactTask task, DataOutputView view) throws IOException { + view.writeLong(task.checkpointId()); + serializeBinaryRow(task.partition(), view); + // serialize newFileChangelogFiles map + serializeMap(task.newFileChangelogFiles(), view); + serializeMap(task.compactChangelogFiles(), view); + } + + private ChangelogCompactTask deserialize(int version, DataInputView view) throws IOException { + if (version != getVersion()) { + throw new RuntimeException("Can not deserialize version: " + version); + } + + return new ChangelogCompactTask( + view.readLong(), + deserializeBinaryRow(view), + deserializeMap(view), + deserializeMap(view)); + } + + private void serializeMap(Map> map, DataOutputView view) + throws IOException { + view.writeInt(map.size()); + for (Map.Entry> entry : map.entrySet()) { + view.writeInt(entry.getKey()); + if (entry.getValue() == null) { + throw new IllegalArgumentException( + "serialize error. 
no value for bucket-" + entry.getKey()); + } + dataFileSerializer.serializeList(entry.getValue(), view); + } + } + + private Map> deserializeMap(DataInputView view) throws IOException { + final int size = view.readInt(); + + final Map> map = + CollectionUtil.newHashMapWithExpectedSize(size); + for (int i = 0; i < size; i++) { + map.put(view.readInt(), dataFileSerializer.deserializeList(view)); + } + + return map; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java new file mode 100644 index 000000000000..260c25a31561 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; + +import java.util.List; + +/** + * Receive and process the {@link ChangelogCompactTask}s emitted by {@link + * ChangelogCompactCoordinateOperator}. + */ +public class ChangelogCompactWorkerOperator extends AbstractStreamOperator + implements OneInputStreamOperator, Committable> { + private final FileStoreTable table; + + public ChangelogCompactWorkerOperator(FileStoreTable table) { + this.table = table; + } + + public void processElement(StreamRecord> record) + throws Exception { + + if (record.getValue().isLeft()) { + output.collect(new StreamRecord<>(record.getValue().left())); + } else { + ChangelogCompactTask task = record.getValue().right(); + List committables = task.doCompact(table); + committables.forEach(committable -> output.collect(new StreamRecord<>(committable))); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java new file mode 100644 index 000000000000..5cae899a0704 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** Type information for {@link ChangelogCompactTask}. */ +public class ChangelogTaskTypeInfo extends TypeInformation { + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class getTypeClass() { + return ChangelogCompactTask.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer createSerializer(ExecutionConfig config) { + // we don't need copy for task + return new NoneCopyVersionedSerializerTypeSerializerProxy( + ChangelogCompactTaskSerializer::new) {}; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof ChangelogTaskTypeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof ChangelogTaskTypeInfo; + } + + @Override + public String toString() { + return "ChangelogCompactionTask"; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index cbf193adfee9..6acb5b77c2a9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -21,7 +21,9 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.TagCreationMode; -import org.apache.paimon.flink.compact.changelog.ChangelogCompactOperator; +import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator; +import org.apache.paimon.flink.compact.changelog.ChangelogCompactWorkerOperator; +import org.apache.paimon.flink.compact.changelog.ChangelogTaskTypeInfo; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -32,6 +34,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.ReadableConfig; import 
org.apache.flink.streaming.api.CheckpointingMode;
@@ -55,7 +58,7 @@
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.CoreOptions.createCommitUser;
-import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_COMPACT_PARALLELISM;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -229,13 +232,19 @@ public DataStream<Committable> doWrite(
             declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
         }
 
-        if (options.contains(CHANGELOG_COMPACT_PARALLELISM)) {
+        if (options.get(CHANGELOG_PRECOMMIT_COMPACT)) {
             written =
                     written.transform(
-                                    "Changelog Compactor",
+                                    "Changelog Compact Coordinator",
+                                    new EitherTypeInfo<>(
+                                            new CommittableTypeInfo(), new ChangelogTaskTypeInfo()),
+                                    new ChangelogCompactCoordinateOperator(table))
+                            .forceNonParallel()
+                            .transform(
+                                    "Changelog Compact Worker",
                                     new CommittableTypeInfo(),
-                                    new ChangelogCompactOperator(table))
-                                    .setParallelism(options.get(CHANGELOG_COMPACT_PARALLELISM));
+                                    new ChangelogCompactWorkerOperator(table))
+                            .setParallelism(written.getParallelism());
         }
 
         return written;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 879a4a076cb3..3daf9d34de60 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -337,7 +337,7 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
 
     @Test
     @Timeout(120)
-    public void testChangelogCompact() throws Exception {
+    public void testChangelogCompactInBatchWrite() throws Exception {
         TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
         String catalogDdl =
                 "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + path + "' )";
@@ -347,9 +347,9 @@
                 "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
                         + "PARTITIONED BY (pt) "
                         + "WITH ("
-                        + " 'bucket' = '2',\n"
+                        + " 'bucket' = '10',\n"
                         + " 'changelog-producer' = 'lookup',\n"
-                        + " 'changelog.compact.parallelism' = '1',\n"
+                        + " 'changelog.precommit-compact' = 'true',\n"
                         + " 'snapshot.num-retained.min' = '3',\n"
                         + " 'snapshot.num-retained.max' = '3'\n"
                         + ")");
@@ -360,7 +360,7 @@
         sEnv.executeSql("USE CATALOG mycat");
 
         List<String> values = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 1000; i++) {
             values.add(String.format("(0, %d, %d)", i, i));
             values.add(String.format("(1, %d, %d)", i, i));
         }
@@ -371,7 +371,7 @@
         assertThat(listAllFilesWithPrefix("changelog-")).isEmpty();
 
         List<Row> expected = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 1000; i++) {
             expected.add(Row.ofKind(RowKind.INSERT, 0, i, i));
             expected.add(Row.ofKind(RowKind.INSERT, 1, i, i));
         }
@@ -380,7 +380,7 @@
                 expected);
 
values.clear(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { values.add(String.format("(0, %d, %d)", i, i + 1)); values.add(String.format("(1, %d, %d)", i, i + 1)); } @@ -389,7 +389,7 @@ public void testChangelogCompact() throws Exception { assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4); assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i)); expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i)); expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1)); @@ -400,7 +400,7 @@ public void testChangelogCompact() throws Exception { expected); values.clear(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { values.add(String.format("(0, %d, %d)", i, i + 2)); values.add(String.format("(1, %d, %d)", i, i + 2)); } @@ -413,8 +413,8 @@ public void testChangelogCompact() throws Exception { assertThat(fileIO.exists(new Path(p))).isFalse(); } - expected = expected.subList(200, 600); - for (int i = 0; i < 100; i++) { + expected = expected.subList(2000, 6000); + for (int i = 0; i < 1000; i++) { expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i + 1)); expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i + 1)); expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 2)); @@ -425,6 +425,81 @@ public void testChangelogCompact() throws Exception { expected); } + @Test + @Timeout(120) + public void testChangelogCompactInStreamWrite() throws Exception { + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(2000) + .parallelism(4) + .build(); + + sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + sEnv.executeSql("USE CATALOG testCatalog"); + sEnv.executeSql( + "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) " + + "PARTITIONED BY (pt) " + + "WITH (" + + " 'bucket' = '10',\n" + + " 'changelog-producer' = 'lookup',\n" + + " 'changelog.precommit-compact' = 'true'\n" + + ")"); + + Path inputPath = new Path(path, "input"); + LocalFileIO.create().mkdirs(inputPath); + sEnv.executeSql( + "CREATE TABLE `default_catalog`.`default_database`.`s` ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) " + + "WITH ( 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '" + + inputPath + + "', 'source.monitor-interval' = '500ms' )"); + + sEnv.executeSql("INSERT INTO t SELECT * FROM `default_catalog`.`default_database`.`s`"); + CloseableIterator it = sEnv.executeSql("SELECT * FROM t").collect(); + + // write initial data + List values = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + values.add(String.format("(0, %d, %d)", i, i)); + values.add(String.format("(1, %d, %d)", i, i)); + } + sEnv.executeSql( + "INSERT INTO `default_catalog`.`default_database`.`s` VALUES " + + String.join(", ", values)) + .await(); + + List expected = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + expected.add(Row.ofKind(RowKind.INSERT, 0, i, i)); + expected.add(Row.ofKind(RowKind.INSERT, 1, i, i)); + } + assertStreamingResult(it, expected); + + List compactedChangelogs2 = listAllFilesWithPrefix("compacted-changelog-"); + assertThat(compactedChangelogs2).hasSize(2); + assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); + + // write update data + values.clear(); + for (int i = 0; i < 100; i++) { + values.add(String.format("(0, %d, %d)", i, i + 1)); + values.add(String.format("(1, %d, %d)", i, i + 1)); + } + sEnv.executeSql( + 
"INSERT INTO `default_catalog`.`default_database`.`s` VALUES " + + String.join(", ", values)) + .await(); + for (int i = 0; i < 100; i++) { + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i)); + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i)); + expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1)); + expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1)); + } + assertStreamingResult(it, expected.subList(200, 600)); + assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4); + assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); + } + private List listAllFilesWithPrefix(String prefix) throws Exception { try (Stream stream = Files.walk(java.nio.file.Paths.get(path))) { return stream.filter(Files::isRegularFile) @@ -444,6 +519,15 @@ private void assertStreamingResult(TableResult result, List expected) throw assertThat(actual).hasSameElementsAs(expected); } + private void assertStreamingResult(CloseableIterator it, List expected) { + List actual = new ArrayList<>(); + while (actual.size() < expected.size() && it.hasNext()) { + actual.add(it.next()); + } + + assertThat(actual).hasSameElementsAs(expected); + } + // ------------------------------------------------------------------------ // Random Tests // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java new file mode 100644 index 000000000000..906fac850973 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row; +import static org.apache.paimon.stats.StatsTestUtils.newSimpleStats; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link ChangelogCompactTaskSerializer}. 
*/ +public class ChangelogCompactTaskSerializerTest { + private final ChangelogCompactTaskSerializer serializer = new ChangelogCompactTaskSerializer(); + + @Test + public void testSerializer() throws Exception { + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(partition); + writer.writeInt(0, 0); + writer.complete(); + + ChangelogCompactTask task = + new ChangelogCompactTask( + 1L, + partition, + new HashMap>() { + { + put(0, newFiles(20)); + put(1, newFiles(20)); + } + }, + new HashMap>() { + { + put(0, newFiles(10)); + put(1, newFiles(10)); + } + }); + ChangelogCompactTask serializeTask = serializer.deserialize(1, serializer.serialize(task)); + assertThat(task).isEqualTo(serializeTask); + } + + private List newFiles(int num) { + List list = new ArrayList<>(); + for (int i = 0; i < num; i++) { + list.add(newFile()); + } + return list; + } + + private DataFileMeta newFile() { + return new DataFileMeta( + UUID.randomUUID().toString(), + 0, + 1, + row(0), + row(0), + newSimpleStats(0, 1), + newSimpleStats(0, 1), + 0, + 1, + 0, + 0, + 0L, + null, + FileSource.APPEND, + null); + } +}
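
Usage sketch (not part of the patch): the SQL below shows how the new option is meant to be enabled. It mirrors the docs change and the DDL used in `PrimaryKeyFileStoreTableITCase`; the catalog name, table name, and warehouse path are illustrative only.

```sql
-- Enable pre-commit changelog compaction on a primary-key table.
-- 'changelog.precommit-compact' defaults to 'false'; when set to 'true',
-- the coordinator/worker operators added by this patch copy the small
-- changelog files of each checkpoint into larger compacted-changelog files.
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = '/tmp/paimon'  -- illustrative path
);
USE CATALOG my_catalog;

CREATE TABLE t (
    pt INT,
    k INT,
    v INT,
    PRIMARY KEY (pt, k) NOT ENFORCED
) PARTITIONED BY (pt) WITH (
    'bucket' = '10',
    'changelog-producer' = 'lookup',
    'changelog.precommit-compact' = 'true'
);
```

After a successful write, the table directories should contain `compacted-changelog-*` files rather than many small `changelog-*` files, which is what `testChangelogCompactInBatchWrite` and `testChangelogCompactInStreamWrite` assert.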