Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce stats in snapshot #2677

Merged
merged 10 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import java.util.OptionalLong;

/** Utils for Optional. * */
public class OptionalUtils {
public static OptionalLong ofNullable(Long value) {
return value == null ? OptionalLong.empty() : OptionalLong.of(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFile;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.TagCallback;
Expand Down Expand Up @@ -143,6 +145,14 @@ public IndexFileHandler newIndexFileHandler() {
new HashIndexFile(fileIO, pathFactory().indexFileFactory()));
}

@Override
public StatsFileHandler newStatsFileHandler() {
return new StatsFileHandler(
snapshotManager(),
schemaManager,
new StatsFile(fileIO, pathFactory().statsFileFactory()));
}

@Override
public RowType partitionType() {
return partitionType;
Expand Down Expand Up @@ -176,7 +186,8 @@ public FileStoreCommitImpl newCommit(String commitUser) {
options.manifestFullCompactionThresholdSize(),
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
newKeyComparator());
newKeyComparator(),
newStatsFileHandler());
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
Expand Down Expand Up @@ -69,6 +70,8 @@ public interface FileStore<T> extends Serializable {

IndexFileHandler newIndexFileHandler();

StatsFileHandler newStatsFileHandler();

FileStoreRead<T> newRead();

FileStoreWrite<T> newWrite(String commitUser);
Expand Down
35 changes: 28 additions & 7 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class Snapshot {
private static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount";
private static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount";
private static final String FIELD_WATERMARK = "watermark";
private static final String FIELD_STATISTICS = "statistics";

// version of snapshot
// null for paimon <= 0.2
Expand Down Expand Up @@ -169,6 +170,13 @@ public class Snapshot {
@Nullable
private final Long watermark;

// stats file name for statistics of this table
// null if no stats file
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_STATISTICS)
@Nullable
private final String statistics;

public Snapshot(
long id,
long schemaId,
Expand All @@ -184,7 +192,8 @@ public Snapshot(
@Nullable Long totalRecordCount,
@Nullable Long deltaRecordCount,
@Nullable Long changelogRecordCount,
@Nullable Long watermark) {
@Nullable Long watermark,
@Nullable String statistics) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the partition-level statistics are supported, how to retrieve those?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plan to add a new field named partitionStatistics (a manifestList link) when going to support partition-level statistics

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both statistics and partitionStatistics are related to statistics, why we split into two?

this(
CURRENT_VERSION,
id,
Expand All @@ -201,7 +210,8 @@ public Snapshot(
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
watermark);
watermark,
statistics);
}

@JsonCreator
Expand All @@ -218,10 +228,11 @@ public Snapshot(
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
@JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets,
@JsonProperty(FIELD_TOTAL_RECORD_COUNT) Long totalRecordCount,
@JsonProperty(FIELD_DELTA_RECORD_COUNT) Long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) Long watermark) {
@JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount,
@JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
this.version = version;
this.id = id;
this.schemaId = schemaId;
Expand All @@ -238,6 +249,7 @@ public Snapshot(
this.deltaRecordCount = deltaRecordCount;
this.changelogRecordCount = changelogRecordCount;
this.watermark = watermark;
this.statistics = statistics;
}

@JsonGetter(FIELD_VERSION)
Expand Down Expand Up @@ -327,6 +339,12 @@ public Long watermark() {
return watermark;
}

@JsonGetter(FIELD_STATISTICS)
@Nullable
public String statistics() {
return statistics;
}

/**
* Return all {@link ManifestFileMeta} instances for either data or changelog manifests in this
* snapshot.
Expand Down Expand Up @@ -487,6 +505,9 @@ public enum CommitKind {
COMPACT,

/** Changes that clear up the whole partition and then add new records. */
OVERWRITE
OVERWRITE,

/** Collect statistics. */
ANALYZE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.stats.Stats;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.FileStorePathFactory;

Expand Down Expand Up @@ -85,6 +86,12 @@ void overwrite(
/** With metrics to measure commits. */
FileStoreCommit withMetrics(CommitMetrics metrics);

/**
* Commit new statistics. The {@link Snapshot.CommitKind} of generated snapshot is {@link
* Snapshot.CommitKind#ANALYZE}.
*/
void commitStatistics(Stats stats, long commitIdentifier);

FileStorePathFactory pathFactory();

FileIO fileIO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.Stats;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -116,6 +118,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {

private CommitMetrics commitMetrics;

private final StatsFileHandler statsFileHandler;

public FileStoreCommitImpl(
FileIO fileIO,
SchemaManager schemaManager,
Expand All @@ -132,7 +136,8 @@ public FileStoreCommitImpl(
MemorySize manifestFullCompactionSize,
int manifestMergeMinCount,
boolean dynamicPartitionOverwrite,
@Nullable Comparator<InternalRow> keyComparator) {
@Nullable Comparator<InternalRow> keyComparator,
StatsFileHandler statsFileHandler) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.commitUser = commitUser;
Expand All @@ -150,10 +155,10 @@ public FileStoreCommitImpl(
this.manifestMergeMinCount = manifestMergeMinCount;
this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
this.keyComparator = keyComparator;

this.lock = null;
this.ignoreEmptyCommit = true;
this.commitMetrics = null;
this.statsFileHandler = statsFileHandler;
}

@Override
Expand Down Expand Up @@ -248,7 +253,8 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
safeLatestSnapshotId);
safeLatestSnapshotId,
null);
generatedSnapshot += 1;
}

Expand Down Expand Up @@ -276,7 +282,8 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
safeLatestSnapshotId);
safeLatestSnapshotId,
null);
generatedSnapshot += 1;
}
} finally {
Expand Down Expand Up @@ -421,6 +428,7 @@ public void overwrite(
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
null,
null);
generatedSnapshot += 1;
}
Expand Down Expand Up @@ -504,6 +512,21 @@ public FileStoreCommit withMetrics(CommitMetrics metrics) {
return this;
}

