From 265937e15dfccf79664bd84a73ea9dc5b293fa33 Mon Sep 17 00:00:00 2001 From: Xiduo You Date: Wed, 7 Aug 2024 21:38:10 +0800 Subject: [PATCH] [spark] Support alter table drop multi partitions (#3916) --- .../spark/PaimonPartitionManagement.scala | 36 +++++++++++-------- .../sql/PaimonPartitionManagementTest.scala | 20 ++++++++++- 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala index 1aabcddf1ca5..113ad4c3b7a8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala @@ -27,40 +27,44 @@ import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement +import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement import org.apache.spark.sql.types.StructType -import java.util.{Collections, Map => JMap, Objects, UUID} +import java.util.{Map => JMap, Objects, UUID} import scala.collection.JavaConverters._ -trait PaimonPartitionManagement extends SupportsPartitionManagement { +trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { self: SparkTable => private lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys) override lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(partitionRowType) - override def dropPartition(internalRow: InternalRow): Boolean = { + override def dropPartitions(internalRows: Array[InternalRow]): Boolean = { table match { case fileStoreTable: FileStoreTable => - // convert internalRow to row - val row: Row = CatalystTypeConverters + val rowConverter = CatalystTypeConverters .createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema)) - .apply(internalRow) - .asInstanceOf[Row] val rowDataPartitionComputer = new InternalRowPartitionComputer( fileStoreTable.coreOptions().partitionDefaultName(), partitionRowType, table.partitionKeys().asScala.toArray) - val partitionMap = - rowDataPartitionComputer.generatePartValues(new SparkRow(partitionRowType, row)) + + val partitions = internalRows.map { + r => + rowDataPartitionComputer + .generatePartValues(new SparkRow(partitionRowType, rowConverter(r).asInstanceOf[Row])) + .asInstanceOf[JMap[String, String]] + } val commit: FileStoreCommit = fileStoreTable.store.newCommit(UUID.randomUUID.toString) - commit.dropPartitions( - Collections.singletonList(partitionMap), - BatchWriteBuilder.COMMIT_IDENTIFIER) - commit.close() + try { + commit.dropPartitions(partitions.toSeq.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER) + } finally { + commit.close() + } true + case _ => throw new UnsupportedOperationException("Only FileStoreTable supports drop partitions.") } @@ -107,7 +111,9 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement { .toArray } - override def createPartition(ident: InternalRow, properties: JMap[String, String]): Unit = { + override def createPartitions( + internalRows: Array[InternalRow], + maps: Array[JMap[String, String]]): Unit = { throw new UnsupportedOperationException("Create partition is not supported") } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala index bb24b427fb00..3ed99bad596a 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala @@ -61,6 +61,10 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase { spark.sql("alter table T drop partition (hh='1134')") } + assertThrows[AnalysisException] { + spark.sql("alter table T drop partition (dt='20230816'), partition (dt='20230817')") + } + assertThrows[AnalysisException] { spark.sql("show partitions T partition (dt='20230816', hh='1134')") } @@ -101,12 +105,15 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase { spark.sql("INSERT INTO T VALUES('a','b',2,20230817,'1132')") spark.sql("INSERT INTO T VALUES('a','b',2,20230817,'1133')") spark.sql("INSERT INTO T VALUES('a','b',2,20230817,'1134')") + spark.sql("INSERT INTO T VALUES('a','b',2,20240101,'00')") + spark.sql("INSERT INTO T VALUES('a','b',2,20240102,'00')") checkAnswer( spark.sql("show partitions T "), Row("dt=20230816/hh=1132") :: Row("dt=20230816/hh=1133") :: Row("dt=20230816/hh=1134") :: Row("dt=20230817/hh=1132") :: Row( - "dt=20230817/hh=1133") :: Row("dt=20230817/hh=1134") :: Nil + "dt=20230817/hh=1133") :: Row("dt=20230817/hh=1134") :: Row( + "dt=20240101/hh=00") :: Row("dt=20240102/hh=00") :: Nil ) checkAnswer( @@ -130,6 +137,17 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase { spark.sql("alter table T drop partition (dt=20230817, hh='1133')") + assertThrows[AnalysisException] { + spark.sql( + "alter table T drop partition (dt=20240101, hh='00'), partition(dt=999999, hh='00')") + } + checkAnswer( + spark.sql("show partitions T PARTITION (dt=20240101)"), + Row("dt=20240101/hh=00") :: Nil) + + spark.sql( + "alter table T drop partition (dt=20240101, hh='00'), partition (dt=20240102, hh='00')") + assertThrows[AnalysisException] { spark.sql("alter table T drop partition (dt=20230816)") }