Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spark] Add replace tag ddl syntax #4457

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,12 @@ CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' =
```

## Tag DDL
### Create Tag
Create a tag based on snapshot or retention.
### Create or replace Tag
The create-or-replace tag syntax supports the following options.
- Create a tag with or without the snapshot id and time retention.
- Creating a tag that already exists does not fail when the `IF NOT EXISTS` syntax is used.
- Update a tag using `REPLACE TAG` or `CREATE OR REPLACE TAG` syntax.

```sql
-- create a tag based on the latest snapshot and no retention.
ALTER TABLE T CREATE TAG `TAG-1`;
Expand All @@ -228,6 +232,12 @@ ALTER TABLE T CREATE TAG `TAG-3` AS OF VERSION 1;

-- create a tag based on snapshot-2 and retain it for 12 hours.
ALTER TABLE T CREATE TAG `TAG-4` AS OF VERSION 2 RETAIN 12 HOURS;

-- replace an existing tag with a new snapshot id and a new retention.
ALTER TABLE T REPLACE TAG `TAG-4` AS OF VERSION 2 RETAIN 24 HOURS;

-- create or replace a tag: create the tag if it does not exist, replace it if it exists.
ALTER TABLE T CREATE OR REPLACE TAG `TAG-5` AS OF VERSION 2 RETAIN 24 HOURS;
```

### Delete Tag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ singleStatement
statement
: CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
| SHOW TAGS multipartIdentifier #showTags
| ALTER TABLE multipartIdentifier CREATE TAG (IF NOT EXISTS)? identifier tagOptions #createTag
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag
| ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag
;
Expand All @@ -81,6 +81,11 @@ callArgument
| identifier '=>' expression #namedArgument
;

createReplaceTagClause
: CREATE TAG (IF NOT EXISTS)? identifier tagOptions
| (CREATE OR)? REPLACE TAG identifier tagOptions
;

tagOptions
: (AS OF VERSION snapshotId)? (timeRetain)?
;
Expand Down Expand Up @@ -146,8 +151,8 @@ quotedIdentifier
;

nonReserved
: ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | TABLE
| RETAIN | VERSION | TAG
: ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | OR | TABLE
| REPLACE | RETAIN | VERSION | TAG
| TRUE | FALSE
| MAP
;
Expand All @@ -164,7 +169,9 @@ IF : 'IF';
MINUTES: 'MINUTES';
NOT: 'NOT';
OF: 'OF';
OR: 'OR';
RENAME: 'RENAME';
REPLACE: 'REPLACE';
RETAIN: 'RETAIN';
SHOW: 'SHOW';
TABLE: 'TABLE';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import org.apache.paimon.spark.leafnode.PaimonLeafCommand

import org.apache.spark.sql.catalyst.expressions.Attribute

case class CreateTagCommand(
case class CreateOrReplaceTagCommand(
table: Seq[String],
tagName: String,
tagOptions: TagOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean)
extends PaimonLeafCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

case class CreateTagExec(
case class CreateOrReplaceTagExec(
catalog: TableCatalog,
ident: Identifier,
tagName: String,
tagOptions: TagOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean)
extends PaimonLeafV2CommandExec {

Expand All @@ -42,14 +44,27 @@ case class CreateTagExec(
table.asInstanceOf[SparkTable].getTable match {
case paimonTable: FileStoreTable =>
val tagIsExists = paimonTable.tagManager().tagExists(tagName)
if (tagIsExists && ifNotExists) {
return Nil
}
val timeRetained = tagOptions.timeRetained.orNull
if (tagOptions.snapshotId.isEmpty) {
paimonTable.createTag(tagName, timeRetained)
val snapshotId = tagOptions.snapshotId

if (create && replace && !tagIsExists) {
if (snapshotId.isEmpty) {
paimonTable.createTag(tagName, timeRetained)
} else {
paimonTable.createTag(tagName, snapshotId.get, timeRetained)
}
} else if (replace) {
paimonTable.replaceTag(tagName, snapshotId.get, timeRetained)
} else {
paimonTable.createTag(tagName, tagOptions.snapshotId.get, timeRetained)
if (tagIsExists && ifNotExists) {
return Nil
}

if (snapshotId.isEmpty) {
paimonTable.createTag(tagName, timeRetained)
} else {
paimonTable.createTag(tagName, snapshotId.get, timeRetained)
}
}
case t =>
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.paimon.spark.execution

import org.apache.paimon.spark.{SparkCatalog, SparkUtils}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -50,12 +50,14 @@ case class PaimonStrategy(spark: SparkSession)
case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) =>
ShowTagsExec(catalog, ident, t.output) :: Nil

case CreateTagCommand(
case CreateOrReplaceTagCommand(
PaimonCatalogAndIdentifier(table, ident),
tagName,
tagOptions,
create,
replace,
ifNotExists) =>
CreateTagExec(table, ident, tagName, tagOptions, ifNotExists) :: Nil
CreateOrReplaceTagExec(table, ident, tagName, tagOptions, create, replace, ifNotExists) :: Nil

case DeleteTagCommand(PaimonCatalogAndIdentifier(catalog, ident), tagStr, ifExists) =>
DeleteTagExec(catalog, ident, tagStr, ifExists) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface)
}

private def isTagRefDdl(normalized: String): Boolean = {
normalized.startsWith("show tags") || (normalized
.startsWith("alter table") && (normalized.contains("create tag") ||
normalized.contains("rename tag") ||
normalized.contains("delete tag")))
normalized.startsWith("show tags") ||
(normalized.startsWith("alter table") &&
(normalized.contains("create tag") ||
normalized.contains("replace tag") ||
normalized.contains("rename tag") ||
normalized.contains("delete tag")))
}

protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.spark.sql.catalyst.parser.extensions

import org.apache.paimon.spark.catalyst.plans.logical
import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
import org.apache.paimon.utils.TimeUtils

import org.antlr.v4.runtime._
Expand Down Expand Up @@ -98,31 +98,44 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface)
ShowTagsCommand(typedVisit[Seq[String]](ctx.multipartIdentifier))
}

/** Create a CREATE TAG logical command. */
override def visitCreateTag(ctx: CreateTagContext): CreateTagCommand = withOrigin(ctx) {
val tagName = ctx.identifier().getText
val tagOptionsContext = Option(ctx.tagOptions())
val snapshotId =
tagOptionsContext.flatMap(tagOptions => Option(tagOptions.snapshotId())).map(_.getText.toLong)
val timeRetainCtx = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.timeRetain()))
val timeRetained = if (timeRetainCtx.nonEmpty) {
val (number, timeUnit) =
timeRetainCtx.map(retain => (retain.number().getText.toLong, retain.timeUnit().getText)).get
Option(TimeUtils.parseDuration(number, timeUnit))
} else {
None
/** Create a CREATE OR REPLACE TAG logical command. */
override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): CreateOrReplaceTagCommand =
withOrigin(ctx) {
val createTagClause = ctx.createReplaceTagClause()

val tagName = createTagClause.identifier().getText
val tagOptionsContext = Option(createTagClause.tagOptions())
val snapshotId =
tagOptionsContext
.flatMap(tagOptions => Option(tagOptions.snapshotId()))
.map(_.getText.toLong)
val timeRetainCtx = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.timeRetain()))
val timeRetained = if (timeRetainCtx.nonEmpty) {
val (number, timeUnit) =
timeRetainCtx
.map(retain => (retain.number().getText.toLong, retain.timeUnit().getText))
.get
Option(TimeUtils.parseDuration(number, timeUnit))
} else {
None
}
val tagOptions = TagOptions(
snapshotId,
timeRetained
)

val create = createTagClause.CREATE() != null
val replace = createTagClause.REPLACE() != null
val ifNotExists = createTagClause.EXISTS() != null

CreateOrReplaceTagCommand(
typedVisit[Seq[String]](ctx.multipartIdentifier),
tagName,
tagOptions,
create,
replace,
ifNotExists)
}
val tagOptions = TagOptions(
snapshotId,
timeRetained
)
val ifNotExists = ctx.EXISTS() != null
CreateTagCommand(
typedVisit[Seq[String]](ctx.multipartIdentifier),
tagName,
tagOptions,
ifNotExists)
}

/** Create a DELETE TAG logical command. */
override def visitDeleteTag(ctx: DeleteTagContext): DeleteTagCommand = withOrigin(ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,35 @@ abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase {
Row("tag-1", 3, null))
}

test("Tag ddl: alter table t create or replace tag syntax") {
spark.sql("""CREATE TABLE T (id INT, name STRING)
|USING PAIMON
|TBLPROPERTIES ('primary-key'='id')""".stripMargin)

spark.sql("insert into T values(1, 'a')")
spark.sql("insert into T values(2, 'b')")
assertResult(2)(loadTable("T").snapshotManager().snapshotCount())

// test 'replace' syntax
spark.sql("alter table T create tag `tag-1` as of version 1")
spark.sql("alter table T replace tag `tag-1` as of version 2 RETAIN 1 HOURS")
checkAnswer(
spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
Row("tag-1", 2, "PT1H") :: Nil)

// test 'create or replace' syntax
// tag-2 not exist, create it
spark.sql("alter table T create or replace tag `tag-2` as of version 1")
checkAnswer(
spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where tag_name = 'tag-2'"),
Row("tag-2", 1, null) :: Nil)
// tag-2 exists, replace it
spark.sql("alter table T create or replace tag `tag-2` as of version 2 RETAIN 1 HOURS")
checkAnswer(
spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where tag_name = 'tag-2'"),
Row("tag-2", 2, "PT1H") :: Nil)
}

test("Tag ddl: alter table t delete tag syntax") {
spark.sql("""CREATE TABLE T (id INT, name STRING)
|USING PAIMON
Expand Down
Loading