From 51309352953a7c6b0184846524be18b8c126e531 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 27 Dec 2023 15:54:19 +0800 Subject: [PATCH] first version --- .../org/apache/paimon/types/DataField.java | 53 ++++-- .../apache/paimon/types/DataFieldStats.java | 140 +++++++++++++++ .../paimon/types/DataTypeJsonParser.java | 7 +- .../apache/paimon/utils/OptionalUtils.java | 28 +++ .../apache/paimon/schema/SchemaChange.java | 45 +++++ .../apache/paimon/schema/SchemaManager.java | 40 ++--- .../stats/FieldStatsArraySerializer.java | 4 +- .../paimon/catalog/CatalogTestBase.java | 30 ++++ .../flink/SchemaChangeSerializationTest.java | 2 + .../org/apache/paimon/spark/SparkCatalog.java | 4 +- .../apache/paimon/spark/SparkProcedures.java | 2 + .../spark/procedure/AnalyzeProcedure.java | 161 ++++++++++++++++++ .../apache/paimon/spark/PaimonBaseScan.scala | 5 +- .../paimon/spark/PaimonStatistics.scala | 45 ++++- .../scala/org/apache/spark/sql/Utils.scala | 10 +- .../procedure/AnalyzeProcedureTest.scala | 138 +++++++++++++++ 16 files changed, 667 insertions(+), 47 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/types/DataFieldStats.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AnalyzeProcedure.java create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/AnalyzeProcedureTest.scala diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java index 4e51372c25f23..9cacd9f5d8200 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java @@ -39,7 +39,7 @@ @Public public final class DataField implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; public static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'"; @@ -53,15 +53,27 @@ public final class DataField implements Serializable { private final @Nullable String description; + private final @Nullable DataFieldStats stats; + public DataField(int id, String name, DataType dataType) { this(id, name, dataType, null); } public DataField(int id, String name, DataType type, @Nullable String description) { + this(id, name, type, description, null); + } + + public DataField( + int id, + String name, + DataType type, + @Nullable String description, + @Nullable DataFieldStats stats) { this.id = id; this.name = name; this.type = type; this.description = description; + this.stats = stats; } public int id() { @@ -76,25 +88,39 @@ public DataType type() { return type; } - public DataField newId(int newid) { - return new DataField(newid, name, type, description); + @Nullable + public String description() { + return description; + } + + @Nullable + public DataFieldStats stats() { + return stats; + } + + public DataField newId(int newId) { + return new DataField(newId, name, type, description, stats); } public DataField newName(String newName) { - return new DataField(id, newName, type, description); + return new DataField(id, newName, type, description, stats); + } + + public DataField newRowType(DataType newType) { + return new DataField(id, name, newType, description, stats); } public DataField newDescription(String newDescription) { - return new DataField(id, name, type, newDescription); + return 
new DataField(id, name, type, newDescription, stats); } - @Nullable - public String description() { - return description; + public DataField newStats(DataFieldStats newStats) { + return new DataField(id, name, type, description, newStats); } public DataField copy() { - return new DataField(id, name, type.copy(), description); + return new DataField( + id, name, type.copy(), description, stats == null ? null : stats.copy()); } public String asSQLString() { @@ -122,6 +148,10 @@ public void serializeJson(JsonGenerator generator) throws IOException { if (description() != null) { generator.writeStringField("description", description()); } + if (stats() != null) { + generator.writeFieldName("stats"); + stats().serializeJson(generator); + } generator.writeEndObject(); } @@ -137,12 +167,13 @@ public boolean equals(Object o) { return Objects.equals(id, field.id) && Objects.equals(name, field.name) && Objects.equals(type, field.type) - && Objects.equals(description, field.description); + && Objects.equals(description, field.description) + && Objects.equals(stats, field.stats); } @Override public int hashCode() { - return Objects.hash(id, name, type, description); + return Objects.hash(id, name, type, description, stats); } @Override diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataFieldStats.java b/paimon-common/src/main/java/org/apache/paimon/types/DataFieldStats.java new file mode 100644 index 0000000000000..3648a3e8fa865 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataFieldStats.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.types; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; + +/** + * Table level col stats, supports the following stats. 
+ * + * + * + * Todo: Support min, max + */ +public class DataFieldStats implements Serializable { + private static final long serialVersionUID = 2L; + private final @Nullable Long distinctCount; + private final @Nullable Long nullCount; + private final @Nullable Long avgLen; + private final @Nullable Long maxLen; + + public DataFieldStats( + @Nullable Long distinctCount, + @Nullable Long nullCount, + @Nullable Long avgLen, + @Nullable Long maxLen) { + this.distinctCount = distinctCount; + this.nullCount = nullCount; + this.avgLen = avgLen; + this.maxLen = maxLen; + } + + public @Nullable Long avgLen() { + return avgLen; + } + + public @Nullable Long distinctCount() { + return distinctCount; + } + + public @Nullable Long maxLen() { + return maxLen; + } + + public @Nullable Long nullCount() { + return nullCount; + } + + public void serializeJson(JsonGenerator generator) throws IOException { + generator.writeStartObject(); + if (distinctCount != null) { + generator.writeNumberField("distinctCount", distinctCount); + } + if (nullCount != null) { + generator.writeNumberField("nullCount", nullCount); + } + if (avgLen != null) { + generator.writeNumberField("avgLen", avgLen); + } + if (maxLen != null) { + generator.writeNumberField("maxLen", maxLen); + } + generator.writeEndObject(); + } + + public static DataFieldStats deserializeJson(JsonNode jsonNode) { + return new DataFieldStats( + jsonNode.get("distinctCount") != null + ? jsonNode.get("distinctCount").asLong() + : null, + jsonNode.get("nullCount") != null ? jsonNode.get("nullCount").asLong() : null, + jsonNode.get("avgLen") != null ? jsonNode.get("avgLen").asLong() : null, + jsonNode.get("maxLen") != null ? jsonNode.get("maxLen").asLong() : null); + } + + public DataFieldStats copy() { + return new DataFieldStats(distinctCount, nullCount, avgLen, maxLen); + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + DataFieldStats that = (DataFieldStats) object; + return Objects.equals(distinctCount, that.distinctCount) + && Objects.equals(nullCount, that.nullCount) + && Objects.equals(avgLen, that.avgLen) + && Objects.equals(maxLen, that.maxLen); + } + + @Override + public int hashCode() { + return Objects.hash(distinctCount, nullCount, avgLen, maxLen); + } + + @Override + public String toString() { + return "DataFieldStats{" + + "distinctCount=" + + distinctCount + + ", nullCount=" + + nullCount + + ", avgLen=" + + avgLen + + ", maxLen=" + + maxLen + + '}'; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java index 0af68df9de2be..fdb3c0eae104c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java @@ -40,11 +40,16 @@ public static DataField parseDataField(JsonNode json) { String name = json.get("name").asText(); DataType type = parseDataType(json.get("type")); JsonNode descriptionNode = json.get("description"); + JsonNode statsNode = json.get("stats"); String description = null; if (descriptionNode != null) { description = descriptionNode.asText(); } - return new DataField(id, name, type, description); + DataFieldStats fieldStats = null; + if (statsNode != null) { + fieldStats = DataFieldStats.deserializeJson(statsNode); + } + return new DataField(id, name, type, description, 
fieldStats); } public static DataType parseDataType(JsonNode json) { diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java new file mode 100644 index 0000000000000..ccd4167a8db79 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.util.OptionalLong; + +/** Utils for Optional. * */ +public class OptionalUtils { + public static OptionalLong of(Long value) { + return value == null ? OptionalLong.empty() : OptionalLong.of(value); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java index 28c49af7cd6b8..4beeff2b6f447 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -19,6 +19,7 @@ package org.apache.paimon.schema; import org.apache.paimon.annotation.Public; +import org.apache.paimon.types.DataFieldStats; import org.apache.paimon.types.DataType; import javax.annotation.Nullable; @@ -87,6 +88,10 @@ static SchemaChange updateColumnPosition(Move move) { return new UpdateColumnPosition(move); } + static SchemaChange updateColumnStats(String fieldName, DataFieldStats newStats) { + return new UpdateColumnStats(fieldName, newStats); + } + /** A SchemaChange to set a table option. */ final class SetOption implements SchemaChange { @@ -511,4 +516,44 @@ public int hashCode() { return result; } } + + /** A SchemaChange to update field stats. 
*/ + final class UpdateColumnStats implements SchemaChange { + + private static final long serialVersionUID = 1L; + + private final String fieldName; + private final DataFieldStats newStats; + + public UpdateColumnStats(String fieldName, DataFieldStats newStats) { + this.fieldName = fieldName; + this.newStats = newStats; + } + + public String fieldName() { + return fieldName; + } + + public DataFieldStats newStats() { + return newStats; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UpdateColumnStats that = (UpdateColumnStats) o; + return Objects.equals(fieldName, that.fieldName) + && Objects.equals(newStats, that.newStats); + } + + @Override + public int hashCode() { + return Objects.hash(fieldName, newStats); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 01b395a844866..a1882ba1dfbb7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -34,6 +34,7 @@ import org.apache.paimon.schema.SchemaChange.UpdateColumnComment; import org.apache.paimon.schema.SchemaChange.UpdateColumnNullability; import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition; +import org.apache.paimon.schema.SchemaChange.UpdateColumnStats; import org.apache.paimon.schema.SchemaChange.UpdateColumnType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -221,12 +222,7 @@ public TableSchema commitChanges(List changes) newFields, new String[] {rename.fieldName()}, 0, - (field) -> - new DataField( - field.id(), - rename.newName(), - field.type(), - field.description())); + (field) -> field.newName(rename.newName())); } else if (change instanceof DropColumn) { DropColumn drop = (DropColumn) change; validateNotPrimaryAndPartitionKey(schema, drop.fieldName()); @@ -266,11 +262,7 @@ public TableSchema commitChanges(List changes) "Update column to nested row type '%s' is not supported.", update.newDataType())); } - return new DataField( - field.id(), - field.name(), - update.newDataType(), - field.description()); + return field.newRowType(update.newDataType()); }); } else if (change instanceof UpdateColumnNullability) { UpdateColumnNullability update = (UpdateColumnNullability) change; @@ -285,23 +277,14 @@ public TableSchema commitChanges(List changes) update.fieldNames(), 0, (field) -> - new DataField( - field.id(), - field.name(), - field.type().copy(update.newNullability()), - field.description())); + field.newRowType(field.type().copy(update.newNullability()))); } else if (change instanceof UpdateColumnComment) { UpdateColumnComment update = (UpdateColumnComment) change; updateNestedColumn( newFields, update.fieldNames(), 0, - (field) -> - new DataField( - field.id(), - field.name(), - field.type(), - update.newDescription())); + (field) -> field.newDescription(update.newDescription())); } else if (change instanceof UpdateColumnPosition) { UpdateColumnPosition update = (UpdateColumnPosition) change; SchemaChange.Move move = update.move(); @@ -327,6 +310,12 @@ public TableSchema commitChanges(List changes) } } + } else if (change instanceof UpdateColumnStats) { + UpdateColumnStats update = (UpdateColumnStats) change; + updateColumn( + newFields, + update.fieldName(), + (field) -> field.newStats(update.newStats())); } else { throw new 
UnsupportedOperationException( "Unsupported change: " + change.getClass()); @@ -413,12 +402,9 @@ private void updateNestedColumn( updateNestedColumn(nestedFields, updateFieldNames, index + 1, updateFunc); newFields.set( i, - new DataField( - field.id(), - field.name(), + field.newRowType( new org.apache.paimon.types.RowType( - field.type().isNullable(), nestedFields), - field.description())); + field.type().isNullable(), nestedFields))); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsArraySerializer.java b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsArraySerializer.java index 17db3a53eeb16..8a0293db97fa2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsArraySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsArraySerializer.java @@ -53,7 +53,9 @@ public FieldStatsArraySerializer(RowType type) { } public FieldStatsArraySerializer( - RowType type, int[] indexMapping, CastExecutor[] converterMapping) { + RowType type, + @Nullable int[] indexMapping, + @Nullable CastExecutor[] converterMapping) { RowType safeType = toAllFieldsNullableRowType(type); this.serializer = new InternalRowSerializer(safeType); this.fieldGetters = diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index c31419488403b..3772b881ec4e8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -27,6 +27,7 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataFieldStats; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -801,4 +802,33 @@ public void testAlterTableUpdateColumnNullability() throws Exception { UnsupportedOperationException.class, "Cannot change nullability of primary key")); } + + @Test + public void testAlterTableUpdateColumnStats() throws Exception { + catalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "test_table"); + catalog.createTable( + identifier, + new Schema( + Lists.newArrayList( + new DataField(0, "col1", DataTypes.STRING(), "field1"), + new DataField(1, "col2", DataTypes.STRING(), "field2")), + Collections.emptyList(), + Collections.emptyList(), + Maps.newHashMap(), + ""), + false); + + catalog.alterTable( + identifier, + Lists.newArrayList( + SchemaChange.updateColumnStats("col2", new DataFieldStats(1L, 2L, 3L, 4L))), + false); + + Table table = catalog.getTable(identifier); + assertThat(table.rowType().getFields().get(0).stats()).isEqualTo(null); + assertThat(table.rowType().getFields().get(1).stats()) + .isEqualTo(new DataFieldStats(1L, 2L, 3L, 4L)); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java index 4606bd0d0fb80..16161cabe07ed 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.types.DataFieldStats; import 
org.apache.paimon.types.DataTypes; import org.junit.jupiter.api.Test; @@ -46,6 +47,7 @@ public void testSerialization() throws Exception { runTest(SchemaChange.updateColumnNullability(new String[] {"col1", "col2"}, true)); runTest(SchemaChange.updateColumnComment(new String[] {"col1", "col2"}, "comment")); runTest(SchemaChange.updateColumnPosition(SchemaChange.Move.after("col", "ref"))); + runTest(SchemaChange.updateColumnStats("col1", new DataFieldStats(1L, 2L, 3L, 4L))); } private void runTest(SchemaChange schemaChange) throws Exception { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 62b7a00b4a533..f604042d4561f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -426,7 +426,7 @@ private void validateAlterProperty(String alterKey) { } } - private boolean isValidateNamespace(String[] namespace) { + private static boolean isValidateNamespace(String[] namespace) { return namespace.length == 1; } @@ -444,7 +444,7 @@ public void renameTable(Identifier oldIdent, Identifier newIdent) // --------------------- tools ------------------------------------------ - protected org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) + public static org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) throws NoSuchTableException { if (!isValidateNamespace(ident.namespace())) { throw new NoSuchTableException(ident); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index e0921ed86fe5c..39851634a9d43 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -18,6 +18,7 @@ package org.apache.paimon.spark; +import org.apache.paimon.spark.procedure.AnalyzeProcedure; import org.apache.paimon.spark.procedure.CompactProcedure; import org.apache.paimon.spark.procedure.CreateTagProcedure; import org.apache.paimon.spark.procedure.DeleteTagProcedure; @@ -56,6 +57,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("migrate_table", MigrateTableProcedure::builder); procedureBuilders.put("migrate_file", MigrateFileProcedure::builder); procedureBuilders.put("remove_orphan_files", RemoveOrphanFilesProcedure::builder); + procedureBuilders.put("analyze", AnalyzeProcedure::builder); return procedureBuilders.build(); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AnalyzeProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AnalyzeProcedure.java new file mode 100644 index 0000000000000..1a385336eb98d --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AnalyzeProcedure.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.spark.SparkCatalog; +import org.apache.paimon.spark.catalog.WithPaimonCatalog; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataFieldStats; + +import org.apache.spark.sql.Utils; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.expressions.Attribute; +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** + * Analyze procedure. Usage: + * + *
+ * <pre><code>
+ *  CALL sys.analyze(table => 'table_name', [cols => 'co1, co2'])
+ * </code></pre>
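+ *
+ * <p>Restricting the analysis to specific columns (the table and column names below are
+ * illustrative only, mirroring the accompanying tests):
+ *
+ * <pre><code>
+ *  CALL sys.analyze(table => 'T', cols => 'name,i')
+ * </code></pre>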
+ */ +public class AnalyzeProcedure extends BaseProcedure { + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + ProcedureParameter.optional("cols", StringType) + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + }); + + protected AnalyzeProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + Set requiredCols = + args.isNullAt(1) + ? null + : new HashSet<>(Arrays.asList(args.getString(1).split(","))); + + FileStoreTable table = (FileStoreTable) loadSparkTable(tableIdent).getTable(); + + LogicalPlan relation = + spark().read() + .format("paimon") + .load(table.coreOptions().path().toString()) + .logicalPlan(); + + Seq requiredAttrs = relation.output(); + if (requiredCols != null) { + requiredAttrs = requiredAttrs.filter(x -> requiredCols.contains(x.name())).toSeq(); + } + + List schemaChanges = + colStatsToSchemaChange(Utils.computeColumnStats(spark(), relation, requiredAttrs)); + try { + ((WithPaimonCatalog) tableCatalog()) + .paimonCatalog() + .alterTable(SparkCatalog.toIdentifier(tableIdent), schemaChanges, false); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException + | NoSuchTableException e) { + throw new RuntimeException(e); + } + + return new InternalRow[] {newInternalRow(true)}; + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public AnalyzeProcedure doBuild() { + return new AnalyzeProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "AnalyzeProcedure"; + } + + private List colStatsToSchemaChange( + Tuple2> sparkColStats) { + return JavaConverters.mapAsJavaMapConverter(sparkColStats._2()).asJava().entrySet().stream() + .map( + entry -> + SchemaChange.updateColumnStats( + entry.getKey().name(), + sparkColumnStatsToPaimon(entry.getValue()))) + .collect(Collectors.toList()); + } + + private DataFieldStats sparkColumnStatsToPaimon(ColumnStat columnStat) { + return new DataFieldStats( + columnStat.distinctCount().isDefined() + ? columnStat.distinctCount().get().longValue() + : null, + columnStat.nullCount().isDefined() + ? columnStat.nullCount().get().longValue() + : null, + columnStat.avgLen().isDefined() ? (Long) columnStat.avgLen().get() : null, + columnStat.maxLen().isDefined() ? 
(Long) columnStat.maxLen().get() : null); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala index 5ede8883dfdc0..898a2cd17f7bb 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala @@ -22,6 +22,7 @@ import org.apache.paimon.predicate.{Predicate, PredicateBuilder} import org.apache.paimon.spark.sources.PaimonMicroBatchStream import org.apache.paimon.table.{DataTable, FileStoreTable, Table} import org.apache.paimon.table.source.{ReadBuilder, Split} +import org.apache.paimon.types.RowType import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics} @@ -40,7 +41,7 @@ abstract class PaimonBaseScan( with SupportsReportStatistics with ScanHelper { - private val tableRowType = table.rowType + val tableRowType: RowType = table.rowType private lazy val tableSchema = SparkTypeUtils.fromPaimonRowType(tableRowType) @@ -106,6 +107,8 @@ abstract class PaimonBaseScan( super.supportedCustomMetrics() ++ paimonMetrics } + override def toString: String = description() + override def description(): String = { val pushedFiltersStr = if (filters.nonEmpty) { ", PushedFilters: [" + filters.mkString(",") + "]" diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala index 208c5816c98ee..f2a6ccad39947 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala @@ -17,11 +17,20 @@ */ package org.apache.paimon.spark +import org.apache.paimon.types.DataFieldStats +import org.apache.paimon.utils.OptionalUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Utils +import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.connector.read.Statistics +import org.apache.spark.sql.connector.read.colstats.ColumnStatistics + +import java.util.{Optional, OptionalLong} -import java.util.OptionalLong +import scala.collection.JavaConverters._ -case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics { +case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics with Logging { private lazy val rowCount: Long = scan.getSplits.map(_.rowCount).sum @@ -31,5 +40,35 @@ case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics { override def numRows(): OptionalLong = OptionalLong.of(rowCount) - // TODO: extend columnStats for CBO + override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = { + val requiredFields = scan.readSchema().fieldNames.toList.asJava + val resultMap = new java.util.HashMap[NamedReference, ColumnStatistics]() + scan.tableRowType.getFields + .stream() + .filter(field => requiredFields.contains(field.name)) + .forEach(f => resultMap.put(Utils.fieldReference(f.name()), PaimonColumnStats(f.stats()))) + resultMap + } +} + +case class PaimonColumnStats( + override val nullCount: OptionalLong, + override val min: Optional[Object], + override val max: Optional[Object], + override val distinctCount: 
OptionalLong, + override val avgLen: OptionalLong, + override val maxLen: OptionalLong) + extends ColumnStatistics + +object PaimonColumnStats { + def apply(stats: DataFieldStats): PaimonColumnStats = { + PaimonColumnStats( + OptionalUtils.of(stats.nullCount), + Optional.empty(), + Optional.empty(), + OptionalUtils.of(stats.distinctCount), + OptionalUtils.of(stats.avgLen), + OptionalUtils.of(stats.maxLen) + ) + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala index e8f1b418aa497..f9ed14a7d9749 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala @@ -18,8 +18,9 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} +import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources.Filter import org.apache.spark.util.{Utils => SparkUtils} @@ -68,4 +69,11 @@ object Utils { def bytesToString(size: Long): String = { SparkUtils.bytesToString(size) } + + def computeColumnStats( + sparkSession: SparkSession, + relation: LogicalPlan, + columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = { + CommandUtils.computeColumnStats(sparkSession, relation, columns) + } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/AnalyzeProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/AnalyzeProcedureTest.scala new file mode 100644 index 0000000000000..f8cdbdde5b2ba --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/AnalyzeProcedureTest.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.procedure + +import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.types.{DataField, DataFieldStats} + +import org.apache.spark.sql.Row +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.Assertions + +import java.util +import java.util.Objects + +/** Test cases for [[AnalyzeProcedure]]. 
*/ +class AnalyzeProcedureTest extends PaimonSparkTestBase { + + test("Paimon procedure: analyze all supported cols") { + spark.sql( + s""" + |CREATE TABLE T (id STRING, name STRING, byte_col BYTE, short_col SHORT, int_col INT, long_col LONG, + |float_col FLOAT, double_col DOUBLE, decimal_col DECIMAL(10, 5), boolean_col BOOLEAN, date_col DATE, + |timestamp_col TIMESTAMP, binary_col BINARY) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id') + |""".stripMargin) + + spark.sql( + s"INSERT INTO T VALUES ('1', 'a', 1, 1, 1, 1, 1.0, 1.0, 12.12345, true, to_date('2020-01-01'), to_timestamp('2020-01-01 00:00:00'), binary('example binary1'))") + spark.sql( + s"INSERT INTO T VALUES ('2', 'aaa', 1, null, 1, 1, 1.0, 1.0, 12.12345, true, to_date('2020-01-02'), to_timestamp('2020-01-02 00:00:00'), binary('example binary1'))") + spark.sql( + s"INSERT INTO T VALUES ('3', 'bbbb', 2, 1, 1, 1, 1.0, 1.0, 22.12345, true, to_date('2020-01-02'), to_timestamp('2020-01-02 00:00:00'), null)") + spark.sql( + s"INSERT INTO T VALUES ('4', 'bbbbbbbb', 2, 2, 2, 2, 2.0, 2.0, 22.12345, false, to_date('2020-01-01'), to_timestamp('2020-01-01 00:00:00'), binary('example binary2'))") + + spark.sql(s"CALL sys.analyze(table => 'T')") + var fields = loadTable("T").schema().fields() + assertStatsEqual(fields, "id", new DataFieldStats(4, 0, 1, 1)) + assertStatsEqual(fields, "name", new DataFieldStats(4, 0, 4, 8)) + assertStatsEqual(fields, "byte_col", new DataFieldStats(2, 0, 1, 1)) + assertStatsEqual(fields, "short_col", new DataFieldStats(2, 1, 2, 2)) + assertStatsEqual(fields, "int_col", new DataFieldStats(2, 0, 4, 4)) + assertStatsEqual(fields, "long_col", new DataFieldStats(2, 0, 8, 8)) + assertStatsEqual(fields, "float_col", new DataFieldStats(2, 0, 4, 4)) + assertStatsEqual(fields, "double_col", new DataFieldStats(2, 0, 8, 8)) + assertStatsEqual(fields, "decimal_col", new DataFieldStats(2, 0, 8, 8)) + assertStatsEqual(fields, "boolean_col", new DataFieldStats(2, 0, 1, 1)) + assertStatsEqual(fields, "date_col", new DataFieldStats(2, 0, 4, 4)) + assertStatsEqual(fields, "timestamp_col", new DataFieldStats(2, 0, 8, 8)) + assertStatsEqual(fields, "binary_col", new DataFieldStats(2, 1, 15, 15)) + + spark.sql( + s"INSERT INTO T VALUES ('5', 'bbbbbbbbbbbbbbbb', 3, 3, 3, 3, 3.0, 3.0, 32.12345, false, to_date('2020-01-01'), to_timestamp('2020-01-01 00:00:00'), binary('binary3'))") + + spark.sql(s"CALL sys.analyze(table => 'T')") + fields = loadTable("T").schema().fields() + assertStatsEqual(fields, "id", new DataFieldStats(5, 0, 1, 1)) + assertStatsEqual(fields, "name", new DataFieldStats(5, 0, 7, 16)) + assertStatsEqual(fields, "byte_col", new DataFieldStats(3, 0, 1, 1)) + assertStatsEqual(fields, "short_col", new DataFieldStats(3, 1, 2, 2)) + assertStatsEqual(fields, "int_col", new DataFieldStats(3, 0, 4, 4)) + assertStatsEqual(fields, "long_col", new DataFieldStats(3, 0, 8, 8)) + assertStatsEqual(fields, "float_col", new DataFieldStats(3, 0, 4, 4)) + assertStatsEqual(fields, "double_col", new DataFieldStats(3, 0, 8, 8)) + assertStatsEqual(fields, "decimal_col", new DataFieldStats(3, 0, 8, 8)) + assertStatsEqual(fields, "boolean_col", new DataFieldStats(2, 0, 1, 1)) + assertStatsEqual(fields, "date_col", new DataFieldStats(2, 0, 4, 4)) + assertStatsEqual(fields, "timestamp_col", new DataFieldStats(2, 0, 8, 8)) + assertStatsEqual(fields, "binary_col", new DataFieldStats(3, 1, 13, 15)) + + spark.sql(s"select * from T") + } + + test("Paimon procedure: analyze unsupported cols") { + spark.sql( + s""" + |CREATE TABLE T (id STRING, m 
MAP, l ARRAY, s STRUCT) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id') + |""".stripMargin) + + spark.sql(s"INSERT INTO T VALUES ('1', map(1, 'a'), array(1), struct(1, 'a'))") + + assertThatThrownBy(() => spark.sql(s"CALL sys.analyze(table => 'T', cols => 'm')")) + .hasMessageContaining("not supported") + + assertThatThrownBy(() => spark.sql(s"CALL sys.analyze(table => 'T', cols => 'l')")) + .hasMessageContaining("not supported") + + assertThatThrownBy(() => spark.sql(s"CALL sys.analyze(table => 'T', cols => 's')")) + .hasMessageContaining("not supported") + } + + test("Paimon procedure: analyze specialized cols") { + spark.sql(s""" + |CREATE TABLE T (id STRING, name STRING, i INT, l LONG) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id') + |""".stripMargin) + + spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)") + spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)") + + spark.sql(s"CALL sys.analyze(table => 'T', cols => 'name,i')") + val fields = loadTable("T").schema().fields() + assertStatsEqual(fields, "id", null) + assertStatsEqual(fields, "name", new DataFieldStats(2, 0, 2, 3)) + assertStatsEqual(fields, "i", new DataFieldStats(1, 0, 4, 4)) + assertStatsEqual(fields, "l", null) + + checkAnswer( + spark.sql(s"SELECT * from T ORDER BY id"), + Row("1", "a", 1, 1) :: Row("2", "aaa", 1, 2) :: Nil) + } + + def assertStatsEqual(lst: util.List[DataField], name: String, stats: DataFieldStats): Unit = { + Assertions.assertTrue( + Objects + .equals(lst.stream().filter(f => f.name().equals(name)).findFirst().get.stats(), stats)) + } + +}
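A minimal usage sketch of the new procedure and the statistics it persists, written against this patch. It assumes a SparkSession with the Paimon catalog configured and a populated Paimon table `T`; `loadTable` is the helper from `PaimonSparkTestBase` used in the tests above, and the concrete values depend on the data.

    // Sketch only: trigger statistics collection for all supported columns, then for a subset.
    spark.sql("CALL sys.analyze(table => 'T')")
    spark.sql("CALL sys.analyze(table => 'T', cols => 'name,i')")

    // The procedure persists the results via SchemaChange.updateColumnStats, so they can be
    // read back from the table schema through the new DataField#stats() accessor.
    val stats = loadTable("T").schema().fields().get(1).stats()
    println(stats) // DataFieldStats{distinctCount=..., nullCount=..., avgLen=..., maxLen=...}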