Skip to content

Commit

Permalink
[core] Adjust create partition interface
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Oct 16, 2024
1 parent cc3ed7f commit ad515b7
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
Expand Down Expand Up @@ -72,9 +69,6 @@ public abstract class AbstractCatalog implements Catalog {
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
public MetastoreClient metastoreClient;

private static final Logger LOG = LoggerFactory.getLogger(AbstractCatalog.class);

@Nullable protected final LineageMetaFactory lineageMetaFactory;

Expand Down Expand Up @@ -162,23 +156,29 @@ public Map<String, String> loadDatabaseProperties(String name)
protected abstract Map<String, String> loadDatabasePropertiesImpl(String name)
throws DatabaseNotExistException;

@Override
public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
TableSchema tableSchema = getDataTableSchema(identifier);
if (!tableSchema.partitionKeys().isEmpty()
&& new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
try {
// Do not close client, it is for HiveCatalog
if (metastoreClient == null) {
throw new UnsupportedOperationException(
"Only Support HiveCatalog in create partition!");
}
metastoreClient.addPartition(new LinkedHashMap<>(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
throw new RuntimeException("the table is not partitioned table in metastore!");
Identifier tableIdentifier =
Identifier.create(identifier.getDatabaseName(), identifier.getTableName());
FileStoreTable table = (FileStoreTable) getTable(tableIdentifier);

if (table.partitionKeys().isEmpty() || !table.coreOptions().partitionedTableInMetastore()) {
throw new UnsupportedOperationException(
"The table is not partitioned table in metastore.");
}

MetastoreClient.Factory metastoreFactory =
table.catalogEnvironment().metastoreClientFactory();
if (metastoreFactory == null) {
throw new UnsupportedOperationException(
"The catalog must have metastore to create partition.");
}

try (MetastoreClient client = metastoreFactory.create()) {
client.addPartition(new LinkedHashMap<>(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down
10 changes: 10 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,16 @@ void alterTable(Identifier identifier, List<SchemaChange> changes, boolean ignor
*/
default void invalidateTable(Identifier identifier) {}

/**
* Create the partition of the specify table.
*
* @param identifier path of the table to drop partition
* @param partitionSpec the partition to be created
* @throws TableNotExistException if the table does not exist
*/
void createPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException;

/**
* Drop the partition of the specify table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,10 @@ public Path getTableLocation(Identifier identifier) {
return wrapped.getTableLocation(identifier);
}

@Override
public void createPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException {
if (wrapped instanceof AbstractCatalog) {
((AbstractCatalog) wrapped).createPartition(identifier, partitions);
} else {
throw new UnsupportedOperationException(
"Only Support HiveCatalog in create partition!");
}
wrapped.createPartition(identifier, partitions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@

import javax.annotation.Nullable;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -1156,10 +1155,8 @@ public final void createPartition(

try {
Identifier identifier = toIdentifier(tablePath);
Method func =
catalog.getClass().getMethod("createPartition", Identifier.class, Map.class);
func.invoke(catalog, identifier, partitionSpec.getPartitionSpec());
} catch (Exception e) {
catalog.createPartition(identifier, partitionSpec.getPartitionSpec());
} catch (Catalog.TableNotExistException e) {
throw new CatalogException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,23 +310,6 @@ private Map<String, String> convertToProperties(Database database) {
return properties;
}

@Override
public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
try {
TableSchema tableSchema = getDataTableSchema(identifier);
metastoreClient =
new HiveMetastoreClient(
new Identifier(identifier.getDatabaseName(), identifier.getTableName()),
tableSchema,
clients);
} catch (Exception e) {
throw new RuntimeException(e);
}

super.createPartition(identifier, partitionSpec);
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.spark

import org.apache.paimon.catalog.Identifier
import org.apache.paimon.metastore.MetastoreClient
import org.apache.paimon.operation.FileStoreCommit
import org.apache.paimon.table.FileStoreTable
Expand All @@ -32,7 +31,8 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
import org.apache.spark.sql.types.StructType

import java.util.{LinkedHashMap, Map => JMap, Objects, UUID}
import java.util
import java.util.{Map => JMap, Objects, UUID}

import scala.collection.JavaConverters._

Expand All @@ -43,7 +43,8 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {

override lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(partitionRowType)

override def dropPartitions(internalRows: Array[InternalRow]): Boolean = {
private def toPaimonPartitions(
rows: Array[InternalRow]): Array[java.util.LinkedHashMap[String, String]] = {
table match {
case fileStoreTable: FileStoreTable =>
val rowConverter = CatalystTypeConverters
Expand All @@ -53,12 +54,20 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
partitionRowType,
table.partitionKeys().asScala.toArray)

val partitions = internalRows.map {
rows.map {
r =>
rowDataPartitionComputer
.generatePartValues(new SparkRow(partitionRowType, rowConverter(r).asInstanceOf[Row]))
.asInstanceOf[JMap[String, String]]
}
case _ =>
throw new UnsupportedOperationException("Only FileStoreTable supports partitions.")
}
}

override def dropPartitions(rows: Array[InternalRow]): Boolean = {
table match {
case fileStoreTable: FileStoreTable =>
val partitions = toPaimonPartitions(rows).map(_.asInstanceOf[JMap[String, String]])
val commit: FileStoreCommit = fileStoreTable.store.newCommit(UUID.randomUUID.toString)
try {
commit.dropPartitions(partitions.toSeq.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER)
Expand Down Expand Up @@ -114,35 +123,24 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
}

override def createPartitions(
internalRows: Array[InternalRow],
rows: Array[InternalRow],
maps: Array[JMap[String, String]]): Unit = {
table match {
case fileStoreTable: FileStoreTable =>
val rowConverter = CatalystTypeConverters
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema))
val rowDataPartitionComputer = new InternalRowPartitionComputer(
fileStoreTable.coreOptions().partitionDefaultName(),
partitionRowType,
table.partitionKeys().asScala.toArray)
val partitions = internalRows.map {
r =>
rowDataPartitionComputer
.generatePartValues(new SparkRow(partitionRowType, rowConverter(r).asInstanceOf[Row]))
.asInstanceOf[JMap[String, String]]
val partitions = toPaimonPartitions(rows)
val metastoreFactory = fileStoreTable.catalogEnvironment().metastoreClientFactory()
if (metastoreFactory == null) {
throw new UnsupportedOperationException(
"The table must have metastore to create partition.")
}
val metastoreClient: MetastoreClient =
fileStoreTable.catalogEnvironment().metastoreClientFactory().create
partitions.foreach {
partition =>
metastoreClient.addPartition(partition.asInstanceOf[LinkedHashMap[String, String]])
val metastoreClient: MetastoreClient = metastoreFactory.create
try {
partitions.foreach(metastoreClient.addPartition)
} finally {
metastoreClient.close()
}
case _ =>
throw new UnsupportedOperationException("Only FileStoreTable supports create partitions.")
}
}

def getIdentifierFromTableName(tableName: String): Identifier = {
val name: Array[String] = tableName.split("\\.")
new Identifier(name.apply(0), name.apply(1))
}
}

0 comments on commit ad515b7

Please sign in to comment.