Skip to content

Commit

Permalink
[core] Introduce manifest.delete-file-drop-stats (#4640)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Dec 4, 2024
1 parent 0266765 commit 66ca698
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@
<td>String</td>
<td>Default file compression for manifest.</td>
</tr>
<tr>
<td><h5>manifest.delete-file-drop-stats</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>For DELETE manifest entry in manifest file, drop stats to reduce memory and storage. Default value is false only for compatibility of old reader.</td>
</tr>
<tr>
<td><h5>manifest.format</h5></td>
<td style="word-wrap: break-word;">"avro"</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,14 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The object location for object table.");

/**
 * Boolean option registered under the key {@code manifest.delete-file-drop-stats}; it defaults
 * to {@code false}. Read through {@code manifestDeleteFileDropStats()}.
 */
public static final ConfigOption<Boolean> MANIFEST_DELETE_FILE_DROP_STATS =
key("manifest.delete-file-drop-stats")
.booleanType()
.defaultValue(false)
.withDescription(
"For DELETE manifest entry in manifest file, drop stats to reduce memory and storage."
+ " Default value is false only for compatibility of old reader.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<String> MATERIALIZED_TABLE_DEFINITION_QUERY =
key("materialized-table.definition-query")
Expand Down Expand Up @@ -1947,6 +1955,10 @@ public boolean needLookup() {
return lookupStrategy().needLookup;
}

/** Returns the configured value of {@link #MANIFEST_DELETE_FILE_DROP_STATS}. */
public boolean manifestDeleteFileDropStats() {
    final boolean dropStatsEnabled = options.get(MANIFEST_DELETE_FILE_DROP_STATS);
    return dropStatsEnabled;
}

public LookupStrategy lookupStrategy() {
return LookupStrategy.from(
mergeEngine().equals(MergeEngine.FIRST_ROW),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> cal
tableName,
commitUser,
partitionType,
options,
options.partitionDefaultName(),
pathFactory(),
snapshotManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ public FilesIterator(
snapshotReader.withFilter(filter);
}
// drop stats to reduce memory
snapshotReader.dropStats();
if (table.coreOptions().manifestDeleteFileDropStats()) {
snapshotReader.dropStats();
}
this.streamingMode = isStreaming;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
Expand Down Expand Up @@ -96,13 +97,19 @@ protected AbstractFileStoreWrite(
@Nullable IndexMaintainer.Factory<T> indexFactory,
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName,
CoreOptions options,
int totalBuckets,
RowType partitionType,
int writerNumberMax,
boolean legacyPartitionName) {
this.snapshotManager = snapshotManager;
this.scan = scan;
// Statistic is useless in writer
this.scan = scan == null ? null : scan.dropStats();
if (options.manifestDeleteFileDropStats()) {
if (this.scan != null) {
this.scan.dropStats();
}
}
this.indexFactory = indexFactory;
this.dvMaintainerFactory = dvMaintainerFactory;
this.totalBuckets = totalBuckets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
Expand Down Expand Up @@ -146,6 +147,7 @@ public FileStoreCommitImpl(
String tableName,
String commitUser,
RowType partitionType,
CoreOptions options,
String partitionDefaultName,
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
Expand Down Expand Up @@ -176,8 +178,11 @@ public FileStoreCommitImpl(
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
this.indexManifestFile = indexManifestFileFactory.create();
this.scan = scan;
// Stats in DELETE Manifest Entries is useless
this.scan = scan.dropStats();
if (options.manifestDeleteFileDropStats()) {
this.scan.dropStats();
}
this.numBucket = numBucket;
this.manifestTargetSize = manifestTargetSize;
this.manifestFullCompactionSize = manifestFullCompactionSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public MemoryFileStoreWrite(
indexFactory,
dvMaintainerFactory,
tableName,
options,
options.bucket(),
partitionType,
options.writeMaxWritersToSpill(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,7 @@ public void testManifestCompact() throws Exception {
@Test
public void testDropStatsForOverwrite() throws Exception {
TestFileStore store = createStore(false);
store.options().toConfiguration().set(CoreOptions.MANIFEST_DELETE_FILE_DROP_STATS, true);

List<KeyValue> keyValues = generateDataList(1);
BinaryRow partition = gen.getPartition(keyValues.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
condition: Expression,
output: Seq[Attribute]): Seq[DataSplit] = {
// low level snapshot reader, it can not be affected by 'scan.mode'
val snapshotReader = table.newSnapshotReader()
// dropStats after filter push down
val snapshotReader = table.newSnapshotReader().dropStats()
if (table.coreOptions().manifestDeleteFileDropStats()) {
snapshotReader.dropStats()
}
if (condition != TrueLiteral) {
val filter =
convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true)
Expand Down

0 comments on commit 66ca698

Please sign in to comment.