Skip to content

Commit

Permalink
[core] Drop stats in manifest file reading (#4534)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Nov 19, 2024
1 parent d9d8d83 commit 1d38446
Show file tree
Hide file tree
Showing 36 changed files with 323 additions and 255 deletions.
10 changes: 0 additions & 10 deletions docs/content/maintenance/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,6 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
<td>Gauge</td>
<td>Number of scanned manifest files in the last scan.</td>
</tr>
<tr>
<td>lastSkippedByPartitionAndStats</td>
<td>Gauge</td>
<td>Skipped table files by partition filter and value / key stats information in the last scan.</td>
</tr>
<tr>
<td>lastSkippedByWholeBucketFilesFilter</td>
<td>Gauge</td>
<td>Skipped table files by bucket level value filter (only primary key table) in the last scan.</td>
</tr>
<tr>
<td>lastScanSkippedTableFiles</td>
<td>Gauge</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public interface Filter<T> {
*/
boolean test(T t);

default Filter<T> and(Filter<? super T> other) {
if (other == null) {
return this;
}
return t -> test(t) && other.test(t);
}

@SuppressWarnings({"unchecked", "rawtypes"})
static <T> Filter<T> alwaysTrue() {
return (Filter) ALWAYS_TRUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ private void advanceIfNeeded() {
if (stack.isEmpty()) {
return;
}
activeList = randomlyExecute(executor, processor, stack.poll());
activeList =
randomlyExecuteSequentialReturn(
executor, processor, stack.poll());
}
}
}
Expand All @@ -132,7 +134,7 @@ public static <U> void randomlyOnlyExecute(
awaitAllFutures(futures);
}

public static <U, T> Iterator<T> randomlyExecute(
public static <U, T> Iterator<T> randomlyExecuteSequentialReturn(
ExecutorService executor, Function<U, List<T>> processor, Collection<U> input) {
List<Future<List<T>>> futures = new ArrayList<>(input.size());
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
Expand Down Expand Up @@ -441,7 +442,12 @@ public ManifestEntry next() {
}

if (currentIterator.hasNext()) {
return currentIterator.next();
ManifestEntry entry = currentIterator.next();
if (entry.kind() == FileKind.DELETE) {
continue;
} else {
return entry;
}
}
currentIterator = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;

/** Entry representing a file. */
Expand Down Expand Up @@ -214,7 +216,11 @@ static Set<Identifier> readDeletedEntries(
return readDeletedEntries(
m ->
manifestFile.read(
m.fileName(), m.fileSize(), Filter.alwaysTrue(), deletedFilter()),
m.fileName(),
m.fileSize(),
Filter.alwaysTrue(),
deletedFilter(),
Filter.alwaysTrue()),
manifestFiles,
manifestReadParallelism);
}
Expand All @@ -234,11 +240,11 @@ static <T extends FileEntry> Set<Identifier> readDeletedEntries(
.filter(e -> e.kind() == FileKind.DELETE)
.map(FileEntry::identifier)
.collect(Collectors.toList());
Iterable<Identifier> identifiers =
sequentialBatchedExecute(processor, manifestFiles, manifestReadParallelism);
Set<Identifier> result = new HashSet<>();
for (Identifier identifier : identifiers) {
result.add(identifier);
Iterator<Identifier> identifiers =
randomlyExecuteSequentialReturn(processor, manifestFiles, manifestReadParallelism);
Set<Identifier> result = ConcurrentHashMap.newKeySet();
while (identifiers.hasNext()) {
result.add(identifiers.next());
}
return result;
}
Expand All @@ -247,4 +253,9 @@ static Filter<InternalRow> deletedFilter() {
Function<InternalRow, FileKind> getter = ManifestEntrySerializer.kindGetter();
return row -> getter.apply(row) == FileKind.DELETE;
}

static Filter<InternalRow> addFilter() {
Function<InternalRow, FileKind> getter = ManifestEntrySerializer.kindGetter();
return row -> getter.apply(row) == FileKind.ADD;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.manifest;

/** Wrap a {@link ManifestEntry} to contain {@link #selected}. */
public class FilteredManifestEntry extends ManifestEntry {

private final boolean selected;

public FilteredManifestEntry(ManifestEntry entry, boolean selected) {
super(entry.kind(), entry.partition(), entry.bucket(), entry.totalBuckets(), entry.file());
this.selected = selected;
}

public boolean selected() {
return selected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,18 +211,5 @@ public ManifestFile create() {
suggestedFileSize,
cache);
}

public ObjectsFile<SimpleFileEntry> createSimpleFileEntryReader() {
RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
return new ObjectsFile<>(
fileIO,
new SimpleFileEntrySerializer(),
entryType,
fileFormat.createReaderFactory(entryType),
fileFormat.createWriterFactory(entryType),
compression,
pathFactory.manifestFileFactory(),
cache);
}
}
}
Loading

0 comments on commit 1d38446

Please sign in to comment.