Skip to content

Commit

Permalink
tag ddl
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Oct 15, 2024
1 parent 414cd52 commit ec3c207
Show file tree
Hide file tree
Showing 18 changed files with 560 additions and 20 deletions.
46 changes: 43 additions & 3 deletions docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,48 @@ CREATE TABLE my_table_all (
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;
```

## Show Tags
The SHOW TAGS statement is used to list all tags of a table.
## Tag DDL
### Create Tag
Create a tag based on snapshot or retention.
```sql
-- create a tag based on lastest snapshot and no retention.
ALTER TABLE T CREATE TAG `TAG-1`;

-- create a tag based on lastest snapshot and no retention if it doesn't exist.
ALTER TABLE T CREATE TAG IF NOT EXISTS `TAG-1`;

-- create a tag based on lastest snapshot and retain it for 7 day.
ALTER TABLE T CREATE TAG `TAG-2` RETAIN 7 DAYS;

-- create a tag based on snapshot 1 and no retention.
ALTER TABLE T CREATE TAG `TAG-3` AS OF VERSION 1;

-- create a tag based on snapshot 2 and retain it for 12 hour.
ALTER TABLE T CREATE TAG `TAG-4` AS OF VERSION 2 RETAIN 12 HOURS;
```

### Delete Tag
Delete a tag or multiple tags of a table.
```sql
-- delete a tag.
ALTER TABLE T DELETE TAG `TAG-1`;

-- delete a tag if it exists.
ALTER TABLE T DELETE TAG IF EXISTS `TAG-1`

-- delete multiple tags, delimiter is ','.
ALTER TABLE T DELETE TAG `TAG-1,TAG-2`;
```

### Rename Tag
Rename an existing tag with a new tag name.
```sql
ALTER TABLE T RENAME TAG `TAG-1` TO `TAG-2`;
```
SHOW TAGS my_table;

### Show Tags
List all tags of a table.
```sql
SHOW TAGS T;
```

20 changes: 20 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/TimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,26 @@ public static Duration parseDuration(String text) {
}
}

/**
* Parse the given number and unitLabel to a java {@link Duration}. The usage is in format
* "(digital number, time unit label)", e.g. "(1, DAYS)".
*
* @param number a digital number
* @param unitLabel time unit label
*/
public static Duration parseDuration(Long number, String unitLabel) {
ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel.toLowerCase(Locale.US));
if (unit != null) {
return Duration.of(number, unit);
} else {
throw new IllegalArgumentException(
"Time interval unit label '"
+ unitLabel
+ "' does not match any of the recognized units: "
+ TimeUnit.getAllUnits());
}
}

private static Map<String, ChronoUnit> initMap() {
Map<String, ChronoUnit> labelToUnit = new HashMap<>();
for (TimeUnit timeUnit : TimeUnit.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@

package org.apache.paimon.spark.sql

class ShowTagsTest extends PaimonShowTagsTestBase {}
class TagDdlTest extends PaimonTagDdlTestBase
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@

package org.apache.paimon.spark.sql

class ShowTagsTest extends PaimonShowTagsTestBase {}
class TagDdlTest extends PaimonTagDdlTestBase
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@

package org.apache.paimon.spark.sql

class ShowTagsTest extends PaimonShowTagsTestBase {}
class TagDdlTest extends PaimonTagDdlTestBase
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@

package org.apache.paimon.spark.sql

class ShowTagsTest extends PaimonShowTagsTestBase {}
class TagDdlTest extends PaimonTagDdlTestBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,34 @@ singleStatement
statement
: CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
| SHOW TAGS multipartIdentifier #showTags
;
| ALTER TABLE multipartIdentifier CREATE TAG (IF NOT EXISTS)? identifier tagOptions #createTag
| ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag
| ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag
;

callArgument
: expression #positionalArgument
| identifier '=>' expression #namedArgument
;

tagOptions
: (AS OF VERSION snapshotId)? (timeRetain)?
;

snapshotId
: number
;

timeRetain
: RETAIN number timeUnit
;

timeUnit
: DAYS
| HOURS
| MINUTES
;

expression
: constant
| stringMap
Expand Down Expand Up @@ -125,14 +146,32 @@ quotedIdentifier
;

nonReserved
: CALL
: ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | TABLE
| RETAIN | VERSION | TAG
| TRUE | FALSE
| MAP
;

ALTER: 'ALTER';
AS: 'AS';
CALL: 'CALL';
CREATE: 'CREATE';
DAYS: 'DAYS';
DELETE: 'DELETE';
EXISTS: 'EXISTS';
HOURS: 'HOURS';
IF : 'IF';
MINUTES: 'MINUTES';
NOT: 'NOT';
OF: 'OF';
RENAME: 'RENAME';
RETAIN: 'RETAIN';
SHOW: 'SHOW';
TABLE: 'TABLE';
TAG: 'TAG';
TAGS: 'TAGS';
TO: 'TO';
VERSION: 'VERSION';

TRUE: 'TRUE';
FALSE: 'FALSE';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.plans.logical

import org.apache.paimon.spark.leafnode.PaimonLeafCommand

import org.apache.spark.sql.catalyst.expressions.Attribute

case class CreateTagCommand(
table: Seq[String],
tagName: String,
tagOptions: TagOptions,
ifNotExists: Boolean)
extends PaimonLeafCommand {

override def output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"Create tag: $tagName for table: $table"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.plans.logical

import org.apache.paimon.spark.leafnode.PaimonLeafCommand

import org.apache.spark.sql.catalyst.expressions.Attribute

case class DeleteTagCommand(table: Seq[String], tagStr: String, ifExists: Boolean)
extends PaimonLeafCommand {

override def output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"Delete tag: $tagStr for table: $table"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.plans.logical

import org.apache.paimon.spark.leafnode.PaimonLeafCommand

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}

case class RenameTagCommand(table: Seq[String], sourceTag: String, targetTag: String)
extends PaimonLeafCommand {

override def output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"Rename tag from $sourceTag to $targetTag for table: $table"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.plans.logical

import java.time.Duration

case class TagOptions(snapshotId: Option[Long], timeRetained: Option[Duration])
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.execution

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.plans.logical.TagOptions
import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

case class CreateTagExec(
catalog: TableCatalog,
ident: Identifier,
tagName: String,
tagOptions: TagOptions,
ifNotExists: Boolean)
extends PaimonLeafV2CommandExec {

override protected def run(): Seq[InternalRow] = {
val table = catalog.loadTable(ident)
assert(table.isInstanceOf[SparkTable])

table.asInstanceOf[SparkTable].getTable match {
case paimonTable: FileStoreTable =>
val tagIsExists = paimonTable.tagManager().tagExists(tagName)
if (tagIsExists && ifNotExists) {
return Nil
}
val timeRetained = tagOptions.timeRetained.orNull
if (tagOptions.snapshotId.isEmpty) {
paimonTable.createTag(tagName, timeRetained)
} else {
paimonTable.createTag(tagName, tagOptions.snapshotId.get, timeRetained)
}
case t =>
throw new UnsupportedOperationException(
s"Can not create tag for non-paimon FileStoreTable: $t")
}
Nil
}

override def output: Seq[Attribute] = Nil
}
Loading

0 comments on commit ec3c207

Please sign in to comment.