diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md
index 03e734874c05..4b3b5788b787 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -160,3 +160,16 @@ You can use fine-grained-resource-management of Flink to increase committer heap
1. Configure Flink Configuration `cluster.fine-grained-resource-management.enabled: true`. (This is default after Flink 1.18)
2. Configure Paimon Table Options: `sink.committer-memory`, for example 300 MB, depends on your `TaskManager`.
(`sink.committer-cpu` is also supported)
+
+## Changelog Compaction
+
+If Flink's checkpoint interval is short (for example, 30 seconds) and the number of buckets is large,
+each snapshot may produce lots of small changelog files.
+Too many files may put a burden on the distributed storage cluster.
+
+In order to compact small changelog files into large ones, you can set the table option `changelog.compact.parallelism`.
+This option will add a compact operator after the writer operator, which copies changelog files into large ones.
+If the parallelism becomes larger, file copying will become faster.
+However, the number of resulting files will also become larger.
+As file copying is fast in most storage systems,
+we suggest that you start experimenting with `'changelog.compact.parallelism' = '1'` and increase the value if needed.
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index ea5dd5dfb163..030eb7691f8d 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,6 +26,12 @@
+        <tr>
+            <td><h5>changelog.compact.parallelism</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>Compact several changelog files from the same partition into one file, in order to decrease the number of small files. This property sets the parallelism of the compact operator. More parallelism means faster file copy, however the number of resulting files will also become larger.</td>
+        </tr>
end-input.watermark |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
index 3df81d2d11d3..29352b4e8161 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
@@ -38,7 +38,7 @@ public final class IOUtils {
private static final Logger LOG = LoggerFactory.getLogger(IOUtils.class);
/** The block size for byte operations in byte. */
- private static final int BLOCKSIZE = 4096;
+ public static final int BLOCKSIZE = 4096;
// ------------------------------------------------------------------------
// Byte copy operations
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 2c12b70a2493..66b4c376238f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -397,6 +397,17 @@ public class FlinkConnectorOptions {
.withDescription(
"Optional endInput watermark used in case of batch mode or bounded stream.");
+    public static final ConfigOption<Integer> CHANGELOG_COMPACT_PARALLELISM =
+            key("changelog.compact.parallelism")
+                    .intType()
+                    .noDefaultValue() // no default: compaction is enabled only when this is set
+                    .withDescription(
+                            "Compact several changelog files from the same partition into one file, "
+                                    + "in order to decrease the number of small files. "
+                                    + "This property sets the parallelism of the compact operator. "
+                                    + "More parallelism means faster file copy, "
+                                    + "however the number of resulting files will also become larger.");
+
public static List> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List> list = new ArrayList<>(fields.length);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java
new file mode 100644
index 000000000000..973fd5c647c9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Operator to compact several changelog files from the same partition into one file, in order to
+ * reduce the number of small files.
+ */
+public class ChangelogCompactOperator extends AbstractStreamOperator<Committable>
+        implements OneInputStreamOperator<Committable, Committable>, BoundedOneInput {
+
+    private final FileStoreTable table;
+
+    private transient FileStorePathFactory pathFactory;
+    private transient long checkpointId; // largest checkpoint id seen on any input record
+    private transient Map<BinaryRow, OutputStream> outputStreams; // open temp file per partition
+    private transient Map<BinaryRow, List<Result>> results; // copied segments per partition
+
+    public ChangelogCompactOperator(FileStoreTable table) {
+        this.table = table;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        pathFactory = table.store().pathFactory();
+        checkpointId = Long.MIN_VALUE; // raised by processElement via Math.max
+        outputStreams = new HashMap<>();
+        results = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<Committable> record) throws Exception {
+        Committable committable = record.getValue();
+        checkpointId = Math.max(checkpointId, committable.checkpointId());
+        if (committable.kind() != Committable.Kind.FILE) {
+            output.collect(record); // non-file committables are forwarded untouched
+            return;
+        }
+
+        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+        if (message.newFilesIncrement().changelogFiles().isEmpty()
+                && message.compactIncrement().changelogFiles().isEmpty()) {
+            output.collect(record); // no changelog files in this message: nothing to compact
+            return;
+        }
+
+        // copy changelogs from the same partition into one file
+        DataFilePathFactory dataFilePathFactory =
+                pathFactory.createDataFilePathFactory(message.partition(), message.bucket());
+        for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
+            copyFile(
+                    dataFilePathFactory.toPath(meta.fileName()),
+                    message.partition(),
+                    message.bucket(),
+                    false,
+                    meta);
+        }
+        for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
+            copyFile(
+                    dataFilePathFactory.toPath(meta.fileName()),
+                    message.partition(),
+                    message.bucket(),
+                    true,
+                    meta);
+        }
+
+        // send commit message without changelog files
+        CommitMessageImpl newMessage =
+                new CommitMessageImpl(
+                        message.partition(),
+                        message.bucket(),
+                        new DataIncrement(
+                                message.newFilesIncrement().newFiles(),
+                                message.newFilesIncrement().deletedFiles(),
+                                Collections.emptyList()),
+                        new CompactIncrement(
+                                message.compactIncrement().compactBefore(),
+                                message.compactIncrement().compactAfter(),
+                                Collections.emptyList()),
+                        message.indexIncrement());
+        Committable newCommittable =
+                new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
+        if (record.hasTimestamp()) {
+            output.collect(new StreamRecord<>(newCommittable, record.getTimestamp()));
+        } else {
+            output.collect(new StreamRecord<>(newCommittable));
+        }
+    }
+
+    private void copyFile(
+            Path path, BinaryRow partition, int bucket, boolean isCompactResult, DataFileMeta meta)
+            throws Exception {
+        if (!outputStreams.containsKey(partition)) {
+            Path outputPath =
+                    new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
+            outputStreams.put(
+                    partition,
+                    new OutputStream(
+                            outputPath, table.fileIO().newOutputStream(outputPath, false)));
+        }
+
+        OutputStream outputStream = outputStreams.get(partition);
+        long offset = outputStream.out.getPos(); // start of this file inside the merged file
+        try (SeekableInputStream in = table.fileIO().newInputStream(path)) {
+            IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false);
+        }
+        table.fileIO().deleteQuietly(path); // the original small file is removed after the copy
+        results.computeIfAbsent(partition, p -> new ArrayList<>())
+                .add(
+                        new Result(
+                                bucket,
+                                isCompactResult,
+                                meta,
+                                offset,
+                                outputStream.out.getPos() - offset));
+
+        if (outputStream.out.getPos() >= table.coreOptions().targetFileSize(false)) {
+            flushPartition(partition);
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        flushAllPartitions();
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        flushAllPartitions();
+    }
+
+    private void flushAllPartitions() throws Exception {
+        List<BinaryRow> partitions = new ArrayList<>(outputStreams.keySet());
+        for (BinaryRow partition : partitions) { // copy iterated: flushPartition mutates the map
+            flushPartition(partition);
+        }
+    }
+
+    private void flushPartition(BinaryRow partition) throws Exception {
+        OutputStream outputStream = outputStreams.get(partition);
+        outputStream.out.close();
+
+        Result baseResult = results.get(partition).get(0); // first copied file, checked below
+        Preconditions.checkArgument(baseResult.offset == 0);
+        DataFilePathFactory dataFilePathFactory =
+                pathFactory.createDataFilePathFactory(partition, baseResult.bucket);
+        // see Java docs of `CompactedChangelogFormatReaderFactory`
+        String realName =
+                "compacted-changelog-"
+                        + UUID.randomUUID()
+                        + "$"
+                        + baseResult.bucket
+                        + "-"
+                        + baseResult.length;
+        table.fileIO()
+                .rename(
+                        outputStream.path,
+                        dataFilePathFactory.toPath(
+                                realName
+                                        + "."
+                                        + CompactedChangelogReadOnlyFormat.getIdentifier(
+                                                baseResult.meta.fileFormat())));
+
+        Map<Integer, List<Result>> grouped = new HashMap<>(); // results keyed by bucket
+        for (Result result : results.get(partition)) {
+            grouped.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result);
+        }
+
+        for (Map.Entry<Integer, List<Result>> entry : grouped.entrySet()) {
+            List<DataFileMeta> newFilesChangelog = new ArrayList<>();
+            List<DataFileMeta> compactChangelog = new ArrayList<>();
+            for (Result result : entry.getValue()) {
+                // see Java docs of `CompactedChangelogFormatReaderFactory`
+                String name =
+                        (result.offset == 0
+                                        ? realName
+                                        : realName + "-" + result.offset + "-" + result.length)
+                                + "."
+                                + CompactedChangelogReadOnlyFormat.getIdentifier(
+                                        result.meta.fileFormat());
+                if (result.isCompactResult) {
+                    compactChangelog.add(result.meta.rename(name));
+                } else {
+                    newFilesChangelog.add(result.meta.rename(name));
+                }
+            }
+
+            CommitMessageImpl newMessage =
+                    new CommitMessageImpl(
+                            partition,
+                            entry.getKey(),
+                            new DataIncrement(
+                                    Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    newFilesChangelog),
+                            new CompactIncrement(
+                                    Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    compactChangelog));
+            Committable newCommittable =
+                    new Committable(checkpointId, Committable.Kind.FILE, newMessage);
+            output.collect(new StreamRecord<>(newCommittable));
+        }
+
+        outputStreams.remove(partition);
+        results.remove(partition);
+    }
+
+    @Override
+    public void close() throws Exception {
+        for (Map.Entry<BinaryRow, OutputStream> entry : outputStreams.entrySet()) {
+            OutputStream outputStream = entry.getValue();
+            try {
+                outputStream.out.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close output stream for file " + outputStream.path, e);
+            }
+            table.fileIO().deleteQuietly(outputStream.path); // unflushed temp file is discarded
+        }
+
+        outputStreams.clear();
+        results.clear();
+    }
+
+    private static class OutputStream {
+
+        private final Path path;
+        private final PositionOutputStream out;
+
+        private OutputStream(Path path, PositionOutputStream out) {
+            this.path = path;
+            this.out = out;
+        }
+    }
+
+    private static class Result {
+
+        private final int bucket;
+        private final boolean isCompactResult;
+        private final DataFileMeta meta;
+        private final long offset;
+        private final long length;
+
+        private Result(
+                int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) {
+            this.bucket = bucket;
+            this.isCompactResult = isCompactResult;
+            this.meta = meta;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
new file mode 100644
index 000000000000..8d17311518df
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog.format;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.reader.RecordReader;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * {@link FormatReaderFactory} for compacted changelog.
+ *
+ * <p><b>File Name Protocol</b>
+ *
+ * <p>There are two kinds of file name. In the following description, <code>bid1</code> and <code>
+ * bid2</code> are bucket id, <code>off</code> is offset, <code>len1</code> and <code>len2</code>
+ * are lengths.
+ *
+ * <ul>
+ *   <li><code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code>: This is the real file name. If
+ *       this file name is recorded in manifest file meta, reader should read the bytes of this file
+ *       starting from offset <code>0</code> with length <code>len1</code>.
+ *   <li><code>bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2</code>: This is the fake file
+ *       name. Reader should read the bytes of file <code>
+ *       bucket-bid1/compacted-changelog-xxx$bid1-len1</code> starting from offset <code>off</code>
+ *       with length <code>len2</code>.
+ * </ul>
+ */
+public class CompactedChangelogFormatReaderFactory implements FormatReaderFactory {
+
+    private final FormatReaderFactory wrapped;
+
+    public CompactedChangelogFormatReaderFactory(FormatReaderFactory wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public RecordReader<InternalRow> createReader(Context context) throws IOException {
+        OffsetReadOnlyFileIO fileIO = new OffsetReadOnlyFileIO(context.fileIO());
+        long length = decodePath(context.filePath()).length; // length parsed from the name
+        return wrapped.createReader(
+                new Context() {
+                    @Override
+                    public FileIO fileIO() {
+                        return fileIO;
+                    }
+
+                    @Override
+                    public Path filePath() {
+                        return context.filePath();
+                    }
+
+                    @Override
+                    public long fileSize() {
+                        return length;
+                    }
+                });
+    }
+
+    private static DecodeResult decodePath(Path path) {
+        String[] nameAndFormat = path.getName().split("\\."); // [name, format extension]
+        String[] names = nameAndFormat[0].split("\\$"); // [prefix, dash-separated numbers]
+        String[] split = names[1].split("-");
+        if (split.length == 2) { // two numbers: the path itself is read from offset 0
+            return new DecodeResult(path, 0, Long.parseLong(split[1]));
+        } else { // otherwise: a slice of the file rebuilt below under bucket-<split[0]>
+            Path realPath =
+                    new Path(
+                            path.getParent().getParent(),
+                            "bucket-"
+                                    + split[0]
+                                    + "/"
+                                    + names[0]
+                                    + "$"
+                                    + split[0]
+                                    + "-"
+                                    + split[1]
+                                    + "."
+                                    + nameAndFormat[1]);
+            return new DecodeResult(realPath, Long.parseLong(split[2]), Long.parseLong(split[3]));
+        }
+    }
+
+    private static class DecodeResult {
+
+        private final Path path;
+        private final long offset;
+        private final long length;
+
+        private DecodeResult(Path path, long offset, long length) {
+            this.path = path;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
+
+    private static class OffsetReadOnlyFileIO implements FileIO {
+
+        private final FileIO wrapped;
+
+        private OffsetReadOnlyFileIO(FileIO wrapped) {
+            this.wrapped = wrapped;
+        }
+
+        @Override
+        public boolean isObjectStore() {
+            return wrapped.isObjectStore();
+        }
+
+        @Override
+        public void configure(CatalogContext context) {
+            wrapped.configure(context);
+        }
+
+        @Override
+        public SeekableInputStream newInputStream(Path path) throws IOException {
+            DecodeResult result = decodePath(path);
+            return new OffsetSeekableInputStream(
+                    wrapped.newInputStream(result.path), result.offset, result.length);
+        }
+
+        @Override
+        public PositionOutputStream newOutputStream(Path path, boolean overwrite)
+                throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public FileStatus getFileStatus(Path path) throws IOException {
+            DecodeResult result = decodePath(path);
+            FileStatus status = wrapped.getFileStatus(result.path);
+
+            return new FileStatus() {
+                @Override
+                public long getLen() {
+                    return result.length; // length of the slice, not of the whole real file
+                }
+
+                @Override
+                public boolean isDir() {
+                    return status.isDir();
+                }
+
+                @Override
+                public Path getPath() {
+                    return path;
+                }
+
+                @Override
+                public long getModificationTime() {
+                    return status.getModificationTime();
+                }
+            };
+        }
+
+        @Override
+        public FileStatus[] listStatus(Path path) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean exists(Path path) throws IOException {
+            return wrapped.exists(decodePath(path).path);
+        }
+
+        @Override
+        public boolean delete(Path path, boolean recursive) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean mkdirs(Path path) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean rename(Path src, Path dst) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class OffsetSeekableInputStream extends SeekableInputStream {
+
+        private final SeekableInputStream wrapped;
+        private final long offset;
+        private final long length;
+
+        private OffsetSeekableInputStream(SeekableInputStream wrapped, long offset, long length)
+                throws IOException {
+            this.wrapped = wrapped;
+            this.offset = offset;
+            this.length = length;
+            wrapped.seek(offset);
+        }
+
+        @Override
+        public void seek(long desired) throws IOException {
+            wrapped.seek(offset + desired);
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return wrapped.getPos() - offset;
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (getPos() >= length) {
+                throw new EOFException();
+            }
+            return wrapped.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            long remaining = length - getPos();
+            if (remaining <= 0 && len > 0) {
+                return -1; // end of slice: a zero-length read would report 0 bytes forever
+            }
+            return wrapped.read(b, off, (int) Math.min(len, remaining));
+        }
+
+        @Override
+        public void close() throws IOException {
+            wrapped.close();
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java
new file mode 100644
index 000000000000..39bed81505c6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog.format;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** {@link FileFormat} for compacted changelog. */
+public class CompactedChangelogReadOnlyFormat extends FileFormat {
+
+    private final FileFormat wrapped; // underlying format that calls are delegated to
+
+    protected CompactedChangelogReadOnlyFormat(String formatIdentifier, FileFormat wrapped) {
+        super(formatIdentifier);
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public FormatReaderFactory createReaderFactory(
+            RowType projectedRowType, @Nullable List<Predicate> filters) {
+        return new CompactedChangelogFormatReaderFactory(
+                wrapped.createReaderFactory(projectedRowType, filters));
+    }
+
+    @Override
+    public FormatWriterFactory createWriterFactory(RowType type) {
+        throw new UnsupportedOperationException(); // read-only format: no writer is provided
+    }
+
+    @Override
+    public void validateDataFields(RowType rowType) {
+        wrapped.validateDataFields(rowType);
+    }
+
+    public static String getIdentifier(String wrappedFormat) {
+        return "cc-" + wrappedFormat; // e.g. "cc-orc" for the wrapped format "orc"
+    }
+
+    static class AbstractFactory implements FileFormatFactory {
+
+        private final String format; // identifier of the wrapped (real) format
+
+        AbstractFactory(String format) {
+            this.format = format;
+        }
+
+        @Override
+        public String identifier() {
+            return getIdentifier(format);
+        }
+
+        @Override
+        public FileFormat create(FormatContext formatContext) {
+            return new CompactedChangelogReadOnlyFormat(
+                    getIdentifier(format), FileFormat.fromIdentifier(format, formatContext));
+        }
+    }
+
+    /** {@link FileFormatFactory} for compacted changelog, with orc as the real format. */
+    public static class OrcFactory extends AbstractFactory {
+
+        public OrcFactory() {
+            super("orc");
+        }
+    }
+
+    /** {@link FileFormatFactory} for compacted changelog, with parquet as the real format. */
+    public static class ParquetFactory extends AbstractFactory {
+
+        public ParquetFactory() {
+            super("parquet");
+        }
+    }
+
+    /** {@link FileFormatFactory} for compacted changelog, with avro as the real format. */
+    public static class AvroFactory extends AbstractFactory {
+
+        public AvroFactory() {
+            super("avro");
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index e483e3c19f74..cbf193adfee9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.TagCreationMode;
+import org.apache.paimon.flink.compact.changelog.ChangelogCompactOperator;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -54,6 +55,7 @@
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.createCommitUser;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_COMPACT_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -226,6 +228,16 @@ public DataStream doWrite(
if (options.get(SINK_USE_MANAGED_MEMORY)) {
declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
+
+ if (options.contains(CHANGELOG_COMPACT_PARALLELISM)) {
+ written =
+ written.transform(
+ "Changelog Compactor",
+ new CommittableTypeInfo(),
+ new ChangelogCompactOperator(table))
+ .setParallelism(options.get(CHANGELOG_COMPACT_PARALLELISM));
+ }
+
return written;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
new file mode 100644
index 000000000000..6e7553d5c668
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$OrcFactory
+org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$ParquetFactory
+org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$AvroFactory
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 2bdd90a963d9..879a4a076cb3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -39,6 +39,7 @@
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -48,6 +49,7 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -303,9 +305,7 @@ private void innerTestChangelogProducing(List options) throws Exception
public void testBatchJobWithConflictAndRestart() throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().allowRestart(10).build();
tEnv.executeSql(
- "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '"
- + getTempDirPath()
- + "' )");
+ "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + path + "' )");
tEnv.executeSql("USE CATALOG mycat");
tEnv.executeSql(
"CREATE TABLE t ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) "
@@ -335,6 +335,115 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
}
}
+    @Test
+    @Timeout(120)
+    public void testChangelogCompact() throws Exception {
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
+        String catalogDdl =
+                "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + path + "' )";
+        bEnv.executeSql(catalogDdl);
+        bEnv.executeSql("USE CATALOG mycat");
+        bEnv.executeSql(
+                "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+                        + "PARTITIONED BY (pt) "
+                        + "WITH ("
+                        + " 'bucket' = '2',\n"
+                        + " 'changelog-producer' = 'lookup',\n"
+                        + " 'changelog.compact.parallelism' = '1',\n"
+                        + " 'snapshot.num-retained.min' = '3',\n"
+                        + " 'snapshot.num-retained.max' = '3'\n"
+                        + ")");
+
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(1000).build();
+        sEnv.executeSql(catalogDdl);
+        sEnv.executeSql("USE CATALOG mycat");
+
+        List<String> values = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            values.add(String.format("(0, %d, %d)", i, i));
+            values.add(String.format("(1, %d, %d)", i, i));
+        }
+        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+        // two partitions (pt = 0 and pt = 1) were written: two compacted files are expected
+        List<String> compactedChangelogs2 = listAllFilesWithPrefix("compacted-changelog-");
+        assertThat(compactedChangelogs2).hasSize(2);
+        assertThat(listAllFilesWithPrefix("changelog-")).isEmpty();
+
+        List<Row> expected = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            expected.add(Row.ofKind(RowKind.INSERT, 0, i, i));
+            expected.add(Row.ofKind(RowKind.INSERT, 1, i, i));
+        }
+        assertStreamingResult(
+                sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"),
+                expected);
+
+        values.clear();
+        for (int i = 0; i < 100; i++) {
+            values.add(String.format("(0, %d, %d)", i, i + 1));
+            values.add(String.format("(1, %d, %d)", i, i + 1));
+        }
+        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+
+        assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4);
+        assertThat(listAllFilesWithPrefix("changelog-")).isEmpty();
+
+        for (int i = 0; i < 100; i++) {
+            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i));
+            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i));
+            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1));
+            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1));
+        }
+        assertStreamingResult(
+                sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"),
+                expected);
+
+        values.clear();
+        for (int i = 0; i < 100; i++) {
+            values.add(String.format("(0, %d, %d)", i, i + 2));
+            values.add(String.format("(1, %d, %d)", i, i + 2));
+        }
+        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+        // the files created by the first insert must be gone by now (checked one by one below)
+        assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4);
+        assertThat(listAllFilesWithPrefix("changelog-")).isEmpty();
+        LocalFileIO fileIO = LocalFileIO.create();
+        for (String p : compactedChangelogs2) {
+            assertThat(fileIO.exists(new Path(p))).isFalse();
+        }
+        // first 200 INSERT rows are dropped, presumably expired with the oldest snapshot - confirm
+        expected = new ArrayList<>(expected.subList(200, 600));
+        for (int i = 0; i < 100; i++) {
+            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i + 1));
+            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i + 1));
+            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 2));
+            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 2));
+        }
+        assertStreamingResult(
+                sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"),
+                expected);
+    }
+
+    private List<String> listAllFilesWithPrefix(String prefix) throws Exception {
+        try (Stream<java.nio.file.Path> stream = Files.walk(java.nio.file.Paths.get(path))) {
+            return stream.filter(Files::isRegularFile) // recursive walk, directories skipped
+                    .filter(p -> p.getFileName().toString().startsWith(prefix))
+                    .map(java.nio.file.Path::toString)
+                    .collect(Collectors.toList());
+        }
+    }
+
+    private void assertStreamingResult(TableResult result, List<Row> expected) throws Exception {
+        List<Row> actual = new ArrayList<>();
+        try (CloseableIterator<Row> it = result.collect()) {
+            while (actual.size() < expected.size() && it.hasNext()) { // read at most |expected|
+                actual.add(it.next());
+            }
+        }
+        assertThat(actual).hasSameElementsAs(expected);
+    }
+
// ------------------------------------------------------------------------
// Random Tests
// ------------------------------------------------------------------------
@@ -490,13 +599,16 @@ private void testLookupChangelogProducerRandom(
enableFailure,
"'bucket' = '4',"
+ String.format(
- "'write-buffer-size' = '%s',"
- + "'changelog-producer' = 'lookup',"
- + "'lookup-wait' = '%s',"
- + "'deletion-vectors.enabled' = '%s'",
- random.nextBoolean() ? "4mb" : "8mb",
+                                // 'bucket' is already set by the literal prepended above
+                                "'write-buffer-size' = '%s', "
+                                        + "'changelog-producer' = 'lookup', "
+                                        + "'lookup-wait' = '%s', "
+                                        + "'deletion-vectors.enabled' = '%s', "
+                                        + "'changelog.compact.parallelism' = '%s'",
+                                random.nextBoolean() ? "512kb" : "1mb",
random.nextBoolean(),
- enableDeletionVectors));
+ enableDeletionVectors,
+ random.nextInt(1, 3)));
// sleep for a random amount of time to check
// if we can first read complete records then read incremental records correctly