[core] Support parallel reading of local orphan clean #4320
Conversation
});
}
usedFiles.addAll(dataFiles);
randomlyOnlyExecute(
I think if the manifest size is very big, using randomlyOnlyExecute may cause an OOM.
Do you think so?
I think there is indeed a risk of OOM.
But it can be mitigated by reducing the parallelism (when it is 1, the behavior is similar to the previous serial execution); this change only adds the ability to read in parallel.
If parallelism 1 still OOMs, then distributed mode should be considered.
What about changing randomlyOnlyExecute to sequentialBatchedExecute?
Yeah, I replaced it with sequentialBatchedExecute since it has better control over memory.
In the end, the results still need to be put into a set, so as long as there is no inflation during the reading process, there should be no risk of OOM here.
I tested sequentialBatchedExecute, randomlyOnlyExecute, and randomlyExecute with parallelism 16 on a table with 2 PB of data, and none of them hit an OOM. Besides, sequentialBatchedExecute took about 30 min for the procedure, while randomlyOnlyExecute and randomlyExecute took about 20 min.
So I prefer randomlyOnlyExecute: this method is more concise, introduces no extra variables, and also reduces the GC cost of the temporary data-file variables. Its disadvantage is the lock-wait overhead while inserting into usedFiles, but that has no impact on the overall time, which mainly comes from IO.
What do you think? @JingsongLi @wwj6591812
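Not the PR's code, just a minimal sketch of the trade-off discussed above using only JDK classes (the actual patch relies on Paimon's ThreadPoolUtils helpers, whose exact semantics may differ): variant A writes every task's result straight into one shared concurrent set, while variant B lets each task return its own list and merges the lists on the calling thread. With IO-bound manifest reads, the contention in variant A is negligible, which matches the timings above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class UsedFilesCollectionSketch {

    // Variant A ("randomlyOnlyExecute"-style): every task adds directly into one
    // shared thread-safe set. Concise, no temporary per-task lists to GC, but
    // inserts contend on the shared set.
    static Set<String> collectShared(
            List<String> manifests, Function<String, List<String>> readDataFiles)
            throws InterruptedException {
        Set<String> usedFiles = ConcurrentHashMap.newKeySet();
        ExecutorService executor = Executors.newFixedThreadPool(16);
        for (String manifest : manifests) {
            executor.execute(() -> usedFiles.addAll(readDataFiles.apply(manifest)));
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);
        return usedFiles;
    }

    // Variant B ("sequentialBatchedExecute"-style): each task returns its own list,
    // merged on the calling thread. No contention while reading, but the temporary
    // lists add allocation and GC cost.
    static Set<String> collectBatched(
            List<String> manifests, Function<String, List<String>> readDataFiles)
            throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(16);
        List<Future<List<String>>> futures = new ArrayList<>(manifests.size());
        for (String manifest : manifests) {
            futures.add(executor.submit(() -> readDataFiles.apply(manifest)));
        }
        Set<String> usedFiles = new HashSet<>();
        for (Future<List<String>> future : futures) {
            usedFiles.addAll(future.get());
        }
        executor.shutdown();
        return usedFiles;
    }
}
```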
@@ -472,13 +476,18 @@ public List<Snapshot> safelyGetAllSnapshots() throws IOException {
.map(id -> snapshotPath(id))
this::snapshotPath
fix
} catch (FileNotFoundException ignored) {
}
}
List<Snapshot> snapshots = Collections.synchronizedList(new ArrayList<>());
Collections.synchronizedList(new ArrayList<>(paths.size()));
fix
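A small generic illustration (not from the patch) of why the reviewer's capacity hint helps: with new ArrayList<>(paths.size()) the backing array is allocated once, so concurrent add() calls through the synchronized wrapper never trigger a resize while holding the lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class PresizedListExample {
    // Collect one result per input path; the capacity hint means the backing
    // array is sized up front, so concurrent add() calls never have to grow it.
    static List<String> readAll(List<String> paths) {
        List<String> results = Collections.synchronizedList(new ArrayList<>(paths.size()));
        paths.parallelStream().map(p -> "contents-of-" + p).forEach(results::add);
        return results;
    }
}
```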
@@ -489,18 +498,36 @@ public List<Changelog> safelyGetAllChangelogs() throws IOException {
.map(id -> longLivedChangelogPath(id))
this::longLivedChangelogPath
fix
} catch (FileNotFoundException ignored) {
}
}
List<Changelog> changelogs = Collections.synchronizedList(new ArrayList<>());
Collections.synchronizedList(new ArrayList<>(paths.size()));
fix
return changelogs;
}

private void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths)
throws IOException {
ExecutorService executor = |
Why not use ThreadPoolUtils#createCachedThreadPool?
fix
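A hedged sketch of what a collectSnapshots-style helper can look like. This is not the patch's implementation: it uses java.nio paths, a plain JDK fixed-size pool instead of ThreadPoolUtils#createCachedThreadPool, and a placeholder parser, but it shows the pattern of submitting one read task per path, skipping missing files, and gathering results into a pre-sized synchronized list.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class ParallelCollectSketch {

    // Feed every existing path to the consumer from a fixed-size pool;
    // missing files are simply skipped, mirroring the ignored FileNotFoundException.
    static void collect(Consumer<Path> pathConsumer, List<Path> paths, int parallelism)
            throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<?>> futures = new ArrayList<>(paths.size());
            for (Path path : paths) {
                futures.add(
                        executor.submit(
                                () -> {
                                    if (Files.exists(path)) {
                                        pathConsumer.accept(path);
                                    }
                                }));
            }
            for (Future<?> future : futures) {
                future.get(); // propagate read failures to the caller
            }
        } finally {
            executor.shutdown();
        }
    }

    // Usage: parse each snapshot file in parallel into a pre-sized, synchronized list.
    static List<String> readAll(List<Path> paths, int parallelism) throws Exception {
        List<String> snapshots = Collections.synchronizedList(new ArrayList<>(paths.size()));
        collect(path -> snapshots.add(readQuietly(path)), paths, parallelism);
        return snapshots;
    }

    private static String readQuietly(Path path) {
        try {
            return new String(Files.readAllBytes(path));
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
    }
}
```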
.forEach(dataFiles::add);
});
Iterable<String> dataFiles =
sequentialBatchedExecute(
Why use sequentialBatchedExecute? Maybe use randomlyExecute?
Replaced sequentialBatchedExecute with randomlyOnlyExecute, details in #4320 (comment).
This reverts commit 43350a6.
+1
Purpose
Support parallel reading of snapshot, manifest, and data file metadata to improve execution efficiency.
Tests
Tested on a table with 7 TB of data in local mode:
Before this patch: local orphan clean took about 20 min.
After this patch, the same SQL with parallelism 16 took only about 5 min.
API and Format
Documentation