
[core] Support parallel close writers #4297

Closed
wants to merge 9 commits

Conversation

neuyilan
Member

@neuyilan neuyilan commented Oct 9, 2024

Support parallel close of writers.

Currently, when generating snapshots, the files of each bucket under each partition are closed sequentially, which takes a long time. The purpose of this PR is to enable parallel close of these files.

Documentation

Users only need to modify the table properties to adjust the number of threads used to close files concurrently.

-- flink sql
ALTER TABLE my_table SET (
    'table.close-writers-thread-number' = '16' 
);
-- spark sql
ALTER TABLE my_table SET TBLPROPERTIES (
    'table.close-writers-thread-number' = '16' 
);
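The mechanism described above can be sketched in plain Java. This is a minimal illustration, not Paimon's actual code: the `Writer` interface and `closeAll` helper are hypothetical names, standing in for the per-bucket writers that are submitted to a fixed-size thread pool instead of being closed one by one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelCloser {

    // Hypothetical stand-in for a per-bucket file writer.
    interface Writer {
        void close() throws Exception;
    }

    // Close all writers concurrently on a pool sized by the table property
    // (e.g. 'table.close-writers-thread-number' = '16'), then wait for all
    // of them to finish so the snapshot only commits fully closed files.
    static void closeAll(List<Writer> writers, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Writer w : writers) {
                futures.add(pool.submit(() -> {
                    w.close();
                    return null;
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // rethrows the first close failure, if any
            }
        } finally {
            pool.shutdown();
        }
    }
}
```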

@neuyilan neuyilan closed this Oct 14, 2024
@neuyilan neuyilan reopened this Oct 14, 2024
@JingsongLi
Contributor

It looks like the test failed.

@neuyilan neuyilan closed this Oct 16, 2024
@neuyilan neuyilan reopened this Oct 16, 2024
@neuyilan neuyilan marked this pull request as draft October 17, 2024 00:36
@neuyilan neuyilan marked this pull request as ready for review October 18, 2024 06:14
@neuyilan
Member Author

It looks like the test failed.

Hi Jingsong, thanks for your review.

The logic of this PR is relatively straightforward: it concurrently flushes and closes the files of the buckets under each partition during the precommit phase. With this modification, all close operations for the writers of those buckets are handled by a thread pool during precommit.

However, this PR is encountering the following error during IT testing [1]. Could you offer some suggestions? I'm not clear on the cause of this error.

[screenshot of the failing test]

[1] https://github.com/apache/paimon/actions/runs/11399443830/job/31718241423

@JingsongLi
Contributor

JingsongLi commented Oct 31, 2024

You should set the thread context classloader in the async thread.
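This suggestion can be sketched as a wrapper that propagates the submitting thread's context classloader to the pool's worker thread (otherwise the worker keeps the pool's default classloader, which can break `Class.forName` or `ServiceLoader` lookups in a Flink runtime). The wrapper below is a generic illustration, not Paimon's actual fix.

```java
public class ContextClassLoaderRunnable {

    // Capture the caller's context classloader at submission time and
    // install it on the worker thread for the duration of the task,
    // restoring the previous classloader afterwards.
    static Runnable withContextClassLoader(Runnable task) {
        ClassLoader captured = Thread.currentThread().getContextClassLoader();
        return () -> {
            Thread worker = Thread.currentThread();
            ClassLoader previous = worker.getContextClassLoader();
            worker.setContextClassLoader(captured);
            try {
                task.run();
            } finally {
                worker.setContextClassLoader(previous);
            }
        };
    }
}
```

Tasks submitted to the close-writer pool would then be wrapped, e.g. `pool.submit(withContextClassLoader(closeTask))`.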

@JingsongLi
Contributor

But overall, I don't think this PR should optimize things this way; in this situation, you should try to minimize the number of partitions as much as possible (I guess there are too many partitions, causing too many writers)?

@JingsongLi JingsongLi closed this Oct 31, 2024
@neuyilan
Member Author

But overall, I don't think this PR should optimize things this way; in this situation, you should try to minimize the number of partitions as much as possible (I guess there are too many partitions, causing too many writers)?

In fact, there are not many partitions or buckets. In our scenario, the Flink job consumes data from Talos (a Kafka-like system), so within one checkpoint interval the stream may span many partitions and buckets. As a result, at each checkpoint the bucket data of every partition is flushed sequentially, leading to long processing times.

@neuyilan
Member Author

neuyilan commented Nov 5, 2024

@JingsongLi What do you think? I think it's common sense.
