Skip to content

Commit

Permalink
[spark] Add show tags sql syntax (#4316)
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang authored Oct 14, 2024
1 parent eaa4a24 commit 3d62756
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 6 deletions.
6 changes: 6 additions & 0 deletions docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,9 @@ CREATE TABLE my_table_all (
);
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;
```

## Show Tags
The `SHOW TAGS` statement lists all tags of a table; tag names are returned in ascending order.
```sql
SHOW TAGS my_table;
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

/** Runs the shared `SHOW TAGS` test cases inherited from [[PaimonShowTagsTestBase]]. */
class ShowTagsTest extends PaimonShowTagsTestBase
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

/** Runs the shared `SHOW TAGS` test cases inherited from [[PaimonShowTagsTestBase]]. */
class ShowTagsTest extends PaimonShowTagsTestBase
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

/** Runs the shared `SHOW TAGS` test cases inherited from [[PaimonShowTagsTestBase]]. */
class ShowTagsTest extends PaimonShowTagsTestBase
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

/** Runs the shared `SHOW TAGS` test cases inherited from [[PaimonShowTagsTestBase]]. */
class ShowTagsTest extends PaimonShowTagsTestBase
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ singleStatement

// A statement accepted by this grammar; each alternative is labelled so the
// generated visitor gets a dedicated visit method for it (#call, #showTags).
statement
// CALL <procedure>(<optional comma-separated arguments>)
: CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
// SHOW TAGS <table>
| SHOW TAGS multipartIdentifier #showTags
;

callArgument
Expand Down Expand Up @@ -130,6 +131,8 @@ nonReserved
;

// Keyword that starts a procedure call statement.
CALL: 'CALL';
// Keywords that together introduce the SHOW TAGS statement.
SHOW: 'SHOW';
TAGS: 'TAGS';

TRUE: 'TRUE';
FALSE: 'FALSE';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.plans.logical

import org.apache.paimon.spark.leafnode.PaimonLeafCommand

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType

/**
 * Logical plan node for the `SHOW TAGS` statement.
 *
 * @param table
 *   name parts of the table whose tags are requested
 */
case class ShowTagsCommand(table: Seq[String]) extends PaimonLeafCommand {

  /** The result has a single, non-nullable string column named `tag`. */
  override def output: Seq[Attribute] = {
    val tagColumn = AttributeReference("tag", StringType, nullable = false)()
    tagColumn :: Nil
  }

  /** One-line description of this node used when the plan is rendered as text. */
  override def simpleString(maxFields: Int): String = s"Show Tags for table: $table"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@

package org.apache.paimon.spark.execution

import org.apache.paimon.spark.catalyst.plans.logical.PaimonCallCommand
import org.apache.paimon.spark.{SparkCatalog, SparkUtils}
import org.apache.paimon.spark.catalyst.plans.logical.{PaimonCallCommand, ShowTagsCommand}

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan}
import org.apache.spark.sql.connector.catalog.{Identifier, PaimonLookupCatalog, TableCatalog}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy

case class PaimonStrategy(spark: SparkSession) extends Strategy with PredicateHelper {
import scala.collection.JavaConverters._

case class PaimonStrategy(spark: SparkSession)
extends Strategy
with PredicateHelper
with PaimonLookupCatalog {

protected lazy val catalogManager = spark.sessionState.catalogManager

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

Expand All @@ -37,6 +46,9 @@ case class PaimonStrategy(spark: SparkSession) extends Strategy with PredicateHe
case c @ PaimonCallCommand(procedure, args) =>
val input = buildInternalRow(args)
PaimonCallExec(c.output, procedure, input) :: Nil

case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) =>
ShowTagsExec(catalog, ident, t.output) :: Nil
case _ => Nil
}

Expand All @@ -48,4 +60,16 @@ case class PaimonStrategy(spark: SparkSession) extends Strategy with PredicateHe
new GenericInternalRow(values)
}

/**
 * Extractor that resolves a multi-part name against the current catalog and matches only when
 * the resolved catalog is a Paimon [[SparkCatalog]].
 */
