Skip to content

Commit

Permalink
[core] Introduce stats in snapshot (#2677)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Jan 20, 2024
1 parent 1bc33d9 commit 52d0aa2
Show file tree
Hide file tree
Showing 15 changed files with 881 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import java.util.OptionalLong;

/** Utilities for converting nullable boxed values into primitive {@code Optional} types. */
public final class OptionalUtils {

    private OptionalUtils() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Wraps a nullable boxed {@link Long} into an {@link OptionalLong}.
     *
     * @param value the value to wrap, may be {@code null}
     * @return an empty {@link OptionalLong} if {@code value} is {@code null}, otherwise an {@link
     *     OptionalLong} holding the unboxed value
     */
    public static OptionalLong ofNullable(Long value) {
        return value == null ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFile;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.TagCallback;
Expand Down Expand Up @@ -142,6 +144,14 @@ public IndexFileHandler newIndexFileHandler() {
new HashIndexFile(fileIO, pathFactory().indexFileFactory()));
}

/**
 * Creates a {@link StatsFileHandler} from this store's snapshot manager, its schema manager and
 * a {@link StatsFile} built on this store's file IO and stats file factory.
 */
@Override
public StatsFileHandler newStatsFileHandler() {
    StatsFile statsFile = new StatsFile(fileIO, pathFactory().statsFileFactory());
    return new StatsFileHandler(snapshotManager(), schemaManager, statsFile);
}

@Override
public RowType partitionType() {
return partitionType;
Expand Down Expand Up @@ -175,7 +185,8 @@ public FileStoreCommitImpl newCommit(String commitUser) {
options.manifestFullCompactionThresholdSize(),
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
newKeyComparator());
newKeyComparator(),
newStatsFileHandler());
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
Expand Down Expand Up @@ -68,6 +69,8 @@ public interface FileStore<T> extends Serializable {

IndexFileHandler newIndexFileHandler();

StatsFileHandler newStatsFileHandler();

FileStoreRead<T> newRead();

FileStoreWrite<T> newWrite(String commitUser);
Expand Down
35 changes: 28 additions & 7 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class Snapshot {
private static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount";
private static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount";
private static final String FIELD_WATERMARK = "watermark";
private static final String FIELD_STATISTICS = "statistics";

// version of snapshot
// null for paimon <= 0.2
Expand Down Expand Up @@ -169,6 +170,13 @@ public class Snapshot {
@Nullable
private final Long watermark;

// stats file name for statistics of this table
// null if no stats file
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_STATISTICS)
@Nullable
private final String statistics;

public Snapshot(
long id,
long schemaId,
Expand All @@ -184,7 +192,8 @@ public Snapshot(
@Nullable Long totalRecordCount,
@Nullable Long deltaRecordCount,
@Nullable Long changelogRecordCount,
@Nullable Long watermark) {
@Nullable Long watermark,
@Nullable String statistics) {
this(
CURRENT_VERSION,
id,
Expand All @@ -201,7 +210,8 @@ public Snapshot(
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
watermark);
watermark,
statistics);
}

@JsonCreator
Expand All @@ -218,10 +228,11 @@ public Snapshot(
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
@JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets,
@JsonProperty(FIELD_TOTAL_RECORD_COUNT) Long totalRecordCount,
@JsonProperty(FIELD_DELTA_RECORD_COUNT) Long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) Long watermark) {
@JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount,
@JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
this.version = version;
this.id = id;
this.schemaId = schemaId;
Expand All @@ -238,6 +249,7 @@ public Snapshot(
this.deltaRecordCount = deltaRecordCount;
this.changelogRecordCount = changelogRecordCount;
this.watermark = watermark;
this.statistics = statistics;
}

@JsonGetter(FIELD_VERSION)
Expand Down Expand Up @@ -327,6 +339,12 @@ public Long watermark() {
return watermark;
}

/**
 * Returns the name of the stats file recorded in this snapshot.
 *
 * @return the stats file name, or {@code null} if this snapshot has no stats file
 */
@JsonGetter(FIELD_STATISTICS)
@Nullable
public String statistics() {
    return this.statistics;
}

/**
* Return all {@link ManifestFileMeta} instances for either data or changelog manifests in this
* snapshot.
Expand Down Expand Up @@ -487,6 +505,9 @@ public enum CommitKind {
COMPACT,

/** Changes that clear up the whole partition and then add new records. */
OVERWRITE
OVERWRITE,

/** Collect statistics. */
ANALYZE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.stats.Stats;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.FileStorePathFactory;

Expand Down Expand Up @@ -85,6 +86,12 @@ void overwrite(
/** With metrics to measure commits. */
FileStoreCommit withMetrics(CommitMetrics metrics);

/**
 * Commit new statistics. The {@link Snapshot.CommitKind} of the generated snapshot is {@link
 * Snapshot.CommitKind#ANALYZE}.
 *
 * @param stats the statistics to commit
 * @param commitIdentifier the identifier associated with this commit
 */
void commitStatistics(Stats stats, long commitIdentifier);

FileStorePathFactory pathFactory();

FileIO fileIO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.Stats;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -116,6 +118,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {

private CommitMetrics commitMetrics;

private final StatsFileHandler statsFileHandler;

public FileStoreCommitImpl(
FileIO fileIO,
SchemaManager schemaManager,
Expand All @@ -132,7 +136,8 @@ public FileStoreCommitImpl(
MemorySize manifestFullCompactionSize,
int manifestMergeMinCount,
boolean dynamicPartitionOverwrite,
@Nullable Comparator<InternalRow> keyComparator) {
@Nullable Comparator<InternalRow> keyComparator,
StatsFileHandler statsFileHandler) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.commitUser = commitUser;
Expand All @@ -150,10 +155,10 @@ public FileStoreCommitImpl(
this.manifestMergeMinCount = manifestMergeMinCount;
this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
this.keyComparator = keyComparator;

this.lock = null;
this.ignoreEmptyCommit = true;
this.commitMetrics = null;
this.statsFileHandler = statsFileHandler;
}

@Override
Expand Down Expand Up @@ -248,7 +253,8 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
safeLatestSnapshotId);
safeLatestSnapshotId,
null);
generatedSnapshot += 1;
}

Expand Down Expand Up @@ -276,7 +282,8 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
safeLatestSnapshotId);
safeLatestSnapshotId,
null);
generatedSnapshot += 1;
}
} finally {
Expand Down Expand Up @@ -421,6 +428,7 @@ public void overwrite(
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
null,
null);
generatedSnapshot += 1;
}
Expand Down Expand Up @@ -504,6 +512,21 @@ public FileStoreCommit withMetrics(CommitMetrics metrics) {
return this;
}

/**
 * Writes {@code stats} through the stats file handler and commits a snapshot of kind {@link
 * Snapshot.CommitKind#ANALYZE} that carries the name of the written stats file.
 */
@Override
public void commitStatistics(Stats stats, long commitIdentifier) {
    // Persist the statistics first; the commit itself only receives the resulting file name.
    final String writtenStatsFile = statsFileHandler.writeStats(stats);

    // Empty lists, no watermark, no log offsets and no safe latest snapshot id are passed.
    tryCommit(
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyList(),
            commitIdentifier,
            null,
            Collections.emptyMap(),
            Snapshot.CommitKind.ANALYZE,
            null,
            writtenStatsFile);
}

@Override
public FileStorePathFactory pathFactory() {
return pathFactory;
Expand Down Expand Up @@ -573,7 +596,8 @@ private int tryCommit(
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Long safeLatestSnapshotId) {
@Nullable Long safeLatestSnapshotId,
@Nullable String statsFileName) {
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
Expand All @@ -587,7 +611,8 @@ private int tryCommit(
logOffsets,
commitKind,
latestSnapshot,
safeLatestSnapshotId)) {
safeLatestSnapshotId,
statsFileName)) {
break;
}
}
Expand Down Expand Up @@ -650,6 +675,7 @@ private int tryOverwrite(
logOffsets,
Snapshot.CommitKind.OVERWRITE,
latestSnapshot,
null,
null)) {
break;
}
Expand All @@ -666,8 +692,9 @@ public boolean tryCommitOnce(
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Snapshot latestSnapshot,
Long safeLatestSnapshotId) {
@Nullable Snapshot latestSnapshot,
@Nullable Long safeLatestSnapshotId,
@Nullable String newStatsFileName) {
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
Expand Down Expand Up @@ -752,11 +779,28 @@ public boolean tryCommitOnce(
newIndexManifest = indexManifest;
}

long latestSchemaId = schemaManager.latest().get().id();

// write new stats or inherit from the previous snapshot
String statsFileName = null;
if (newStatsFileName != null) {
statsFileName = newStatsFileName;
} else if (latestSnapshot != null) {
Optional<Stats> previousStatistic = statsFileHandler.readStats(latestSnapshot);
if (previousStatistic.isPresent()) {
if (previousStatistic.get().schemaId() != latestSchemaId) {
LOG.warn("Schema changed, stats will not be inherited");
} else {
statsFileName = latestSnapshot.statistics();
}
}
}

// prepare snapshot file
newSnapshot =
new Snapshot(
newSnapshotId,
schemaManager.latest().get().id(),
latestSchemaId,
previousChangesListName,
newChangesListName,
changelogListName,
Expand All @@ -769,7 +813,8 @@ public boolean tryCommitOnce(
totalRecordCount,
deltaRecordCount,
Snapshot.recordCount(changelogFiles),
currentWatermark);
currentWatermark,
statsFileName);
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
cleanUpTmpManifests(
Expand Down
Loading

0 comments on commit 52d0aa2

Please sign in to comment.