Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Nov 18, 2024
1 parent 68c7874 commit cef53dc
Showing 1 changed file with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,11 @@ public List<ManifestEntry> files() {

@Override
public List<SimpleFileEntry> readSimpleEntries() {
List<ManifestFileMeta> manifests = readManifests().filteredManifests;
Iterator<SimpleFileEntry> iterator =
readAndMergeSimpleEntries(readManifests().filteredManifests);
scanMode == ScanMode.ALL
? readAndMergeSimpleEntries(manifests, false)
: readAndNoMergeSimpleEntries(manifests, false);
List<SimpleFileEntry> result = new ArrayList<>();
while (iterator.hasNext()) {
result.add(iterator.next());
Expand Down Expand Up @@ -340,14 +343,15 @@ private Iterator<ManifestEntry> readAndMergeManifestEntries(
return readAndMergeFileEntries(manifests, readEntries, useSequential);
}

private Iterator<SimpleFileEntry> readAndMergeSimpleEntries(List<ManifestFileMeta> manifests) {
private Iterator<SimpleFileEntry> readAndMergeSimpleEntries(
List<ManifestFileMeta> manifests, boolean useSequential) {
BiFunction<ManifestFileMeta, Set<Identifier>, List<SimpleFileEntry>> readEntries =
(manifest, deleted) ->
readSimpleEntries(
manifest,
FileEntry.addFilter(),
entry -> !deleted.contains(entry.identifier()));
return readAndMergeFileEntries(manifests, readEntries, false);
return readAndMergeFileEntries(manifests, readEntries, useSequential);
}

private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(
Expand Down Expand Up @@ -383,6 +387,16 @@ private Iterator<ManifestEntry> readAndNoMergeManifestEntries(
}
}

private Iterator<SimpleFileEntry> readAndNoMergeSimpleEntries(
List<ManifestFileMeta> manifests, boolean useSequential) {
if (useSequential) {
return sequentialBatchedExecute(this::readSimpleEntries, manifests, parallelism)
.iterator();
} else {
return randomlyExecuteSequentialReturn(this::readSimpleEntries, manifests, parallelism);
}
}

private ManifestsReader.Result readManifests() {
if (specifiedManifests != null) {
return new ManifestsReader.Result(null, specifiedManifests, specifiedManifests);
Expand Down Expand Up @@ -429,6 +443,11 @@ private List<ManifestEntry> readManifest(
&& filterByStats(entry));
}

/** Note: Keep this thread-safe. */
private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) {
return readSimpleEntries(manifest, null, null);
}

/** Note: Keep this thread-safe. */
private List<SimpleFileEntry> readSimpleEntries(
ManifestFileMeta manifest,
Expand Down

0 comments on commit cef53dc

Please sign in to comment.