From 98462120f345cd30f8f308744bfa99d2c0718070 Mon Sep 17 00:00:00 2001
From: melin
Date: Fri, 30 Aug 2024 13:18:50 +0800
Subject: [PATCH] support iceberg sql extensions

---
 pom.xml                                       |  2 +-
 .../melin/superior/common/AlterActionType.kt  | 12 +++
 .../relational/alter/IcebergAlterActions.kt   | 27 ++++++
 .../parser/spark/antlr4/SparkSqlLexer.g4      | 12 +++
 .../parser/spark/antlr4/SparkSqlParser.g4     | 97 +++++++++++++++++++
 .../parser/spark/SparkSqlAntlr4Visitor.kt     | 63 ++++++++++++
 .../parser/spark/IcebergSqlExtensionsTest.kt  | 64 ++++++++++++
 7 files changed, 276 insertions(+), 1 deletion(-)
 create mode 100644 superior-common-parser/src/main/kotlin/io/github/melin/superior/common/relational/alter/IcebergAlterActions.kt
 create mode 100644 superior-spark-parser/src/test/kotlin/io/github/melin/superior/parser/spark/IcebergSqlExtensionsTest.kt

diff --git a/pom.xml b/pom.xml
index 62bf01d7..5a5c6212 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
-            <version>33.1.0-jre</version>
+            <version>33.2.0-jre</version>
             <scope>provided</scope>
         </dependency>
diff --git a/superior-common-parser/src/main/kotlin/io/github/melin/superior/common/AlterActionType.kt b/superior-common-parser/src/main/kotlin/io/github/melin/superior/common/AlterActionType.kt
index e6077a71..e419cf36 100644
--- a/superior-common-parser/src/main/kotlin/io/github/melin/superior/common/AlterActionType.kt
+++ b/superior-common-parser/src/main/kotlin/io/github/melin/superior/common/AlterActionType.kt
@@ -25,5 +25,17 @@ enum class AlterActionType : Serializable {
     ATTACH_PARTITION,
     TRUNCATE_PARTITION,
     REFRESH_MV,
+
+    // Iceberg SQL Extensions
+    CREATE_TAG,
+    CREATE_BRANCH,
+    DROP_TAG,
+    DROP_BRANCH,
+    ADD_PARTITION_FIELD,
+    DROP_PARTITION_FIELD,
+    REPLACE_PARTITION_FIELD,
+    SET_WRITE_DISTRIBUTION_AND_ORDERING,
+    SET_IDENTIFIER_FIELDS,
+    DROP_IDENTIFIER_FIELDS,
     UNKOWN
 }
diff --git a/superior-common-parser/src/main/kotlin/io/github/melin/superior/common/relational/alter/IcebergAlterActions.kt b/superior-common-parser/src/main/kotlin/io/github/melin/superior/common/relational/alter/IcebergAlterActions.kt
new file mode 100644
index 00000000..41cd11ab
--- /dev/null
+++ b/superior-common-parser/src/main/kotlin/io/github/melin/superior/common/relational/alter/IcebergAlterActions.kt
@@ -0,0 +1,27 @@
+package io.github.melin.superior.common.relational.alter
+
+import io.github.melin.superior.common.AlterActionType
+
+data class AlterCreateTagAction(val tagName: String) : AlterAction() {
+    override var alterType: AlterActionType = AlterActionType.CREATE_TAG
+}
+
+data class AlterDropTagAction(val tagName: String) : AlterAction() {
+    override var alterType: AlterActionType = AlterActionType.DROP_TAG
+}
+
+data class AlterCreateBranchAction(val branchName: String) : AlterAction() {
+    override var alterType: AlterActionType = AlterActionType.CREATE_BRANCH
+}
+
+data class AlterDropBranchAction(val branchName: String) : AlterAction() {
+    override var alterType: AlterActionType = AlterActionType.DROP_BRANCH
+}
+
+data class AlterSetIdentifierFieldsAction(val fields: List<String>) : AlterAction() {
+    override var alterType: AlterActionType = AlterActionType.SET_IDENTIFIER_FIELDS
+}
+
+data class AlterDropIdentifierFieldsAction(val fields: List<String>) : AlterAction() {
+    override var alterType: AlterActionType = AlterActionType.DROP_IDENTIFIER_FIELDS
+}
diff --git a/superior-spark-parser/src/main/antlr4/io/github/melin/superior/parser/spark/antlr4/SparkSqlLexer.g4 b/superior-spark-parser/src/main/antlr4/io/github/melin/superior/parser/spark/antlr4/SparkSqlLexer.g4
index b3c688d3..a0e1bb6b 100644
--- a/superior-spark-parser/src/main/antlr4/io/github/melin/superior/parser/spark/antlr4/SparkSqlLexer.g4
+++ b/superior-spark-parser/src/main/antlr4/io/github/melin/superior/parser/spark/antlr4/SparkSqlLexer.g4
@@ -111,6 +111,7 @@ BIGINT: 'BIGINT';
 BINARY: 'BINARY';
 BOOLEAN: 'BOOLEAN';
 BOTH: 'BOTH';
+BRANCH: 'BRANCH';
 BUCKET: 'BUCKET';
 BUCKETS: 'BUCKETS';
 BY: 'BY';
@@ -178,6 +179,7 @@ DIRECTORY: 'DIRECTORY';
 DISTCP: 'DISTCP';
 DISTINCT: 'DISTINCT';
 DISTRIBUTE: 'DISTRIBUTE';
+DISTRIBUTED: 'DISTRIBUTED';
 DIV: 'DIV';
 DOUBLE: 'DOUBLE';
 DROP: 'DROP';
@@ -196,6 +198,7 @@ EXTERNAL: 'EXTERNAL';
 EXTRACT: 'EXTRACT';
 FALSE: 'FALSE';
 FETCH: 'FETCH';
+FIELD: 'FIELD';
 FIELDS: 'FIELDS';
 FILTER: 'FILTER';
 FILE: 'FILE';
@@ -260,6 +263,7 @@ LOCK: 'LOCK';
 LOCKS: 'LOCKS';
 LOGICAL: 'LOGICAL';
 LONG: 'LONG';
+LOCALLY: 'LOCALLY';
 MACRO: 'MACRO';
 MAP: 'MAP';
 MATCHED: 'MATCHED';
@@ -293,6 +297,7 @@ OPTION: 'OPTION';
 OPTIONS: 'OPTIONS';
 OR: 'OR';
 ORDER: 'ORDER';
+ORDERED: 'ORDERED';
 OUT: 'OUT';
 OUTER: 'OUTER';
 OUTPUTFORMAT: 'OUTPUTFORMAT';
@@ -332,6 +337,8 @@ REPLACE: 'REPLACE';
 RESET: 'RESET';
 RESPECT: 'RESPECT';
 RESTRICT: 'RESTRICT';
+RETAIN: 'RETAIN';
+RETENTION: 'RETENTION';
 REVOKE: 'REVOKE';
 RIGHT: 'RIGHT';
 RLIKE: 'RLIKE' | 'REGEXP';
@@ -341,6 +348,8 @@ ROLLBACK: 'ROLLBACK';
 ROLLUP: 'ROLLUP';
 ROW: 'ROW';
 ROWS: 'ROWS';
+SNAPSHOT: 'SNAPSHOT';
+SNAPSHOTS: 'SNAPSHOTS';
 SECOND: 'SECOND';
 SECONDS: 'SECONDS';
 SCHEMA: 'SCHEMA';
@@ -379,6 +388,7 @@ SYSTEM_VERSION: 'SYSTEM_VERSION';
 TABLE: 'TABLE';
 TABLES: 'TABLES';
 TABLESAMPLE: 'TABLESAMPLE';
+TAG: 'TAG';
 TARGET: 'TARGET';
 TBLPROPERTIES: 'TBLPROPERTIES';
 TEMPORARY: 'TEMPORARY' | 'TEMP';
@@ -412,6 +422,7 @@ UNLOCK: 'UNLOCK';
 UNPIVOT: 'UNPIVOT';
 UNSET: 'UNSET';
 UPDATE: 'UPDATE';
+UNORDERED: 'UNORDERED';
 USE: 'USE';
 USER: 'USER';
 USING: 'USING';
@@ -428,6 +439,7 @@ WHERE: 'WHERE';
 WINDOW: 'WINDOW';
 WITH: 'WITH';
 WITHIN: 'WITHIN';
+WRITE: 'WRITE';
 YEAR: 'YEAR';
 YEARS: 'YEARS';
 ZONE: 'ZONE';
diff --git a/superior-spark-parser/src/main/antlr4/io/github/melin/superior/parser/spark/antlr4/SparkSqlParser.g4 b/superior-spark-parser/src/main/antlr4/io/github/melin/superior/parser/spark/antlr4/SparkSqlParser.g4
index 9ddc8b02..b66e6f6b 100644
--- a/superior-spark-parser/src/main/antlr4/io/github/melin/superior/parser/spark/antlr4/SparkSqlParser.g4
+++ b/superior-spark-parser/src/main/antlr4/io/github/melin/superior/parser/spark/antlr4/SparkSqlParser.g4
@@ -256,6 +256,18 @@ statement
     | SYNC dtType=(DATABASE|TABLE) FROM source=multipartIdentifier
         (SET OWNER principal=identifier)?                                                     #syncTableMeta
+    // iceberg sql extensions
+    | ALTER TABLE multipartIdentifier ADD PARTITION FIELD transform (AS name=identifier)?     #addPartitionField
+    | ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform                          #dropPartitionField
+    | ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH transform (AS name=identifier)? #replacePartitionField
+    | ALTER TABLE multipartIdentifier WRITE writeSpec                                         #setWriteDistributionAndOrdering
+    | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList                      #setIdentifierFields
+    | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList                     #dropIdentifierFields
+    | ALTER TABLE multipartIdentifier createReplaceBranchClause                               #createOrReplaceBranch
+    | ALTER TABLE multipartIdentifier createReplaceTagClause                                  #createOrReplaceTag
+    | ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier                     #dropBranch
+    | ALTER TABLE multipartIdentifier DROP TAG (IF EXISTS)? identifier                        #dropTag
+
     | unsupportedHiveNativeCommands .*?                                                       #failNativeCommand
     ;
@@ -335,6 +347,78 @@ exportTableClauses
         (SINGLE single = booleanValue))*
     ;
 
+createReplaceTagClause
+    : (CREATE OR)? REPLACE TAG identifier tagOptions
+    | CREATE TAG (IF NOT EXISTS)? identifier tagOptions
+    ;
+
+createReplaceBranchClause
+    : (CREATE OR)? REPLACE BRANCH identifier branchOptions
+    | CREATE BRANCH (IF NOT EXISTS)? identifier branchOptions
+    ;
+
+tagOptions
+    : (AS OF VERSION snapshotId)? (refRetain)?
+    ;
+
+branchOptions
+    : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?
+    ;
+
+snapshotId
+    : number
+    ;
+
+timeUnit
+    : DAYS
+    | HOURS
+    | MINUTES
+    ;
+
+snapshotRetention
+    : WITH SNAPSHOT RETENTION minSnapshotsToKeep
+    | WITH SNAPSHOT RETENTION maxSnapshotAge
+    | WITH SNAPSHOT RETENTION minSnapshotsToKeep maxSnapshotAge
+    ;
+
+refRetain
+    : RETAIN number timeUnit
+    ;
+
+maxSnapshotAge
+    : number timeUnit
+    ;
+
+minSnapshotsToKeep
+    : number SNAPSHOTS
+    ;
+
+writeSpec
+    : (writeDistributionSpec | writeOrderingSpec)*
+    ;
+
+writeDistributionSpec
+    : DISTRIBUTED BY PARTITION
+    ;
+
+writeOrderingSpec
+    : LOCALLY? ORDERED BY orderExpr
+    | UNORDERED
+    ;
+
+orderExpr
+    : fields+=orderField (',' fields+=orderField)*
+    | '(' fields+=orderField (',' fields+=orderField)* ')'
+    ;
+
+orderField
+    : transform direction=(ASC | DESC)? (NULLS nullOrder=(FIRST | LAST))?
+    ;
+
+fieldList
+    : fields+=multipartIdentifier (',' fields+=multipartIdentifier)*
+    ;
+
 unsupportedHiveNativeCommands
     : kw1=CREATE kw2=ROLE
     | kw1=DROP kw2=ROLE
@@ -1454,6 +1538,7 @@ ansiNonReserved
     | EXTENDED
     | EXTERNAL
     | EXTRACT
+    | FIELD
     | FIELDS
     | FILEFORMAT
     | FIRST
@@ -1700,6 +1785,7 @@ nonReserved
     | BINARY_HEX
     | BOOLEAN
     | BOTH
+    | BRANCH
     | BUCKET
     | BUCKETS
     | BY
@@ -1762,6 +1848,7 @@
     | DIRECTORY
     | DISTINCT
     | DISTRIBUTE
+    | DISTRIBUTED
     | DIV
     | DOUBLE
     | DROP
@@ -1780,6 +1867,7 @@
     | FALSE
     | FETCH
     | FILTER
+    | FIELD
     | FIELDS
     | FILEFORMAT
     | FIRST
@@ -1834,6 +1922,7 @@
     | LOCKS
     | LOGICAL
     | LONG
+    | LOCALLY
     | MACRO
     | MAP
     | MATCHED
@@ -1864,6 +1953,7 @@
     | OPTIONS
     | OR
     | ORDER
+    | ORDERED
     | OUT
     | OUTER
     | OUTPUTFORMAT
@@ -1902,6 +1992,8 @@
     | RESET
     | RESPECT
     | RESTRICT
+    | RETAIN
+    | RETENTION
     | REVOKE
     | RLIKE
     | ROLE
@@ -1910,6 +2002,8 @@
     | ROLLUP
     | ROW
     | ROWS
+    | SNAPSHOT
+    | SNAPSHOTS
     | SCHEMA
     | SCHEMAS
     | SECOND
@@ -1943,6 +2037,7 @@
     | TABLE
     | TABLES
     | TABLESAMPLE
+    | TAG
     | TARGET
     | TBLPROPERTIES
     | TEMPORARY
@@ -1977,6 +2072,7 @@
     | UPDATE
     | USE
     | USER
+    | UNORDERED
     | VALUES
     | VARCHAR
     | VERSION
@@ -1990,6 +2086,7 @@
     | WINDOW
     | WITH
     | WITHIN
+    | WRITE
     | YEAR
     | YEARS
     | ZONE
diff --git a/superior-spark-parser/src/main/kotlin/io/github/melin/superior/parser/spark/SparkSqlAntlr4Visitor.kt b/superior-spark-parser/src/main/kotlin/io/github/melin/superior/parser/spark/SparkSqlAntlr4Visitor.kt
index e166e822..34998ce4 100644
--- a/superior-spark-parser/src/main/kotlin/io/github/melin/superior/parser/spark/SparkSqlAntlr4Visitor.kt
+++ b/superior-spark-parser/src/main/kotlin/io/github/melin/superior/parser/spark/SparkSqlAntlr4Visitor.kt
@@ -861,6 +861,69 @@ class SparkSqlAntlr4Visitor(val splitSql: Boolean = false, val command: String?)
         val functionId = parseTableName(ctx.identifierReference())
         return DropFunction(FunctionId(functionId.schemaName, functionId.tableName))
     }
+
+    // -----------------------------------iceberg sql start -------------------------------------
+
+    override fun visitAddPartitionField(ctx: SparkSqlParser.AddPartitionFieldContext): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        return AlterTable(tableId, AlterTableAction(ADD_PARTITION_FIELD))
+    }
+
+    override fun visitDropPartitionField(ctx: SparkSqlParser.DropPartitionFieldContext): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        return AlterTable(tableId, AlterTableAction(DROP_PARTITION_FIELD))
+    }
+
+    override fun visitReplacePartitionField(ctx: SparkSqlParser.ReplacePartitionFieldContext): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        return AlterTable(tableId, AlterTableAction(REPLACE_PARTITION_FIELD))
+    }
+
+    override fun visitSetWriteDistributionAndOrdering(
+        ctx: SparkSqlParser.SetWriteDistributionAndOrderingContext
+    ): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        return AlterTable(tableId, AlterTableAction(SET_WRITE_DISTRIBUTION_AND_ORDERING))
+    }
+
+    override fun visitSetIdentifierFields(ctx: SparkSqlParser.SetIdentifierFieldsContext): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        val fields = ctx.fieldList().fields.map { field -> field.text }.toList()
+        return AlterTable(tableId, AlterSetIdentifierFieldsAction(fields))
+    }
+
+    override fun visitDropIdentifierFields(ctx: SparkSqlParser.DropIdentifierFieldsContext): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        val fields = ctx.fieldList().fields.map { field -> field.text }.toList()
+        return AlterTable(tableId, AlterDropIdentifierFieldsAction(fields))
+    }
+
+    override fun visitCreateOrReplaceTag(ctx: SparkSqlParser.CreateOrReplaceTagContext): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        val tagName = CommonUtils.cleanQuote(ctx.createReplaceTagClause().identifier().text)
+        return AlterTable(tableId, AlterCreateTagAction(tagName))
+    }
+
+    override fun visitDropTag(ctx: SparkSqlParser.DropTagContext): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        val tagName = CommonUtils.cleanQuote(ctx.identifier().text)
+        return AlterTable(tableId, AlterDropTagAction(tagName))
+    }
+
+    override fun visitCreateOrReplaceBranch(ctx: SparkSqlParser.CreateOrReplaceBranchContext): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        val branchName = CommonUtils.cleanQuote(ctx.createReplaceBranchClause().identifier().text)
+        return AlterTable(tableId, AlterCreateBranchAction(branchName))
+    }
+
+    override fun visitDropBranch(ctx: SparkSqlParser.DropBranchContext): Statement {
+        val tableId = parseTableName(ctx.multipartIdentifier())
+        val branchName = CommonUtils.cleanQuote(ctx.identifier().text)
+        return AlterTable(tableId, AlterDropBranchAction(branchName))
+    }
+
+    // -----------------------------------iceberg sql end -------------------------------------
+
     // -----------------------------------cache-------------------------------------------------
 
     override fun visitCacheTable(ctx: SparkSqlParser.CacheTableContext): Statement {
diff --git a/superior-spark-parser/src/test/kotlin/io/github/melin/superior/parser/spark/IcebergSqlExtensionsTest.kt b/superior-spark-parser/src/test/kotlin/io/github/melin/superior/parser/spark/IcebergSqlExtensionsTest.kt
new file mode 100644
index 00000000..f60f3b89
--- /dev/null
+++ b/superior-spark-parser/src/test/kotlin/io/github/melin/superior/parser/spark/IcebergSqlExtensionsTest.kt
@@ -0,0 +1,64 @@
+package io.github.melin.superior.parser.spark
+
+import io.github.melin.superior.common.*
+import io.github.melin.superior.common.relational.alter.*
+import org.junit.Assert
+import org.junit.Test
+
+/** Created by libinsong on 2018/1/10. */
+class IcebergSqlExtensionsTest {
+
+    @Test
+    fun createTagTest() {
+        val sql = "ALTER TABLE prod.db.sample CREATE TAG `historical-tag`"
+
+        val statement = SparkSqlHelper.parseStatement(sql)
+        if (statement is AlterTable) {
+            Assert.assertEquals("sample", statement.tableId.tableName)
+            Assert.assertEquals(AlterActionType.CREATE_TAG, statement.firstAction().alterType)
+        } else {
+            Assert.fail()
+        }
+    }
+
+    @Test
+    fun dropTagTest() {
+        val sql = "ALTER TABLE prod.db.sample DROP TAG `historical-tag`"
+
+        val statement = SparkSqlHelper.parseStatement(sql)
+        if (statement is AlterTable) {
+            Assert.assertEquals("sample", statement.tableId.tableName)
+            Assert.assertEquals(AlterActionType.DROP_TAG, statement.firstAction().alterType)
+        } else {
+            Assert.fail()
+        }
+    }
+
+    @Test
+    fun setIdentifierTest() {
+        val sql = "ALTER TABLE prod.db.sample SET IDENTIFIER FIELDS id, data"
+
+        val statement = SparkSqlHelper.parseStatement(sql)
+        if (statement is AlterTable) {
+            Assert.assertEquals("sample", statement.tableId.tableName)
+            val action = statement.firstAction() as AlterSetIdentifierFieldsAction
+            Assert.assertEquals(2, action.fields.size)
+            Assert.assertEquals(AlterActionType.SET_IDENTIFIER_FIELDS, statement.firstAction().alterType)
+        } else {
+            Assert.fail()
+        }
+    }
+
+    @Test
+    fun writeSpecTest() {
+        val sql = "ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id"
+
+        val statement = SparkSqlHelper.parseStatement(sql)
+        if (statement is AlterTable) {
+            Assert.assertEquals("sample", statement.tableId.tableName)
+            Assert.assertEquals(AlterActionType.SET_WRITE_DISTRIBUTION_AND_ORDERING, statement.firstAction().alterType)
+        } else {
+            Assert.fail()
+        }
+    }
+}
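
Reviewer note: a minimal consumption sketch, not part of the patch. It assumes only the API
exercised by the tests above (SparkSqlHelper.parseStatement, AlterTable.firstAction(),
tableId.tableName) and the new action classes from IcebergAlterActions.kt; the when branches
and printed strings are illustrative.

    import io.github.melin.superior.common.relational.alter.*
    import io.github.melin.superior.parser.spark.SparkSqlHelper

    fun main() {
        val sql = "ALTER TABLE prod.db.sample CREATE TAG `historical-tag`"

        // Parse the Iceberg extension statement with the Spark dialect parser.
        val statement = SparkSqlHelper.parseStatement(sql)
        if (statement is AlterTable) {
            // Branch on the concrete action class attached by the visitor.
            when (val action = statement.firstAction()) {
                is AlterCreateTagAction -> println("create tag '${action.tagName}' on ${statement.tableId.tableName}")
                is AlterDropTagAction -> println("drop tag '${action.tagName}'")
                is AlterCreateBranchAction -> println("create branch '${action.branchName}'")
                is AlterDropBranchAction -> println("drop branch '${action.branchName}'")
                // Partition-field and write-ordering actions carry only an alterType for now.
                else -> println("other iceberg action: ${action.alterType}")
            }
        }
    }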