[flink] Small changelog files can now be compacted into big files #4255
Conversation
Will the compacted changelog be used when querying?
Yes. Flink can still read these changelogs.
 * Operator to compact several changelog files from the same partition into one file, in order to
 * reduce the number of small files.
 */
public class ChangelogCompactOperator extends AbstractStreamOperator<Committable>
Why choose "add a compact operator" instead of "compact changelog files in the write operator"?
How can you compact changelog files from all buckets into one file inside the write operator? Please suggest your solution.
Sorry, at first I guessed that changelogs are compacted per bucket, but after reading the code I discovered that they are compacted per partition. I forgot to delete this comment before submitting.
private void copyFile(
        Path path, BinaryRow partition, int bucket, boolean isCompactResult, DataFileMeta meta)
        throws Exception {
    if (!outputStreams.containsKey(partition)) {
Could this be changed to outputStreams.computeIfAbsent?
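For illustration, a minimal sketch of that suggestion, reusing the names from this PR (`outputStreams`, `OutputStream`, `table`, `path`); note that `Map#computeIfAbsent` takes a `Function`, so the checked `IOException` from `newOutputStream` would have to be wrapped:

```java
// Sketch only: assumes the surrounding fields of ChangelogCompactOperator.
outputStreams.computeIfAbsent(
        partition,
        p -> {
            Path outputPath =
                    new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
            try {
                // Same stream construction as the put() call below.
                return new OutputStream(
                        outputPath, table.fileIO().newOutputStream(outputPath, false));
            } catch (IOException e) {
                // The mapping function cannot throw checked exceptions.
                throw new UncheckedIOException(e);
            }
        });
```

Whether the exception wrapping is worth it compared to the explicit containsKey check is a judgment call.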
        outputStreams.put(
                partition,
                new OutputStream(
                        outputPath, table.fileIO().newOutputStream(outputPath, false)));
Why is overwrite true? If the file already exists, should it throw an exception?
There is no `true` in this line of code. What do you mean?
Why is overwrite false? If the file already exists, should it throw an exception?
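For context, a hedged reading of the flag being questioned, assuming the usual `FileIO#newOutputStream(Path, boolean overwrite)` contract; this is an interpretation, not an answer from the PR author:

```java
// With overwrite = false, creating the stream should fail (typically with an
// IOException) if outputPath already exists, rather than silently replacing
// the file. Since outputPath embeds UUID.randomUUID() (see the snippet
// further down), a name collision is practically impossible, so failing fast
// on an existing file is arguably the safer default.
PositionOutputStream out = table.fileIO().newOutputStream(outputPath, false);
```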
                        offset,
                        outputStream.out.getPos() - offset));

        if (outputStream.out.getPos() >= table.coreOptions().targetFileSize(false)) {
Only primary key tables have changelogs, so why is targetFileSize called with false here?
Changelog files are not LSM tree data files. They're just a bunch of records and they don't care about what the primary keys are.
OK, you are right.
                CompactedChangelogReadOnlyFormat.getIdentifier(
                        baseResult.meta.fileFormat())));

        Map<Integer, List<Result>> grouped = new HashMap<>();
This map could be initialized with a capacity based on the bucket count.
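A small sketch of that suggestion; `bucketNum` is assumed to be available in scope. Since `HashMap` resizes once its size exceeds capacity times the load factor (0.75 by default), sizing against the load factor avoids rehashing:

```java
// Sketch: pre-size the map so it can hold bucketNum entries without resizing.
Map<Integer, List<Result>> grouped = new HashMap<>((int) (bucketNum / 0.75) + 1);
```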
        }
    }

    CommitMessageImpl newMessage =
Like line 113, a comment could be added here:
// send commit message only with changelog files
        throws Exception {
    if (!outputStreams.containsKey(partition)) {
        Path outputPath =
                new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
This prefix could be extracted into a constant, e.g. private final String xxx = "tmp-compacted-changelog-";
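Sketched out, the suggestion might look like this; the constant name is illustrative, not from the PR:

```java
// Hypothetical constant name; hoists the magic string used to name
// temporary compacted changelog files.
private static final String TMP_COMPACTED_CHANGELOG_PREFIX = "tmp-compacted-changelog-";

// The call site shown above would then become:
Path outputPath =
        new Path(path.getParent(), TMP_COMPACTED_CHANGELOG_PREFIX + UUID.randomUUID());
```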
public static class OrcFactory extends AbstractFactory {

    public OrcFactory() {
        super("orc");
Use "OrcFileFormat.IDENTIFIER" instead.
Also parquet and avro.
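A sketch of the suggested change, assuming `OrcFileFormat` exposes a public `IDENTIFIER` constant (and likewise for the Parquet and Avro format classes):

```java
public OrcFactory() {
    // Refer to the format's own identifier constant instead of a string literal.
    super(OrcFileFormat.IDENTIFIER);
}
```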
return "cc-" + wrappedFormat; | ||
} | ||
|
||
static class AbstractFactory implements FileFormatFactory { |
This class could be declared private.
import java.util.List;

/** {@link FileFormat} for compacted changelog. */
Suggested wording: for compacted changelog, which wraps a real FileFormat (orc / parquet / avro).
If the parallelism becomes larger, file copying will become faster; however, the number of resulting files will also become larger. As file copying is fast in most storage systems, we suggest that you start experimenting with `'changelog.compact.parallelism' = '1'` and increase the value if needed.
My idea is to have only one switch: `changelog.precommit-compact` = `true`. We can add a coordinator node to this pipeline to decide how to concatenate changelogs into result files of the target file size, which can be one or multiple files.
Closing because this is refactored in #4380.
Purpose
Currently, changelog files are not compacted. If Flink's checkpoint interval is short (for example, 30 seconds) and the number of buckets is large, each snapshot may produce lots of small changelog files. Too many files may put a burden on the distributed storage cluster.
This PR introduces a new feature to compact small changelog files into large ones.
Tests
IT cases.
API and Format
Introduces a special file format for compacted changelogs.
Documentation
Documentation is also added.