Skip to content

Commit

Permalink
[spark] Add replace tag ddl syntax (#4457)
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang authored Nov 5, 2024
1 parent c7170e6 commit 29c1347
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 45 deletions.
14 changes: 12 additions & 2 deletions docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,12 @@ CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' =
```

## Tag DDL
### Create Tag
Create a tag based on snapshot or retention.
### Create or replace Tag
Create or replace a tag using the following syntax options.
- Create a tag with or without the snapshot id and time retention.
- Creating a tag that already exists does not fail if the `IF NOT EXISTS` syntax is used.
- Update a tag using `REPLACE TAG` or `CREATE OR REPLACE TAG` syntax.

```sql
-- create a tag based on the latest snapshot and no retention.
ALTER TABLE T CREATE TAG `TAG-1`;
Expand All @@ -228,6 +232,12 @@ ALTER TABLE T CREATE TAG `TAG-3` AS OF VERSION 1;

-- create a tag based on snapshot-2 and retain it for 12 hours.
ALTER TABLE T CREATE TAG `TAG-4` AS OF VERSION 2 RETAIN 12 HOURS;

-- replace an existing tag with a new snapshot id and a new retention
ALTER TABLE T REPLACE TAG `TAG-4` AS OF VERSION 2 RETAIN 24 HOURS;

-- create or replace a tag: create the tag if it does not exist, replace it if it exists.
ALTER TABLE T CREATE OR REPLACE TAG `TAG-5` AS OF VERSION 2 RETAIN 24 HOURS;
```

### Delete Tag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ singleStatement
statement
: CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
| SHOW TAGS multipartIdentifier #showTags
| ALTER TABLE multipartIdentifier CREATE TAG (IF NOT EXISTS)? identifier tagOptions #createTag
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag
| ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag
;
Expand All @@ -81,6 +81,11 @@ callArgument
| identifier '=>' expression #namedArgument
;

// Tag clause shared by the create and replace forms of ALTER TABLE:
//   CREATE TAG [IF NOT EXISTS] <identifier> <tagOptions>
//   [CREATE OR] REPLACE TAG <identifier> <tagOptions>
// Note that IF NOT EXISTS is only accepted by the plain CREATE TAG alternative.
createReplaceTagClause
: CREATE TAG (IF NOT EXISTS)? identifier tagOptions
| (CREATE OR)? REPLACE TAG identifier tagOptions
;

tagOptions
: (AS OF VERSION snapshotId)? (timeRetain)?
;
Expand Down Expand Up @@ -146,8 +151,8 @@ quotedIdentifier
;

nonReserved
: ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | TABLE
| RETAIN | VERSION | TAG
: ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | OR | TABLE
| REPLACE | RETAIN | VERSION | TAG
| TRUE | FALSE
| MAP
;
Expand All @@ -164,7 +169,9 @@ IF : 'IF';
MINUTES: 'MINUTES';
NOT: 'NOT';
OF: 'OF';
OR: 'OR';
RENAME: 'RENAME';
REPLACE: 'REPLACE';
RETAIN: 'RETAIN';
SHOW: 'SHOW';
TABLE: 'TABLE';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import org.apache.paimon.spark.leafnode.PaimonLeafCommand

import org.apache.spark.sql.catalyst.expressions.Attribute

case class CreateTagCommand(
case class CreateOrReplaceTagCommand(
table: Seq[String],
tagName: String,
tagOptions: TagOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean)
extends PaimonLeafCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

case class CreateTagExec(
case class CreateOrReplaceTagExec(
catalog: TableCatalog,
ident: Identifier,
tagName: String,
tagOptions: TagOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean)
extends PaimonLeafV2CommandExec {

Expand All @@ -42,14 +44,27 @@ case class CreateTagExec(
table.asInstanceOf[SparkTable].getTable match {
case paimonTable: FileStoreTable =>
val tagIsExists = paimonTable.tagManager().tagExists(tagName)
if (tagIsExists && ifNotExists) {
return Nil
}
val timeRetained = tagOptions.timeRetained.orNull
if (tagOptions.snapshotId.isEmpty) {
paimonTable.createTag(tagName, timeRetained)
val snapshotId = tagOptions.snapshotId

if (create && replace && !tagIsExists) {
if (snapshotId.isEmpty) {
paimonTable.createTag(tagName, timeRetained)
} else {
paimonTable.createTag(tagName, snapshotId.get, timeRetained)
}
} else if (replace) {
paimonTable.replaceTag(tagName, snapshotId.get, timeRetained)
} else {
paimonTable.createTag(tagName, tagOptions.snapshotId.get, timeRetained)
if (tagIsExists && ifNotExists) {
return Nil
}

if (snapshotId.isEmpty) {
paimonTable.createTag(tagName, timeRetained)
} else {
paimonTable.createTag(tagName, snapshotId.get, timeRetained)
}
}
case t =>
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.paimon.spark.execution

import org.apache.paimon.spark.{SparkCatalog, SparkUtils}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -50,12 +50,14 @@ case class PaimonStrategy(spark: SparkSession)
case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) =>
ShowTagsExec(catalog, ident, t.output) :: Nil

case CreateTagCommand(
case CreateOrReplaceTagCommand(
PaimonCatalogAndIdentifier(table, ident),
tagName,
tagOptions,
create,
replace,
ifNotExists) =>
CreateTagExec(table, ident, tagName, tagOptions, ifNotExists) :: Nil
CreateOrReplaceTagExec(table, ident, tagName, tagOptions, create, replace, ifNotExists) :: Nil

case DeleteTagCommand(PaimonCatalogAndIdentifier(catalog, ident), tagStr, ifExists) =>
DeleteTagExec(catalog, ident, tagStr, ifExists) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface)
}

/**
 * Returns whether the given statement text looks like a tag DDL statement: either it starts
 * with `show tags`, or it starts with `alter table` and contains one of `create tag`,
 * `replace tag`, `rename tag` or `delete tag`.
 *
 * `create or replace tag` is covered by the `replace tag` keyword.
 *
 * @param normalized
 *   statement text to inspect; every keyword compared against is lower-case, so the caller is
 *   presumably expected to have lower-cased it already (TODO confirm against the caller)
 */
private def isTagRefDdl(normalized: String): Boolean = {
  // Keep the recognised tag actions in one place so a new action is a one-line change.
  val tagActions = Seq("create tag", "replace tag", "rename tag", "delete tag")
  normalized.startsWith("show tags") ||
  (normalized.startsWith("alter table") &&
    tagActions.exists(action => normalized.contains(action)))
}

protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.spark.sql.catalyst.parser.extensions

import org.apache.paimon.spark.catalyst.plans.logical
import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
import org.apache.paimon.utils.TimeUtils

import org.antlr.v4.runtime._
Expand Down Expand Up @@ -98,31 +98,44 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface)
ShowTagsCommand(typedVisit[Seq[String]](ctx.multipartIdentifier))
}

/** Create a CREATE TAG logical command. */
override def visitCreateTag(ctx: CreateTagContext): CreateTagCommand = withOrigin(ctx) {
val tagName = ctx.identifier().getText
val tagOptionsContext = Option(ctx.tagOptions())
val snapshotId =
tagOptionsContext.flatMap(tagOptions => Option(tagOptions.snapshotId())).map(_.getText.toLong)
val timeRetainCtx = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.timeRetain()))
val timeRetained = if (timeRetainCtx.nonEmpty) {
val (number, timeUnit) =
timeRetainCtx.map(retain => (retain.number().getText.toLong, retain.timeUnit().getText)).get
Option(TimeUtils.parseDuration(number, timeUnit))
} else {
None
/**
 * Builds the logical command for the `createOrReplaceTag` statement, i.e. the
 * `ALTER TABLE ... CREATE TAG`, `... REPLACE TAG` and `... CREATE OR REPLACE TAG` forms.
 *
 * The optional snapshot id and retention are collected into [[TagOptions]]; which of the
 * `CREATE`, `REPLACE` and `EXISTS` keywords appeared in the clause is passed on as the
 * `create`, `replace` and `ifNotExists` flags.
 */
override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): CreateOrReplaceTagCommand =
  withOrigin(ctx) {
    val clause = ctx.createReplaceTagClause()
    val name = clause.identifier().getText

    // Either option may be missing from the statement; a null parse node becomes None.
    val optionsCtx = Option(clause.tagOptions())
    val version = for {
      opts <- optionsCtx
      idCtx <- Option(opts.snapshotId())
    } yield idCtx.getText.toLong
    val retention = for {
      opts <- optionsCtx
      retainCtx <- Option(opts.timeRetain())
      duration <- Option(
        TimeUtils.parseDuration(retainCtx.number().getText.toLong, retainCtx.timeUnit().getText))
    } yield duration
    val parsedOptions = TagOptions(version, retention)

    // A null keyword accessor is treated as "keyword not present in the clause".
    val hasCreate = clause.CREATE() != null
    val hasReplace = clause.REPLACE() != null
    val hasIfNotExists = clause.EXISTS() != null

    CreateOrReplaceTagCommand(
      typedVisit[Seq[String]](ctx.multipartIdentifier),
      name,
      parsedOptions,
      hasCreate,
      hasReplace,
      hasIfNotExists)
  }
val tagOptions = TagOptions(
snapshotId,
timeRetained
)
val ifNotExists = ctx.EXISTS() != null
CreateTagCommand(
typedVisit[Seq[String]](ctx.multipartIdentifier),
tagName,
tagOptions,
ifNotExists)
}

/** Create a DELETE TAG logical command. */
override def visitDeleteTag(ctx: DeleteTagContext): DeleteTagCommand = withOrigin(ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,35 @@ abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase {
Row("tag-1", 3, null))
}

test("Tag ddl: alter table t create or replace tag syntax") {
  spark.sql("""CREATE TABLE T (id INT, name STRING)
              |USING PAIMON
              |TBLPROPERTIES ('primary-key'='id')""".stripMargin)

  // Two separate inserts; the assertion below checks that two snapshots exist to tag.
  spark.sql("insert into T values(1, 'a')")
  spark.sql("insert into T values(2, 'b')")
  val snapshotCount = loadTable("T").snapshotManager().snapshotCount()
  assertResult(2)(snapshotCount)

  // REPLACE TAG: tag-1 is created on version 1, then replaced with version 2 and a retention.
  spark.sql("alter table T create tag `tag-1` as of version 1")
  spark.sql("alter table T replace tag `tag-1` as of version 2 RETAIN 1 HOURS")
  checkAnswer(
    spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
    Seq(Row("tag-1", 2, "PT1H")))

  // CREATE OR REPLACE TAG: the same query is used to inspect tag-2 after each statement.
  val tag2Query =
    "select tag_name,snapshot_id,time_retained from `T$tags` where tag_name = 'tag-2'"

  // tag-2 does not exist yet, so it is created.
  spark.sql("alter table T create or replace tag `tag-2` as of version 1")
  checkAnswer(spark.sql(tag2Query), Seq(Row("tag-2", 1, null)))

  // tag-2 exists now, so it is replaced.
  spark.sql("alter table T create or replace tag `tag-2` as of version 2 RETAIN 1 HOURS")
  checkAnswer(spark.sql(tag2Query), Seq(Row("tag-2", 2, "PT1H")))
}

test("Tag ddl: alter table t delete tag syntax") {
spark.sql("""CREATE TABLE T (id INT, name STRING)
|USING PAIMON
Expand Down

0 comments on commit 29c1347

Please sign in to comment.