From 3d6275653ffea606ede804c00869d0c9347f7b2b Mon Sep 17 00:00:00 2001 From: askwang <135721692+askwang@users.noreply.github.com> Date: Mon, 14 Oct 2024 20:47:35 +0800 Subject: [PATCH] [spark] Add show tags sql syntax (#4316) --- docs/content/spark/sql-ddl.md | 6 +++ .../paimon/spark/sql/ShowTagsTest.scala | 21 ++++++++ .../paimon/spark/sql/ShowTagsTest.scala | 21 ++++++++ .../paimon/spark/sql/ShowTagsTest.scala | 21 ++++++++ .../paimon/spark/sql/ShowTagsTest.scala | 21 ++++++++ .../PaimonSqlExtensions.g4 | 3 ++ .../plans/logical/ShowTagsCommand.scala | 34 ++++++++++++ .../spark/execution/PaimonStrategy.scala | 28 +++++++++- .../paimon/spark/execution/ShowTagsExec.scala | 52 +++++++++++++++++++ .../PaimonSparkSqlExtensionsParser.scala | 7 +-- .../PaimonSqlExtensionsAstBuilder.scala | 6 ++- .../spark/sql/PaimonShowTagsTestBase.scala | 43 +++++++++++++++ 12 files changed, 257 insertions(+), 6 deletions(-) create mode 100644 paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala create mode 100644 paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala create mode 100644 paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala create mode 100644 paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/ShowTagsCommand.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/ShowTagsExec.scala create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowTagsTestBase.scala diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md index fa82b467d5c7..35b2e53514c1 100644 --- a/docs/content/spark/sql-ddl.md +++ b/docs/content/spark/sql-ddl.md @@ -209,3 +209,9 @@ CREATE TABLE my_table_all ( ); CREATE TABLE 
my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all; ``` + +## Show Tags +The SHOW TAGS statement is used to list all tags of a table. +``` +SHOW TAGS my_table; +``` diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala new file mode 100644 index 000000000000..36b42e6b677e --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +class ShowTagsTest extends PaimonShowTagsTestBase {} diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala new file mode 100644 index 000000000000..36b42e6b677e --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class ShowTagsTest extends PaimonShowTagsTestBase {} diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala new file mode 100644 index 000000000000..36b42e6b677e --- /dev/null +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class ShowTagsTest extends PaimonShowTagsTestBase {} diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala new file mode 100644 index 000000000000..36b42e6b677e --- /dev/null +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +class ShowTagsTest extends PaimonShowTagsTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index 54e71b362fc3..d29aaa423899 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -70,6 +70,7 @@ singleStatement statement : CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call + | SHOW TAGS multipartIdentifier #showTags ; callArgument @@ -130,6 +131,8 @@ nonReserved ; CALL: 'CALL'; +SHOW: 'SHOW'; +TAGS: 'TAGS'; TRUE: 'TRUE'; FALSE: 'FALSE'; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/ShowTagsCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/ShowTagsCommand.scala new file mode 100644 index 000000000000..f5b62d333861 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/ShowTagsCommand.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalyst.plans.logical + +import org.apache.paimon.spark.leafnode.PaimonLeafCommand + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.types.StringType + +case class ShowTagsCommand(table: Seq[String]) extends PaimonLeafCommand { + + override def output: Seq[Attribute] = + Seq(AttributeReference("tag", StringType, nullable = false)()) + + override def simpleString(maxFields: Int): String = { + s"Show Tags for table: $table" + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index c6c6fc8759c0..21b809d1e21b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -18,16 +18,25 @@ package org.apache.paimon.spark.execution -import org.apache.paimon.spark.catalyst.plans.logical.PaimonCallCommand +import org.apache.paimon.spark.{SparkCatalog, SparkUtils} +import org.apache.paimon.spark.catalyst.plans.logical.{PaimonCallCommand, ShowTagsCommand} import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, PredicateHelper} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
LogicalPlan} +import org.apache.spark.sql.connector.catalog.{Identifier, PaimonLookupCatalog, TableCatalog} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy -case class PaimonStrategy(spark: SparkSession) extends Strategy with PredicateHelper { +import scala.collection.JavaConverters._ + +case class PaimonStrategy(spark: SparkSession) + extends Strategy + with PredicateHelper + with PaimonLookupCatalog { + + protected lazy val catalogManager = spark.sessionState.catalogManager override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { @@ -37,6 +46,9 @@ case class PaimonStrategy(spark: SparkSession) extends Strategy with PredicateHe case c @ PaimonCallCommand(procedure, args) => val input = buildInternalRow(args) PaimonCallExec(c.output, procedure, input) :: Nil + + case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) => + ShowTagsExec(catalog, ident, t.output) :: Nil case _ => Nil } @@ -48,4 +60,16 @@ case class PaimonStrategy(spark: SparkSession) extends Strategy with PredicateHe new GenericInternalRow(values) } + private object PaimonCatalogAndIdentifier { + def unapply(identifier: Seq[String]): Option[(TableCatalog, Identifier)] = { + val catalogAndIdentifier = + SparkUtils.catalogAndIdentifier(spark, identifier.asJava, catalogManager.currentCatalog) + catalogAndIdentifier.catalog match { + case paimonCatalog: SparkCatalog => + Some((paimonCatalog, catalogAndIdentifier.identifier())) + case _ => + None + } + } + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/ShowTagsExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/ShowTagsExec.scala new file mode 100644 index 000000000000..8f8d2b4665a5 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/ShowTagsExec.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.execution + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec +import org.apache.paimon.table.FileStoreTable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + +case class ShowTagsExec(catalog: TableCatalog, ident: Identifier, out: Seq[Attribute]) + extends PaimonLeafV2CommandExec { + + override protected def run(): Seq[InternalRow] = { + val table = catalog.loadTable(ident) + assert(table.isInstanceOf[SparkTable]) + + var tags: Seq[InternalRow] = Nil + table.asInstanceOf[SparkTable].getTable match { + case paimonTable: FileStoreTable => + val tagNames = paimonTable.tagManager().allTagNames() + tags = tagNames.asScala.toList.sorted.map(t => InternalRow(UTF8String.fromString(t))) + case t => + throw new UnsupportedOperationException( + s"Can not show tags for non-paimon FileStoreTable: $t") + } + tags + } + + override def output: Seq[Attribute] = out +} diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala index 26a351bc673a..47991228acdf 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala @@ -57,7 +57,7 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface) /** Parses a string to a LogicalPlan. */ override def parsePlan(sqlText: String): LogicalPlan = { val sqlTextAfterSubstitution = substitutor.substitute(sqlText) - if (isCommand(sqlTextAfterSubstitution)) { + if (isPaimonCommand(sqlTextAfterSubstitution)) { parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement())) .asInstanceOf[LogicalPlan] } else { @@ -93,7 +93,7 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface) delegate.parseMultipartIdentifier(sqlText) /** Returns whether SQL text is command. 
*/ - private def isCommand(sqlText: String): Boolean = { + private def isPaimonCommand(sqlText: String): Boolean = { val normalized = sqlText .toLowerCase(Locale.ROOT) .trim() @@ -101,7 +101,9 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface) .replaceAll("\\s+", " ") .replaceAll("/\\*.*?\\*/", " ") .trim() - normalized.startsWith("call") + // Match SHOW TAGS only at the start of the statement, not anywhere inside other SQL. + normalized.startsWith("call") || + normalized.startsWith("show tags") } protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index 3424129e957b..1fe92a1b38bf 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.parser.extensions import org.apache.paimon.spark.catalyst.plans.logical -import org.apache.paimon.spark.catalyst.plans.logical.{PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument} +import org.apache.paimon.spark.catalyst.plans.logical.{PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, ShowTagsCommand} import org.antlr.v4.runtime._ import org.antlr.v4.runtime.misc.Interval @@ -92,6 +92,10 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) ctx.parts.asScala.map(_.getText).toSeq } + override def visitShowTags(ctx: ShowTagsContext): AnyRef = withOrigin(ctx) { + ShowTagsCommand(typedVisit[Seq[String]](ctx.multipartIdentifier)) + } + private def toBuffer[T](list: java.util.List[T]) = list.asScala private def toSeq[T](list: 
java.util.List[T]) = toBuffer(list) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowTagsTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowTagsTestBase.scala new file mode 100644 index 000000000000..4d8f0b9b4b82 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowTagsTestBase.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row + +abstract class PaimonShowTagsTestBase extends PaimonSparkTestBase { + + test("Paimon DDL: show tags for table") { + spark.sql("""CREATE TABLE T (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id')""".stripMargin) + + spark.sql("insert into T values(1, 'a')") + spark.sql("insert into T values(2, 'b')") + + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-12')") + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-11')") + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-13')") + + checkAnswer( + spark.sql("show tags T"), + Row("2024-10-11") :: Row("2024-10-12") :: Row("2024-10-13") :: Nil) + } +}