diff --git a/docs/content/spark/auxiliary.md b/docs/content/spark/auxiliary.md
index 6330ca27ce31..5de0289565f2 100644
--- a/docs/content/spark/auxiliary.md
+++ b/docs/content/spark/auxiliary.md
@@ -96,6 +96,17 @@ SHOW PARTITIONS my_table;
 SHOW PARTITIONS my_table PARTITION (dt=20230817);
 ```
 
+## Show Table Extended
+The SHOW TABLE EXTENDED statement is used to list table or partition information.
+
+```sql
+-- Lists tables that satisfy regular expressions
+SHOW TABLE EXTENDED IN db_name LIKE 'test*';
+
+-- Lists the specified partition information for the table
+SHOW TABLE EXTENDED IN db_name LIKE 'table_name' PARTITION(pt = '2024');
+```
+
 ## Analyze table
 
 The ANALYZE TABLE statement collects statistics about the table, that are to be used by the query optimizer to find a better query execution plan.
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 9a305ca59a0f..840f1341a69d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -100,7 +100,7 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
   }
 
   override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] = {
-    throw new UnsupportedOperationException("Load partition is not supported")
+    Map.empty[String, String].asJava
   }
 
   override def listPartitionIdentifiers(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala
new file mode 100644
index 000000000000..5d6a5a063c06
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.spark.sql.PaimonUtils.{normalizePartitionSpec, requireExactMatchedPartitionSpec} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec} +import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec.conf +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} +import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +object PaimonResolvePartitionSpec { + + def resolve( + catalog: TableCatalog, + tableIndent: Identifier, + partitionSpec: PartitionSpec): ResolvedPartitionSpec = { + val table = catalog.loadTable(tableIndent).asPartitionable + partitionSpec match { + case u: UnresolvedPartitionSpec => + val partitionSchema = table.partitionSchema() + resolvePartitionSpec(table.name(), u, partitionSchema, allowPartitionSpec = false) + case o => o.asInstanceOf[ResolvedPartitionSpec] + } + } + + private def resolvePartitionSpec( + tableName: String, + partSpec: UnresolvedPartitionSpec, + partSchema: StructType, + allowPartitionSpec: Boolean): ResolvedPartitionSpec = { + val normalizedSpec = normalizePartitionSpec(partSpec.spec, partSchema, tableName, conf.resolver) + if (!allowPartitionSpec) { + requireExactMatchedPartitionSpec(tableName, normalizedSpec, partSchema.fieldNames) + } + val partitionNames = normalizedSpec.keySet + val requestedFields = partSchema.filter(field => partitionNames.contains(field.name)) + ResolvedPartitionSpec( + requestedFields.map(_.name), + convertToPartIdent(normalizedSpec, requestedFields), + partSpec.location) + } + + def convertToPartIdent( + partitionSpec: TablePartitionSpec, + schema: Seq[StructField]): InternalRow = { + val partValues = schema.map { + part => + val raw = partitionSpec.get(part.name).orNull + val dt = CharVarcharUtils.replaceCharVarcharWithString(part.dataType) + Cast(Literal.create(raw, StringType), dt, Some(conf.sessionLocalTimeZone)).eval() + } + InternalRow.fromSeq(partValues) + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala index e8f75d394a81..f73df64fb8ab 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala @@ -40,6 +40,8 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) { extensions.injectResolutionRule(spark => new PaimonAnalysis(spark)) extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark)) extensions.injectResolutionRule(spark => PaimonViewResolver(spark)) + extensions.injectResolutionRule( + spark => SparkShimLoader.getSparkShim.createCustomResolution(spark)) extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark)) extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark)) diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala index 4492d856ad50..cc49e787dc81 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala @@ -20,11 +20,15 @@ package org.apache.spark.sql import org.apache.spark.executor.OutputMetrics import org.apache.spark.rdd.InputFileBlockHolder +import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.PartitioningUtils import org.apache.spark.util.{Utils => SparkUtils} /** @@ -87,4 +91,19 @@ object PaimonUtils { outputMetrics.setBytesWritten(bytesWritten) outputMetrics.setRecordsWritten(recordsWritten) } + + def normalizePartitionSpec[T]( + partitionSpec: Map[String, T], + partCols: StructType, + tblName: String, + resolver: Resolver): Map[String, T] = { + PartitioningUtils.normalizePartitionSpec(partitionSpec, partCols, tblName, resolver) + } + + def requireExactMatchedPartitionSpec( + tableName: String, + spec: TablePartitionSpec, + partitionColumnNames: Seq[String]): Unit = { + PartitioningUtils.requireExactMatchedPartitionSpec(tableName, spec, partitionColumnNames) + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala new file mode 100644 index 000000000000..f1f20fb6fb31 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector.catalog + +object PaimonCatalogImplicits { + + import CatalogV2Implicits._ + + implicit class PaimonCatalogHelper(plugin: CatalogPlugin) extends CatalogHelper(plugin) + + implicit class PaimonNamespaceHelper(namespace: Array[String]) extends NamespaceHelper(namespace) + +// implicit class PaimonTableHelper(table: Table) extends TableHelper(table) +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala index 2ab3dc494524..5db6894ba093 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.ExternalCatalog +import org.apache.spark.sql.connector.catalog.CatalogV2Util import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.paimon.ReflectUtils @@ -40,4 +41,6 @@ object PaimonCatalogUtils { hadoopConf) } + val TABLE_RESERVED_PROPERTIES: Seq[String] = CatalogV2Util.TABLE_RESERVED_PROPERTIES + } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala index bd85282737e9..334bd6e93180 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -24,6 +24,8 @@ import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -39,6 +41,8 @@ trait SparkShim { def createSparkParser(delegate: ParserInterface): ParserInterface + def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] + def createSparkInternalRow(rowType: RowType): SparkInternalRow def createSparkArrayData(elementType: DataType): SparkArrayData diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala index 528dcd3cd107..ae538fa48c4e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala @@ -27,6 +27,76 @@ import java.util.Objects class DescribeTableTest extends PaimonSparkTestBase { + test("Paimon show: show table extended") { + val testDB = "test_show" + withDatabase(testDB) { + spark.sql("CREATE TABLE s1 (id INT)") + + spark.sql(s"CREATE DATABASE $testDB") + spark.sql(s"USE $testDB") + spark.sql("CREATE TABLE s2 (id INT, pt 
STRING) PARTITIONED BY (pt)")
+      spark.sql("CREATE TABLE s3 (id INT, pt1 STRING, pt2 STRING) PARTITIONED BY (pt1, pt2)")
+
+      spark.sql("INSERT INTO s2 VALUES (1, '2024'), (2, '2024'), (3, '2025'), (4, '2026')")
+      spark.sql("""
+                  |INSERT INTO s3
+                  |VALUES
+                  |(1, '2024', '11'), (2, '2024', '12'), (3, '2025', '11'), (4, '2025', '12')
+                  |""".stripMargin)
+
+      // SHOW TABLE EXTENDED will give four columns: namespace, tableName, isTemporary, information.
+      checkAnswer(
+        sql(s"SHOW TABLE EXTENDED IN $dbName0 LIKE '*'")
+          .select("namespace", "tableName", "isTemporary"),
+        Row("test", "s1", false))
+      checkAnswer(
+        sql(s"SHOW TABLE EXTENDED IN $testDB LIKE '*'")
+          .select("namespace", "tableName", "isTemporary"),
+        Row(testDB, "s2", false) :: Row(testDB, "s3", false) :: Nil
+      )
+
+      // check table s2
+      val res1 = spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2'").select("information")
+      Assertions.assertEquals(1, res1.count())
+      val information1 = res1
+        .collect()
+        .head
+        .getString(0)
+        .split("\n")
+        .map {
+          line =>
+            val kv = line.split(": ", 2)
+            kv(0) -> kv(1)
+        }
+        .toMap
+      Assertions.assertEquals(information1("Catalog"), "paimon")
+      Assertions.assertEquals(information1("Namespace"), testDB)
+      Assertions.assertEquals(information1("Table"), "s2")
+      Assertions.assertEquals(information1("Provider"), "paimon")
+      Assertions.assertEquals(information1("Location"), loadTable(testDB, "s2").location().toString)
+
+      // check partition info for tables s2 and s3
+      val error1 = intercept[Exception] {
+        spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt='2022')")
+      }.getMessage
+      assert(error1.contains("PARTITIONS_NOT_FOUND"))
+
+      val error2 = intercept[Exception] {
+        spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's3' PARTITION(pt1='2024')")
+      }.getMessage
+      assert(error2.contains("Partition spec is invalid"))
+
+      val res2 =
+        spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's3' PARTITION(pt1 = '2024', pt2 = 11)")
+      checkAnswer(
+        res2.select("namespace", "tableName", "isTemporary"),
+        Row(testDB, "s3", false)
+      )
+      Assertions.assertTrue(
+        res2.select("information").collect().head.getString(0).contains("Partition Values"))
+    }
+  }
+
   test(s"Paimon describe: describe table comment") {
     var comment = "test comment"
     spark.sql(s"""
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala
new file mode 100644
index 000000000000..924df2d1e320
--- /dev/null
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.paimon.spark.commands.{PaimonShowTablePartitionCommand, PaimonShowTablesExtendedCommand} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedNamespace, UnresolvedPartitionSpec} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTableExtended} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.Identifier + +case class Spark3ResolutionRules(session: SparkSession) + extends Rule[LogicalPlan] + with SQLConfHelper { + + import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown { + case ShowTableExtended( + ResolvedNamespace(catalog, ns), + pattern, + partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))), + output) => + partitionSpec + .map { + spec: PartitionSpec => + val table = Identifier.of(ns.toArray, pattern) + val resolvedSpec = + PaimonResolvePartitionSpec.resolve(catalog.asTableCatalog, table, spec) + PaimonShowTablePartitionCommand(output, catalog.asTableCatalog, table, resolvedSpec) + } + .getOrElse { + PaimonShowTablesExtendedCommand(catalog.asTableCatalog, ns, pattern, output) + } + + } + +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala new file mode 100644 index 000000000000..32f94985859c --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.commands + +import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName +import org.apache.spark.sql.catalyst.expressions.{Attribute, ToPrettyString} +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog} +import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +case class PaimonShowTablePartitionCommand( + override val output: Seq[Attribute], + catalog: TableCatalog, + tableIndent: Identifier, + partSpec: ResolvedPartitionSpec) + extends PaimonLeafRunnableCommand { + override def run(sparkSession: SparkSession): Seq[Row] = { + val rows = new mutable.ArrayBuffer[Row]() + val table = catalog.loadTable(tableIndent) + val information = getTablePartitionDetails(tableIndent, table.asPartitionable, partSpec) + rows += Row(tableIndent.namespace.quoted, tableIndent.name(), false, s"$information\n") + + rows.toSeq + } + + private def getTablePartitionDetails( + tableIdent: Identifier, + partitionTable: SupportsPartitionManagement, + partSpec: ResolvedPartitionSpec): String = { + val results = new mutable.LinkedHashMap[String, String]() + + // "Partition Values" + val partitionSchema = partitionTable.partitionSchema() + val (names, ident) = (partSpec.names, partSpec.ident) + val partitionIdentifiers = partitionTable.listPartitionIdentifiers(names.toArray, ident) + if (partitionIdentifiers.isEmpty) { + val part = ident + .toSeq(partitionSchema) + .zip(partitionSchema.map(_.name)) + .map(kv => s"${kv._2}" + s" = ${kv._1}") + .mkString(", ") + throw new RuntimeException( + s""" + |[PARTITIONS_NOT_FOUND] The partition(s) PARTITION ($part) cannot be found in table ${tableIdent.toString}. + |Verify the partition specification and table name. 
+ |""".stripMargin) + } + assert(partitionIdentifiers.length == 1) + val row = partitionIdentifiers.head + val len = partitionSchema.length + val partitions = new Array[String](len) + val timeZoneId = conf.sessionLocalTimeZone + for (i <- 0 until len) { + val dataType = partitionSchema(i).dataType + val partValueUTF8String = + ToPrettyString(Literal(row.get(i, dataType), dataType), Some(timeZoneId)).eval(null) + val partValueStr = if (partValueUTF8String == null) "null" else partValueUTF8String.toString + partitions(i) = escapePathName(partitionSchema(i).name) + "=" + escapePathName(partValueStr) + } + val partitionValues = partitions.mkString("[", ", ", "]") + results.put("Partition Values", s"$partitionValues") + + // TODO "Partition Parameters", "Created Time", "Last Access", "Partition Statistics" + + results + .map { + case (key, value) => + if (value.isEmpty) key else s"$key: $value" + } + .mkString("", "\n", "") + } +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala new file mode 100644 index 000000000000..b393982e25d3 --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.commands + +import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.util.{QuotingUtils, StringUtils} +import org.apache.spark.sql.connector.catalog.{Identifier, PaimonCatalogUtils, SupportsPartitionManagement, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +case class PaimonShowTablesExtendedCommand( + catalog: TableCatalog, + namespace: Seq[String], + pattern: String, + override val output: Seq[Attribute], + isExtended: Boolean = false, + partitionSpec: Option[TablePartitionSpec] = None) + extends PaimonLeafRunnableCommand { + + override def run(spark: SparkSession): Seq[Row] = { + val rows = new mutable.ArrayBuffer[Row]() + + val tables = catalog.listTables(namespace.toArray) + tables.map { + tableIdent: Identifier => + if (StringUtils.filterPattern(Seq(tableIdent.name()), pattern).nonEmpty) { + val table = catalog.loadTable(tableIdent) + val information = getTableDetails(catalog.name, tableIdent, table) + rows += Row(tableIdent.namespace().quoted, tableIdent.name(), false, s"$information\n") + } + } + + // TODO: view + + rows.toSeq + } + + private def getTableDetails(catalogName: String, identifier: Identifier, table: Table): String = { + val results = new mutable.LinkedHashMap[String, String]() + + results.put("Catalog", catalogName) + results.put("Namespace", identifier.namespace().quoted) + results.put("Table", identifier.name()) + val tableType = if (table.properties().containsKey(TableCatalog.PROP_EXTERNAL)) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } + results.put("Type", tableType.name) + + PaimonCatalogUtils.TABLE_RESERVED_PROPERTIES + .filterNot(_ == TableCatalog.PROP_EXTERNAL) + .foreach( + propKey => { + if (table.properties.containsKey(propKey)) { + results.put(propKey.capitalize, table.properties.get(propKey)) + } + }) + + val properties: Seq[String] = + conf + .redactOptions(table.properties.asScala.toMap) + .toList + .filter(kv => !PaimonCatalogUtils.TABLE_RESERVED_PROPERTIES.contains(kv._1)) + .sortBy(_._1) + .map { case (key, value) => key + "=" + value } + if (!table.properties().isEmpty) { + results.put("Table Properties", properties.mkString("[", ", ", "]")) + } + + // Partition Provider & Partition Columns + if (supportsPartitions(table) && table.asPartitionable.partitionSchema().nonEmpty) { + results.put("Partition Provider", "Catalog") + results.put( + "Partition Columns", + table.asPartitionable + .partitionSchema() + .map(field => QuotingUtils.quoteIdentifier(field.name)) + .mkString("[", ", ", "]")) + } + + if (table.schema().nonEmpty) { + results.put("Schema", table.schema().treeString) + } + + results + .map { + case (key, value) => + if (value.isEmpty) key else s"$key: $value" + } + .mkString("", "\n", "") + } + + private def supportsPartitions(table: Table): Boolean = table match { + case _: SupportsPartitionManagement => true + case _ => false + } + +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala index 57d79d6474e9..f508e2605cbc 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.paimon.shims +import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, SparkArrayData, SparkInternalRow} import org.apache.paimon.types.{DataType, RowType} @@ -25,7 +26,8 @@ import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.Aggregate +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -38,6 +40,10 @@ class Spark3Shim extends SparkShim { new PaimonSpark3SqlExtensionsParser(delegate) } + override def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] = { + Spark3ResolutionRules(spark) + } + override def createSparkInternalRow(rowType: RowType): SparkInternalRow = { new Spark3InternalRow(rowType) } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala new file mode 100644 index 000000000000..461cbd0c938a --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index dfec4eb71f4f..eefddafdbfb8 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.paimon.shims +import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, SparkArrayData, SparkInternalRow} import org.apache.paimon.types.{DataType, RowType} @@ -25,7 +26,8 @@ import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.Aggregate +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.internal.ExpressionUtils @@ -38,6 +40,11 @@ class Spark4Shim extends SparkShim { override def createSparkParser(delegate: ParserInterface): ParserInterface = { new PaimonSpark4SqlExtensionsParser(delegate) } + + override def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] = { + Spark4ResolutionRules(spark) + } + override def createSparkInternalRow(rowType: RowType): SparkInternalRow = { new Spark4InternalRow(rowType) }
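For context on how the new output can be consumed, here is a minimal Scala sketch (not part of the patch) that runs SHOW TABLE EXTENDED and parses the `information` column into a key/value map, mirroring the parsing done in `DescribeTableTest` above. It assumes a `SparkSession` already configured with a Paimon catalog; the object and helper names (`ShowTableExtendedExample`, `showExtendedInfo`) and the database/table names are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object ShowTableExtendedExample {

  // Run SHOW TABLE EXTENDED and turn the free-form "information" column into a map.
  // The statement returns four columns: namespace, tableName, isTemporary, information.
  def showExtendedInfo(spark: SparkSession, db: String, table: String): Map[String, String] = {
    val df = spark.sql(s"SHOW TABLE EXTENDED IN $db LIKE '$table'")
    val information = df.select("information").collect().head.getString(0)
    information
      .split("\n")
      .filter(_.contains(": ")) // keep only "key: value" lines
      .map {
        line =>
          val kv = line.split(": ", 2)
          kv(0) -> kv(1)
      }
      .toMap
  }
}

// Example with illustrative names, matching the test expectations above:
//   ShowTableExtendedExample.showExtendedInfo(spark, "test_show", "s2")("Provider") == "paimon"
```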