private object PaimonCatalogAndIdentifier {
  def unapply(identifier: Seq[String]): Option[(TableCatalog, Identifier)] = {
    val resolved =
      SparkUtils.catalogAndIdentifier(spark, identifier.asJava, catalogManager.currentCatalog)
    // Any other catalog implementation (or none at all) yields None, so the pattern does not match.
    Option(resolved.catalog).collect {
      case paimon: SparkCatalog => (paimon, resolved.identifier())
    }
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.execution

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.JavaConverters._

/**
 * Physical command behind `SHOW TAGS`: loads the table and returns one row per tag name.
 *
 * @param catalog
 *   catalog used to load the table
 * @param ident
 *   identifier of the table whose tags are listed
 * @param out
 *   output attributes reported by this node
 */
case class ShowTagsExec(catalog: TableCatalog, ident: Identifier, out: Seq[Attribute])
  extends PaimonLeafV2CommandExec {

  /**
   * Lists the tag names of the table, sorted in ascending string order, one row per tag.
   *
   * The table type is checked with a pattern match rather than `assert` + cast: Scala assertions
   * can be elided at compile time, which would turn an unexpected table type into a bare
   * `ClassCastException` instead of the descriptive error below.
   *
   * @throws UnsupportedOperationException
   *   if the loaded table is not a [[SparkTable]] backed by a [[FileStoreTable]]
   */
  override protected def run(): Seq[InternalRow] =
    catalog.loadTable(ident) match {
      case sparkTable: SparkTable =>
        sparkTable.getTable match {
          case paimonTable: FileStoreTable =>
            val tagNames = paimonTable.tagManager().allTagNames()
            tagNames.asScala.toList.sorted.map(t => InternalRow(UTF8String.fromString(t)))
          case t =>
            throw new UnsupportedOperationException(
              s"Can not show tags for non-paimon FileStoreTable: $t")
        }
      case t =>
        throw new UnsupportedOperationException(
          s"Can not show tags for non-paimon FileStoreTable: $t")
    }

  override def output: Seq[Attribute] = out
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface)
/** Parses a string to a LogicalPlan. */
override def parsePlan(sqlText: String): LogicalPlan = {
val sqlTextAfterSubstitution = substitutor.substitute(sqlText)
if (isCommand(sqlTextAfterSubstitution)) {
if (isPaimonCommand(sqlTextAfterSubstitution)) {
parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement()))
.asInstanceOf[LogicalPlan]
} else {
Expand Down Expand Up @@ -93,15 +93,16 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface)
delegate.parseMultipartIdentifier(sqlText)

/**
 * Returns whether the SQL text is a statement handled by the Paimon extension grammar, i.e. it
 * starts with `CALL` or `SHOW TAGS`.
 *
 * Before the check the text is lower-cased, `--` line comments and single-line block comments
 * are replaced by a space, and whitespace runs are collapsed.
 */
private def isPaimonCommand(sqlText: String): Boolean = {
  val normalized = sqlText
    .toLowerCase(Locale.ROOT)
    .trim()
    .replaceAll("--.*?\\n", " ")
    .replaceAll("\\s+", " ")
    .replaceAll("/\\*.*?\\*/", " ")
    .trim()
  // Test the statement prefix only. A substring test would also claim ordinary Spark SQL that
  // merely mentions "show tags" (for example inside a string literal) and hand it to a grammar
  // that only knows CALL and SHOW TAGS.
  normalized.startsWith("call") ||
  normalized.startsWith("show tags")
}

protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.spark.sql.catalyst.parser.extensions

import org.apache.paimon.spark.catalyst.plans.logical
import org.apache.paimon.spark.catalyst.plans.logical.{PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument}
import org.apache.paimon.spark.catalyst.plans.logical.{PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, ShowTagsCommand}

import org.antlr.v4.runtime._
import org.antlr.v4.runtime.misc.Interval
Expand Down Expand Up @@ -92,6 +92,10 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface)
ctx.parts.asScala.map(_.getText).toSeq
}

/** Builds a [[ShowTagsCommand]] from a `SHOW TAGS` parse-tree node. */
override def visitShowTags(ctx: ShowTagsContext): AnyRef = withOrigin(ctx) {
  val tableNameParts = typedVisit[Seq[String]](ctx.multipartIdentifier)
  ShowTagsCommand(tableNameParts)
}

private def toBuffer[T](list: java.util.List[T]) = list.asScala

private def toSeq[T](list: java.util.List[T]) = toBuffer(list)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.Row

/** Shared test cases for the `SHOW TAGS` statement. */
abstract class PaimonShowTagsTestBase extends PaimonSparkTestBase {

  test("Paimon DDL: show tags for table") {
    spark.sql("""CREATE TABLE T (id INT, name STRING)
                |USING PAIMON
                |TBLPROPERTIES ('primary-key'='id')""".stripMargin)

    val writes = Seq(
      "insert into T values(1, 'a')",
      "insert into T values(2, 'b')"
    )
    writes.foreach(stmt => spark.sql(stmt))

    // Tags are created out of name order on purpose; the expected rows below are in name order.
    val tagCreations = Seq(
      "CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-12')",
      "CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-11')",
      "CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-13')"
    )
    tagCreations.foreach(stmt => spark.sql(stmt))

    val expectedRows = Seq(Row("2024-10-11"), Row("2024-10-12"), Row("2024-10-13"))
    checkAnswer(spark.sql("show tags T"), expectedRows)
  }
}

0 comments on commit 3d62756

Please sign in to comment.