Skip to content

Commit

Permalink
update for comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Jan 22, 2024
1 parent 3b1b23e commit 1b7312d
Show file tree
Hide file tree
Showing 17 changed files with 107 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.stats.Stats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.FileStorePathFactory;

Expand Down Expand Up @@ -90,7 +90,7 @@ void overwrite(
* Commit new statistics. The {@link Snapshot.CommitKind} of generated snapshot is {@link
* Snapshot.CommitKind#ANALYZE}.
*/
void commitStatistics(Stats stats, long commitIdentifier);
void commitStatistics(Statistics stats, long commitIdentifier);

FileStorePathFactory pathFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.Stats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
Expand Down Expand Up @@ -513,7 +513,7 @@ public FileStoreCommit withMetrics(CommitMetrics metrics) {
}

@Override
public void commitStatistics(Stats stats, long commitIdentifier) {
public void commitStatistics(Statistics stats, long commitIdentifier) {
String statsFileName = statsFileHandler.writeStats(stats);
tryCommit(
Collections.emptyList(),
Expand Down Expand Up @@ -786,7 +786,7 @@ public boolean tryCommitOnce(
if (newStatsFileName != null) {
statsFileName = newStatsFileName;
} else if (latestSnapshot != null) {
Optional<Stats> previousStatistic = statsFileHandler.readStats(latestSnapshot);
Optional<Statistics> previousStatistic = statsFileHandler.readStats(latestSnapshot);
if (previousStatistic.isPresent()) {
if (previousStatistic.get().schemaId() != latestSchemaId) {
LOG.warn("Schema changed, stats will not be inherited");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* <li>colStats: column stats map
* </ul>
*/
public class Stats {
public class Statistics {

// ID of the snapshot this statistics collected from
private static final String FIELD_SNAPSHOT_ID = "snapshotId";
Expand Down Expand Up @@ -74,7 +74,7 @@ public class Stats {
private final Map<String, ColStats<?>> colStats;

@JsonCreator
public Stats(
public Statistics(
@JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
@JsonProperty(FIELD_MERGED_RECORD_COUNT) @Nullable Long mergedRecordCount,
Expand All @@ -87,7 +87,8 @@ public Stats(
this.colStats = colStats;
}

public Stats(long snapshotId, long schemaId, Long mergedRecordCount, Long mergedRecordSize) {
public Statistics(
long snapshotId, long schemaId, Long mergedRecordCount, Long mergedRecordSize) {
this(snapshotId, schemaId, mergedRecordCount, mergedRecordSize, Collections.emptyMap());
}

Expand Down Expand Up @@ -161,14 +162,14 @@ public String toJson() {
return JsonSerdeUtil.toJson(this);
}

public static Stats fromJson(String json) {
return JsonSerdeUtil.fromJson(json, Stats.class);
public static Statistics fromJson(String json) {
return JsonSerdeUtil.fromJson(json, Statistics.class);
}

public static Stats fromPath(FileIO fileIO, Path path) {
public static Statistics fromPath(FileIO fileIO, Path path) {
try {
String json = fileIO.readFileUtf8(path);
return Stats.fromJson(json);
return Statistics.fromJson(json);
} catch (IOException e) {
throw new RuntimeException("Fails to read snapshot from path " + path, e);
}
Expand All @@ -182,7 +183,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
Stats stats = (Stats) o;
Statistics stats = (Statistics) o;
return snapshotId == stats.snapshotId
&& schemaId == stats.schemaId
&& Objects.equals(mergedRecordCount, stats.mergedRecordCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ public StatsFile(FileIO fileIO, PathFactory pathFactory) {
*
* @return stats
*/
public Stats read(String fileName) {
return Stats.fromPath(fileIO, pathFactory.toPath(fileName));
public Statistics read(String fileName) {
return Statistics.fromPath(fileIO, pathFactory.toPath(fileName));
}

/**
* Write stats to a stats file.
*
* @return the written file name
*/
public String write(Stats stats) {
public String write(Statistics stats) {
Path path = pathFactory.newPath();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public StatsFileHandler(
*
* @return the written file name
*/
public String writeStats(Stats stats) {
public String writeStats(Statistics stats) {
stats.serializeFieldsToString(schemaManager.schema(stats.schemaId()));
return statsFile.write(stats);
}
Expand All @@ -53,7 +53,7 @@ public String writeStats(Stats stats) {
*
* @return stats
*/
public Optional<Stats> readStats() {
public Optional<Statistics> readStats() {
Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (latestSnapshotId == null) {
throw new IllegalStateException("Unable to obtain the latest schema");
Expand All @@ -66,15 +66,15 @@ public Optional<Stats> readStats() {
*
* @return stats
*/
public Optional<Stats> readStats(long snapshotId) {
public Optional<Statistics> readStats(long snapshotId) {
return readStats(snapshotManager.snapshot(snapshotId));
}

public Optional<Stats> readStats(Snapshot snapshot) {
public Optional<Statistics> readStats(Snapshot snapshot) {
if (snapshot.statistics() == null) {
return Optional.empty();
} else {
Stats stats = statsFile.read(snapshot.statistics());
Statistics stats = statsFile.read(snapshot.statistics());
stats.deserializeFieldsFromString(schemaManager.schema(stats.schemaId()));
return Optional.of(stats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaValidation;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Stats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
Expand Down Expand Up @@ -99,13 +99,11 @@ public AbstractFileStoreTable(
}

@Override
public Optional<Stats> statistics() {
public Optional<Statistics> statistics() {
// todo: support time travel
if (coreOptions().startupMode().equals(CoreOptions.StartupMode.LATEST_FULL)) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
if (latestSnapshot != null) {
return store().newStatsFileHandler().readStats(latestSnapshot);
}
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
if (latestSnapshot != null) {
return store().newStatsFileHandler().readStats(latestSnapshot);
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.table;

import org.apache.paimon.stats.Stats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.InnerTableWrite;
Expand Down Expand Up @@ -49,7 +49,7 @@ default Optional<String> comment() {
}

@Override
default Optional<Stats> statistics() {
default Optional<Statistics> statistics() {
return Optional.empty();
}

Expand Down
5 changes: 3 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.stats.Stats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
Expand Down Expand Up @@ -60,7 +60,8 @@ public interface Table extends Serializable {
Optional<String> comment();

/** Optional statistics of this table. */
Optional<Stats> statistics();
@Experimental
Optional<Statistics> statistics();

// ================= Table Operations ====================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Stats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataField;
Expand Down Expand Up @@ -791,15 +791,15 @@ public void testWriteStats() throws Exception {
// Analyze and check
HashMap<String, ColStats<?>> fakeColStatsMap = new HashMap<>();
fakeColStatsMap.put("orderId", ColStats.newColStats(3, 10L, 1L, 10L, 0L, 8L, 8L));
Stats fakeStats =
new Stats(
Statistics fakeStats =
new Statistics(
latestSnapshot.id(),
latestSnapshot.schemaId(),
10L,
1000L,
fakeColStatsMap);
fileStoreCommit.commitStatistics(fakeStats, Long.MAX_VALUE);
Optional<Stats> readStats = statsFileHandler.readStats();
Optional<Statistics> readStats = statsFileHandler.readStats();
assertThat(readStats).isPresent();
assertThat(readStats.get()).isEqualTo(fakeStats);

Expand All @@ -823,7 +823,7 @@ public void testWriteStats() throws Exception {
fakeColStatsMap = new HashMap<>();
fakeColStatsMap.put("orderId", ColStats.newColStats(3, 30L, 1L, 30L, 0L, 8L, 8L));
fakeStats =
new Stats(
new Statistics(
latestSnapshot.id(),
latestSnapshot.schemaId(),
30L,
Expand All @@ -836,7 +836,7 @@ public void testWriteStats() throws Exception {

// Analyze without col stats and check
latestSnapshot = store.snapshotManager().latestSnapshot();
fakeStats = new Stats(latestSnapshot.id(), latestSnapshot.schemaId(), 30L, 3000L);
fakeStats = new Statistics(latestSnapshot.id(), latestSnapshot.schemaId(), 30L, 3000L);
fileStoreCommit.commitStatistics(fakeStats, Long.MAX_VALUE);
readStats = statsFileHandler.readStats();
assertThat(readStats).isPresent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Path toPath(String fileName) {
StatsFile file = new StatsFile(LocalFileIO.create(), pathFactory);
HashMap<String, ColStats<?>> colStatsMap = new HashMap<>();
colStatsMap.put("orderId", new ColStats<>(0, 10L, "111", "222", 0L, 8L, 8L));
Stats stats = new Stats(1L, 0L, 10L, 1000L, colStatsMap);
Statistics stats = new Statistics(1L, 0L, 10L, 1000L, colStatsMap);
String fileName = file.write(stats);

assertThat(file.exists(fileName)).isTrue();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark.sql

class AnalyzeTableTest extends AnalyzeTableTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark.sql

class AnalyzeTableTest extends AnalyzeTableTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark.sql

class AnalyzeTableTest extends AnalyzeTableTestBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[Logi
PaimonTruncateTableCommand(table, Map.empty)

case a @ AnalyzeTable(
ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: SparkTable, _),
ResolvedTable(catalog, identifier, table: SparkTable, _),
partitionSpec,
noScan) if a.resolved =>
if (partitionSpec.nonEmpty) {
Expand All @@ -67,9 +67,9 @@ case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[Logi
}

case a @ AnalyzeColumn(
ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: SparkTable, _),
columnNames: Option[Seq[String]],
allColumns: Boolean) if a.resolved =>
ResolvedTable(catalog, identifier, table: SparkTable, _),
columnNames,
allColumns) if a.resolved =>
PaimonAnalyzeTableColumnCommand(catalog, identifier, table, columnNames, allColumns)

case _ => plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.schema.TableSchema
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.stats.{ColStats, Stats}
import org.apache.paimon.stats.{ColStats, Statistics}
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.BatchWriteBuilder

Expand Down Expand Up @@ -76,7 +76,7 @@ case class PaimonAnalyzeTableColumnCommand(
colStatsMap.put(attr.name, toPaimonColStats(attr, stats, tableSchema))
}

val stats = new Stats(
val stats = new Statistics(
currentSnapshot.id(),
currentSnapshot.schemaId(),
mergedRecordCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ object Utils {
CommandUtils.computeColumnStats(sparkSession, relation, columns)
}

/** [[IntegralType]] is private in spark, therefore we need add it here. */
def analyzeSupportsType(dataType: DataType): Boolean = dataType match {
case _: IntegralType => true
case _: DecimalType => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Assertions

class AnalyzeTableTest extends PaimonSparkTestBase {
abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {

test("Paimon analyze: analyze table only") {
spark.sql(s"""
Expand Down

0 comments on commit 1b7312d

Please sign in to comment.