[spark] Support alter table drop multi partitions (#3916)
ulysses-you authored Aug 7, 2024
1 parent 37e8fb0 commit 265937e
Showing 2 changed files with 40 additions and 16 deletions.
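In SQL terms, the change lets a single ALTER TABLE ... DROP PARTITION statement name several partitions, which previously failed because the table only implemented Spark's non-atomic partition interface. A usage sketch mirroring the updated test below (the table T and its dt/hh partition keys come from that test):

    // Drop two partitions of the test table T in a single statement.
    spark.sql(
      "alter table T drop partition (dt=20240101, hh='00'), partition (dt=20240102, hh='00')")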
@@ -27,40 +27,44 @@ import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
+import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
 import org.apache.spark.sql.types.StructType
 
-import java.util.{Collections, Map => JMap, Objects, UUID}
+import java.util.{Map => JMap, Objects, UUID}
 
 import scala.collection.JavaConverters._
 
-trait PaimonPartitionManagement extends SupportsPartitionManagement {
+trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
   self: SparkTable =>
 
   private lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
 
   override lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(partitionRowType)
 
-  override def dropPartition(internalRow: InternalRow): Boolean = {
+  override def dropPartitions(internalRows: Array[InternalRow]): Boolean = {
     table match {
       case fileStoreTable: FileStoreTable =>
         // convert internalRow to row
-        val row: Row = CatalystTypeConverters
+        val rowConverter = CatalystTypeConverters
           .createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema))
-          .apply(internalRow)
-          .asInstanceOf[Row]
         val rowDataPartitionComputer = new InternalRowPartitionComputer(
           fileStoreTable.coreOptions().partitionDefaultName(),
           partitionRowType,
           table.partitionKeys().asScala.toArray)
-        val partitionMap =
-          rowDataPartitionComputer.generatePartValues(new SparkRow(partitionRowType, row))
+        val partitions = internalRows.map {
+          r =>
+            rowDataPartitionComputer
+              .generatePartValues(new SparkRow(partitionRowType, rowConverter(r).asInstanceOf[Row]))
+              .asInstanceOf[JMap[String, String]]
+        }
         val commit: FileStoreCommit = fileStoreTable.store.newCommit(UUID.randomUUID.toString)
-        commit.dropPartitions(
-          Collections.singletonList(partitionMap),
-          BatchWriteBuilder.COMMIT_IDENTIFIER)
-        commit.close()
+        try {
+          commit.dropPartitions(partitions.toSeq.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER)
+        } finally {
+          commit.close()
+        }
         true
 
       case _ =>
         throw new UnsupportedOperationException("Only FileStoreTable supports drop partitions.")
     }
@@ -107,7 +111,9 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement {
       .toArray
   }
 
-  override def createPartition(ident: InternalRow, properties: JMap[String, String]): Unit = {
+  override def createPartitions(
+      internalRows: Array[InternalRow],
+      maps: Array[JMap[String, String]]): Unit = {
    throw new UnsupportedOperationException("Create partition is not supported")
  }
}
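The key change above is swapping SupportsPartitionManagement for SupportsAtomicPartitionManagement, Spark's batch variant of the DataSource V2 partition-management API: Spark hands the connector every requested partition ident in one call, and the implementation forwards them to a single FileStoreCommit.dropPartitions, so either all partitions are dropped or none. For reference, a paraphrased sketch of the contract being implemented (the authoritative signatures live in Spark's org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement):

    import org.apache.spark.sql.catalyst.InternalRow

    // Paraphrased shape of Spark's atomic partition-management contract.
    trait AtomicPartitionContractSketch {
      // Create all of the given partitions, or none of them.
      def createPartitions(
          idents: Array[InternalRow],
          properties: Array[java.util.Map[String, String]]): Unit
      // Drop all of the given partitions in one operation; true on success.
      def dropPartitions(idents: Array[InternalRow]): Boolean
    }

The try/finally around the commit is also new: FileStoreCommit.close() now runs even if dropPartitions throws, where the old code skipped close() on failure.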
@@ -61,6 +61,10 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
       spark.sql("alter table T drop partition (hh='1134')")
     }
 
+    assertThrows[AnalysisException] {
+      spark.sql("alter table T drop partition (dt='20230816'), partition (dt='20230817')")
+    }
+
     assertThrows[AnalysisException] {
       spark.sql("show partitions T partition (dt='20230816', hh='1134')")
     }
@@ -101,12 +105,15 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
     spark.sql("INSERT INTO T VALUES('a','b',2,20230817,'1132')")
     spark.sql("INSERT INTO T VALUES('a','b',2,20230817,'1133')")
     spark.sql("INSERT INTO T VALUES('a','b',2,20230817,'1134')")
+    spark.sql("INSERT INTO T VALUES('a','b',2,20240101,'00')")
+    spark.sql("INSERT INTO T VALUES('a','b',2,20240102,'00')")
 
     checkAnswer(
       spark.sql("show partitions T "),
       Row("dt=20230816/hh=1132") :: Row("dt=20230816/hh=1133")
         :: Row("dt=20230816/hh=1134") :: Row("dt=20230817/hh=1132") :: Row(
-        "dt=20230817/hh=1133") :: Row("dt=20230817/hh=1134") :: Nil
+        "dt=20230817/hh=1133") :: Row("dt=20230817/hh=1134") :: Row(
+        "dt=20240101/hh=00") :: Row("dt=20240102/hh=00") :: Nil
     )
 
     checkAnswer(
@@ -130,6 +137,17 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
 
     spark.sql("alter table T drop partition (dt=20230817, hh='1133')")
 
+    assertThrows[AnalysisException] {
+      spark.sql(
+        "alter table T drop partition (dt=20240101, hh='00'), partition(dt=999999, hh='00')")
+    }
+    checkAnswer(
+      spark.sql("show partitions T PARTITION (dt=20240101)"),
+      Row("dt=20240101/hh=00") :: Nil)
+
+    spark.sql(
+      "alter table T drop partition (dt=20240101, hh='00'), partition (dt=20240102, hh='00')")
+
     assertThrows[AnalysisException] {
       spark.sql("alter table T drop partition (dt=20230816)")
     }
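The added assertions pin down the atomicity semantics: a statement that names a missing partition fails with an AnalysisException before anything is dropped, and the surviving partition can still be listed, while a statement naming only existing partitions drops them all in one commit. Condensed from the test above:

    // Naming a non-existent partition fails the whole statement...
    assertThrows[AnalysisException] {
      spark.sql(
        "alter table T drop partition (dt=20240101, hh='00'), partition(dt=999999, hh='00')")
    }
    // ...and the existing partition named alongside it is untouched.
    checkAnswer(
      spark.sql("show partitions T PARTITION (dt=20240101)"),
      Row("dt=20240101/hh=00") :: Nil)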