@Override
public void commitStatistics(Stats stats, long commitIdentifier) {
String statsFileName = statsFileHandler.writeStats(stats);
tryCommit(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
commitIdentifier,
null,
Collections.emptyMap(),
Snapshot.CommitKind.ANALYZE,
null,
statsFileName);
}

@Override
public FileStorePathFactory pathFactory() {
return pathFactory;
Expand Down Expand Up @@ -573,7 +596,8 @@ private int tryCommit(
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Long safeLatestSnapshotId) {
@Nullable Long safeLatestSnapshotId,
@Nullable String statsFileName) {
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
Expand All @@ -587,7 +611,8 @@ private int tryCommit(
logOffsets,
commitKind,
latestSnapshot,
safeLatestSnapshotId)) {
safeLatestSnapshotId,
statsFileName)) {
break;
}
}
Expand Down Expand Up @@ -650,6 +675,7 @@ private int tryOverwrite(
logOffsets,
Snapshot.CommitKind.OVERWRITE,
latestSnapshot,
null,
null)) {
break;
}
Expand All @@ -666,8 +692,9 @@ public boolean tryCommitOnce(
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Snapshot latestSnapshot,
Long safeLatestSnapshotId) {
@Nullable Snapshot latestSnapshot,
@Nullable Long safeLatestSnapshotId,
@Nullable String newStatsFileName) {
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
Expand Down Expand Up @@ -752,11 +779,25 @@ public boolean tryCommitOnce(
newIndexManifest = indexManifest;
}

long latestSchemaId = schemaManager.latest().get().id();

// write new stats or inherit from the previous snapshot
String statsFileName = null;
if (newStatsFileName != null) {
statsFileName = newStatsFileName;
} else if (latestSnapshot != null && latestSnapshot.statistics() != null) {
if (latestSnapshot.schemaId() != latestSchemaId) {
LOG.warn("Schema changed, stats will not be inherited");
} else {
statsFileName = latestSnapshot.statistics();
}
}

// prepare snapshot file
newSnapshot =
new Snapshot(
newSnapshotId,
schemaManager.latest().get().id(),
latestSchemaId,
previousChangesListName,
newChangesListName,
changelogListName,
Expand All @@ -769,7 +810,8 @@ public boolean tryCommitOnce(
totalRecordCount,
deltaRecordCount,
Snapshot.recordCount(changelogFiles),
currentWatermark);
currentWatermark,
statsFileName);
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
cleanUpTmpManifests(
Expand Down
Loading