[flink] add coordinate and worker operator for small changelog files compaction
LsomeYeah committed Oct 25, 2024
1 parent 815fd1c commit 4b61315
Showing 11 changed files with 899 additions and 34 deletions.
8 changes: 2 additions & 6 deletions docs/content/maintenance/write-performance.md
@@ -167,9 +167,5 @@ If Flink's checkpoint interval is short (for example, 30 seconds) and the number
 each snapshot may produce lots of small changelog files.
 Too many files may put a burden on the distributed storage cluster.
 
-In order to compact small changelog files into large ones, you can set the table option `changelog.compact.parallelism`.
-This option will add a compact operator after the writer operator, which copies changelog files into large ones.
-If the parallelism becomes larger, file copying will become faster.
-However, the number of resulting files will also become larger.
-As file copying is fast in most storage systems,
-we suggest that you start experimenting with `'changelog.compact.parallelism' = '1'` and increase the value if needed.
+In order to compact small changelog files into large ones, you can set the table option `changelog.precommit-compact` to `true`.
+This option defaults to `false`. If enabled, a compact coordinator operator and a worker operator are added after the writer operator, which copy small changelog files into larger ones.
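
For illustration, a minimal sketch of enabling the new option on an existing table through Flink SQL from Java (the table name `my_table` and the catalog setup are hypothetical, not part of this commit):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnablePrecommitCompact {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Assumes a Paimon catalog is registered and in use, and that 'my_table' exists in it.
        tEnv.executeSql("ALTER TABLE my_table SET ('changelog.precommit-compact' = 'true')");
    }
}
```

Subsequent writes then route small changelog files through the new coordinator and worker operators.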
@@ -27,10 +27,10 @@
 </thead>
 <tbody>
 <tr>
-<td><h5>changelog.compact.parallelism</h5></td>
-<td style="word-wrap: break-word;">(none)</td>
-<td>Integer</td>
-<td>Compact several changelog files from the same partition into one file, in order to decrease the number of small files. This property sets the parallelism of the compact operator. More parallelism means faster file copy, however the number of resulting files will also become larger.</td>
+<td><h5>changelog.precommit-compact</h5></td>
+<td style="word-wrap: break-word;">false</td>
+<td>Boolean</td>
+<td>If true, compact several changelog files from the same partition into larger ones, in order to decrease the number of small files.</td>
 </tr>
 <tr>
 <td><h5>end-input.watermark</h5></td>
@@ -397,16 +397,14 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Optional endInput watermark used in case of batch mode or bounded stream.");
 
-    public static final ConfigOption<Integer> CHANGELOG_COMPACT_PARALLELISM =
-            key("changelog.compact.parallelism")
-                    .intType()
-                    .noDefaultValue()
+    public static final ConfigOption<Boolean> CHANGELOG_PRECOMMIT_COMPACT =
+            key("changelog.precommit-compact")
+                    .booleanType()
+                    .defaultValue(false)
                     .withDescription(
-                            "Compact several changelog files from the same partition into one file, "
-                                    + "in order to decrease the number of small files. "
-                                    + "This property sets the parallelism of the compact operator. "
-                                    + "More parallelism means faster file copy, "
-                                    + "however the number of resulting files will also become larger.");
+                            "If true, it will add a changelog compact coordinator and worker operator after the writer operator, "
+                                    + "in order to compact several changelog files from the same partition into large ones, "
+                                    + "which can decrease the number of small files.");
 
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
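
Downstream code would read this flag when building the sink to decide whether to insert the compact operators. A minimal sketch, assuming Paimon's `Options.fromMap` helper (the wrapper class and method name here are illustrative, not from this commit):

```java
import java.util.Map;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;

public final class PrecommitCompactCheck {

    // True when the table options request pre-commit changelog compaction.
    public static boolean changelogPrecommitCompactEnabled(Map<String, String> tableOptions) {
        return Options.fromMap(tableOptions).get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT);
    }
}
```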
@@ -0,0 +1,177 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.flink.compact.changelog;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Coordinator operator for compacting changelog files.
 *
 * <p>This operator accumulates the sizes of the changelog files contained in all buckets within
 * each partition, based on the {@link Committable} messages emitted by the writer operator, and
 * emits {@link ChangelogCompactTask}s to {@link ChangelogCompactWorkerOperator}.
 */
public class ChangelogCompactCoordinateOperator
        extends AbstractStreamOperator<Either<Committable, ChangelogCompactTask>>
        implements OneInputStreamOperator<Committable, Either<Committable, ChangelogCompactTask>>,
                BoundedOneInput {

    private final FileStoreTable table;

    private transient long checkpointId;
    private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;

    public ChangelogCompactCoordinateOperator(FileStoreTable table) {
        this.table = table;
    }

    @Override
    public void open() throws Exception {
        super.open();

        checkpointId = Long.MIN_VALUE;
        partitionChangelogs = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Committable> record) {
        Committable committable = record.getValue();
        checkpointId = Math.max(checkpointId, committable.checkpointId());
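        // Only FILE committables carry changelog files; forward everything else untouched.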
        if (committable.kind() != Committable.Kind.FILE) {
            output.collect(new StreamRecord<>(Either.Left(record.getValue())));
            return;
        }

        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
        if (message.newFilesIncrement().changelogFiles().isEmpty()
                && message.compactIncrement().changelogFiles().isEmpty()) {
            output.collect(new StreamRecord<>(Either.Left(record.getValue())));
            return;
        }

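        // Accumulate changelog files per partition and emit a compact task
        // once the accumulated size reaches the target file size.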
        BinaryRow partition = message.partition();
        Integer bucket = message.bucket();
        for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
            partitionChangelogs
                    .computeIfAbsent(partition, k -> new PartitionChangelog())
                    .addNewChangelogFile(bucket, meta);
            PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
            if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) {
                emitPartitionChangelogCompactTask(partition);
            }
        }
        for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
            partitionChangelogs
                    .computeIfAbsent(partition, k -> new PartitionChangelog())
                    .addCompactChangelogFile(bucket, meta);
            PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
            if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) {
                emitPartitionChangelogCompactTask(partition);
            }
        }

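        // Forward the committable with its changelog files stripped; those files
        // are now owned by the emitted compact tasks.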
        CommitMessageImpl newMessage =
                new CommitMessageImpl(
                        message.partition(),
                        message.bucket(),
                        new DataIncrement(
                                message.newFilesIncrement().newFiles(),
                                message.newFilesIncrement().deletedFiles(),
                                Collections.emptyList()),
                        new CompactIncrement(
                                message.compactIncrement().compactBefore(),
                                message.compactIncrement().compactAfter(),
                                Collections.emptyList()),
                        message.indexIncrement());
        Committable newCommittable =
                new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
        output.collect(new StreamRecord<>(Either.Left(newCommittable)));
    }

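    // Flush all pending partitions before each checkpoint barrier and at end of
    // input, so no buffered changelog files outlive the current checkpoint.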
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {
        emitAllPartitionsChangelogCompactTask();
    }

    @Override
    public void endInput() {
        emitAllPartitionsChangelogCompactTask();
    }

    private void emitPartitionChangelogCompactTask(BinaryRow partition) {
        PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
        output.collect(
                new StreamRecord<>(
                        Either.Right(
                                new ChangelogCompactTask(
                                        checkpointId,
                                        partition,
                                        partitionChangelog.newFileChangelogFiles,
                                        partitionChangelog.compactChangelogFiles))));
        partitionChangelogs.remove(partition);
    }

    private void emitAllPartitionsChangelogCompactTask() {
        List<BinaryRow> partitions = new ArrayList<>(partitionChangelogs.keySet());
        for (BinaryRow partition : partitions) {
            emitPartitionChangelogCompactTask(partition);
        }
    }

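    /** Pending changelog files of one partition, grouped by bucket, plus their total size. */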
    private static class PartitionChangelog {

        private long totalFileSize;
        private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
        private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;

        public PartitionChangelog() {
            totalFileSize = 0;
            newFileChangelogFiles = new HashMap<>();
            compactChangelogFiles = new HashMap<>();
        }

        public long totalFileSize() {
            return totalFileSize;
        }

        public void addNewChangelogFile(Integer bucket, DataFileMeta file) {
            totalFileSize += file.fileSize();
            newFileChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file);
        }

        public void addCompactChangelogFile(Integer bucket, DataFileMeta file) {
            totalFileSize += file.fileSize();
            compactChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file);
        }
    }
}
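
Beyond the diff itself, a hypothetical sketch of how this coordinator and the worker could be wired between the writer and the committer. Nothing below is taken from the commit: `ChangelogTaskTypeInfo`, `CommittableTypeInfo`, and the `ChangelogCompactWorkerOperator(table)` constructor are assumptions based on common Paimon/Flink sink patterns.

```java
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Either;

import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactTask;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactTaskTypeInfo; // assumed to exist
import org.apache.paimon.flink.compact.changelog.ChangelogCompactWorkerOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.table.FileStoreTable;

public final class ChangelogCompactWiring {

    // The coordinator runs with parallelism 1 so that it sees every committable
    // and can group changelog files per partition; the worker then copies files
    // in parallel and turns finished tasks back into committables.
    public static DataStream<Committable> addChangelogCompact(
            DataStream<Committable> written, FileStoreTable table) {
        DataStream<Either<Committable, ChangelogCompactTask>> coordinated =
                written.transform(
                                "Changelog Compact Coordinator",
                                new EitherTypeInfo<>(
                                        new CommittableTypeInfo(),
                                        new ChangelogCompactTaskTypeInfo()),
                                new ChangelogCompactCoordinateOperator(table))
                        .setParallelism(1);
        return coordinated.transform(
                "Changelog Compact Worker",
                new CommittableTypeInfo(),
                new ChangelogCompactWorkerOperator(table));
    }
}
```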