From ad515b70cb37df293a272e48859a8a15391d6690 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Wed, 16 Oct 2024 11:22:05 +0800 Subject: [PATCH] [core] Adjust create partition interface --- .../paimon/catalog/AbstractCatalog.java | 42 +++++++-------- .../org/apache/paimon/catalog/Catalog.java | 10 ++++ .../paimon/catalog/DelegateCatalog.java | 8 +-- .../org/apache/paimon/flink/FlinkCatalog.java | 7 +-- .../org/apache/paimon/hive/HiveCatalog.java | 17 ------ .../spark/PaimonPartitionManagement.scala | 52 +++++++++---------- 6 files changed, 60 insertions(+), 76 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 2542d3777ea2..276d75af433f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -41,9 +41,6 @@ import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.IOException; @@ -72,9 +69,6 @@ public abstract class AbstractCatalog implements Catalog { protected final FileIO fileIO; protected final Map tableDefaultOptions; protected final Options catalogOptions; - public MetastoreClient metastoreClient; - - private static final Logger LOG = LoggerFactory.getLogger(AbstractCatalog.class); @Nullable protected final LineageMetaFactory lineageMetaFactory; @@ -162,23 +156,29 @@ public Map loadDatabaseProperties(String name) protected abstract Map loadDatabasePropertiesImpl(String name) throws DatabaseNotExistException; + @Override public void createPartition(Identifier identifier, Map partitionSpec) throws TableNotExistException { - TableSchema tableSchema = getDataTableSchema(identifier); - if (!tableSchema.partitionKeys().isEmpty() - && new 
CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { - try { - // Do not close client, it is for HiveCatalog - if (metastoreClient == null) { - throw new UnsupportedOperationException( - "Only Support HiveCatalog in create partition!"); - } - metastoreClient.addPartition(new LinkedHashMap<>(partitionSpec)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } else { - throw new RuntimeException("the table is not partitioned table in metastore!"); + Identifier tableIdentifier = + Identifier.create(identifier.getDatabaseName(), identifier.getTableName()); + FileStoreTable table = (FileStoreTable) getTable(tableIdentifier); + + if (table.partitionKeys().isEmpty() || !table.coreOptions().partitionedTableInMetastore()) { + throw new UnsupportedOperationException( + "The table is not partitioned table in metastore."); + } + + MetastoreClient.Factory metastoreFactory = + table.catalogEnvironment().metastoreClientFactory(); + if (metastoreFactory == null) { + throw new UnsupportedOperationException( + "The catalog must have metastore to create partition."); + } + + try (MetastoreClient client = metastoreFactory.create()) { + client.addPartition(new LinkedHashMap<>(partitionSpec)); + } catch (Exception e) { + throw new RuntimeException(e); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 106f7b15afd7..9f069dc3626d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -262,6 +262,16 @@ void alterTable(Identifier identifier, List changes, boolean ignor */ default void invalidateTable(Identifier identifier) {} + /** + * Create the partition of the specify table. 
+ * + * @param identifier path of the table to create partition + * @param partitionSpec the partition to be created + * @throws TableNotExistException if the table does not exist + */ + void createPartition(Identifier identifier, Map partitionSpec) + throws TableNotExistException; + /** * Drop the partition of the specify table. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 53d764f89a59..814de16d6e4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -140,14 +140,10 @@ public Path getTableLocation(Identifier identifier) { return wrapped.getTableLocation(identifier); } + @Override public void createPartition(Identifier identifier, Map partitions) throws TableNotExistException { - if (wrapped instanceof AbstractCatalog) { - ((AbstractCatalog) wrapped).createPartition(identifier, partitions); - } else { - throw new UnsupportedOperationException( - "Only Support HiveCatalog in create partition!"); - } + wrapped.createPartition(identifier, partitions); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 7e006185c48b..633fa9c4bde1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -101,7 +101,6 @@ import javax.annotation.Nullable; -import java.lang.reflect.Method; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -1156,10 +1155,8 @@ public final void createPartition( try { Identifier identifier = toIdentifier(tablePath); - Method func = - catalog.getClass().getMethod("createPartition", 
Identifier.class, Map.class); - func.invoke(catalog, identifier, partitionSpec.getPartitionSpec()); - } catch (Exception e) { + catalog.createPartition(identifier, partitionSpec.getPartitionSpec()); + } catch (Catalog.TableNotExistException e) { throw new CatalogException(e); } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index e84222bd9b89..4d9c482a20da 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -310,23 +310,6 @@ private Map convertToProperties(Database database) { return properties; } - @Override - public void createPartition(Identifier identifier, Map partitionSpec) - throws TableNotExistException { - try { - TableSchema tableSchema = getDataTableSchema(identifier); - metastoreClient = - new HiveMetastoreClient( - new Identifier(identifier.getDatabaseName(), identifier.getTableName()), - tableSchema, - clients); - } catch (Exception e) { - throw new RuntimeException(e); - } - - super.createPartition(identifier, partitionSpec); - } - @Override public void dropPartition(Identifier identifier, Map partitionSpec) throws TableNotExistException { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala index 8bf81641786a..23a7ac06bae4 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala @@ -18,7 +18,6 @@ package org.apache.paimon.spark -import org.apache.paimon.catalog.Identifier import org.apache.paimon.metastore.MetastoreClient import 
org.apache.paimon.operation.FileStoreCommit import org.apache.paimon.table.FileStoreTable @@ -32,7 +31,8 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement import org.apache.spark.sql.types.StructType -import java.util.{LinkedHashMap, Map => JMap, Objects, UUID} +import java.util +import java.util.{Map => JMap, Objects, UUID} import scala.collection.JavaConverters._ @@ -43,7 +43,8 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { override lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(partitionRowType) - override def dropPartitions(internalRows: Array[InternalRow]): Boolean = { + private def toPaimonPartitions( + rows: Array[InternalRow]): Array[java.util.LinkedHashMap[String, String]] = { table match { case fileStoreTable: FileStoreTable => val rowConverter = CatalystTypeConverters @@ -53,12 +54,20 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { partitionRowType, table.partitionKeys().asScala.toArray) - val partitions = internalRows.map { + rows.map { r => rowDataPartitionComputer .generatePartValues(new SparkRow(partitionRowType, rowConverter(r).asInstanceOf[Row])) - .asInstanceOf[JMap[String, String]] } + case _ => + throw new UnsupportedOperationException("Only FileStoreTable supports partitions.") + } + } + + override def dropPartitions(rows: Array[InternalRow]): Boolean = { + table match { + case fileStoreTable: FileStoreTable => + val partitions = toPaimonPartitions(rows).map(_.asInstanceOf[JMap[String, String]]) val commit: FileStoreCommit = fileStoreTable.store.newCommit(UUID.randomUUID.toString) try { commit.dropPartitions(partitions.toSeq.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER) @@ -114,35 +123,24 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { } override def createPartitions( - internalRows: Array[InternalRow], + rows: Array[InternalRow], maps: 
Array[JMap[String, String]]): Unit = { table match { case fileStoreTable: FileStoreTable => - val rowConverter = CatalystTypeConverters - .createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema)) - val rowDataPartitionComputer = new InternalRowPartitionComputer( - fileStoreTable.coreOptions().partitionDefaultName(), - partitionRowType, - table.partitionKeys().asScala.toArray) - val partitions = internalRows.map { - r => - rowDataPartitionComputer - .generatePartValues(new SparkRow(partitionRowType, rowConverter(r).asInstanceOf[Row])) - .asInstanceOf[JMap[String, String]] + val partitions = toPaimonPartitions(rows) + val metastoreFactory = fileStoreTable.catalogEnvironment().metastoreClientFactory() + if (metastoreFactory == null) { + throw new UnsupportedOperationException( + "The table must have metastore to create partition.") } - val metastoreClient: MetastoreClient = - fileStoreTable.catalogEnvironment().metastoreClientFactory().create - partitions.foreach { - partition => - metastoreClient.addPartition(partition.asInstanceOf[LinkedHashMap[String, String]]) + val metastoreClient: MetastoreClient = metastoreFactory.create + try { + partitions.foreach(metastoreClient.addPartition) + } finally { + metastoreClient.close() } case _ => throw new UnsupportedOperationException("Only FileStoreTable supports create partitions.") } } - - def getIdentifierFromTableName(tableName: String): Identifier = { - val name: Array[String] = tableName.split("\\.") - new Identifier(name.apply(0), name.apply(1)) - } }