[core] Adjust create partition interface #4334

Merged 1 commit on Oct 16, 2024
@@ -41,9 +41,6 @@
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -72,9 +69,6 @@ public abstract class AbstractCatalog implements Catalog {
     protected final FileIO fileIO;
     protected final Map<String, String> tableDefaultOptions;
     protected final Options catalogOptions;
-    public MetastoreClient metastoreClient;
-
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractCatalog.class);
 
     @Nullable protected final LineageMetaFactory lineageMetaFactory;
@@ -162,23 +156,29 @@ public Map<String, String> loadDatabaseProperties(String name)
     protected abstract Map<String, String> loadDatabasePropertiesImpl(String name)
             throws DatabaseNotExistException;
 
     @Override
     public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
             throws TableNotExistException {
-        TableSchema tableSchema = getDataTableSchema(identifier);
-        if (!tableSchema.partitionKeys().isEmpty()
-                && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
-            try {
-                // Do not close client, it is for HiveCatalog
-                if (metastoreClient == null) {
-                    throw new UnsupportedOperationException(
-                            "Only Support HiveCatalog in create partition!");
-                }
-                metastoreClient.addPartition(new LinkedHashMap<>(partitionSpec));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        } else {
-            throw new RuntimeException("the table is not partitioned table in metastore!");
-        }
+        Identifier tableIdentifier =
+                Identifier.create(identifier.getDatabaseName(), identifier.getTableName());
+        FileStoreTable table = (FileStoreTable) getTable(tableIdentifier);
+
+        if (table.partitionKeys().isEmpty() || !table.coreOptions().partitionedTableInMetastore()) {
+            throw new UnsupportedOperationException(
+                    "The table is not partitioned table in metastore.");
+        }
+
+        MetastoreClient.Factory metastoreFactory =
+                table.catalogEnvironment().metastoreClientFactory();
+        if (metastoreFactory == null) {
+            throw new UnsupportedOperationException(
+                    "The catalog must have metastore to create partition.");
+        }
+
+        try (MetastoreClient client = metastoreFactory.create()) {
+            client.addPartition(new LinkedHashMap<>(partitionSpec));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }

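The rework above removes AbstractCatalog's mutable public metastoreClient field: the client is now resolved per call from the table's CatalogEnvironment and released by try-with-resources. A minimal sketch of the same pattern for a caller that adds many partitions with one client; the helper name and its inputs are illustrative, not part of this PR:

import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.table.FileStoreTable;

import java.util.LinkedHashMap;
import java.util.List;

class AddPartitionsSketch {

    // Reuse a single client for all specs; try-with-resources closes it, which
    // is safe now that the client is no longer a shared field on the catalog.
    static void addPartitions(FileStoreTable table, List<LinkedHashMap<String, String>> specs)
            throws Exception {
        MetastoreClient.Factory factory = table.catalogEnvironment().metastoreClientFactory();
        if (factory == null) {
            throw new UnsupportedOperationException(
                    "The catalog must have metastore to create partition.");
        }
        try (MetastoreClient client = factory.create()) {
            for (LinkedHashMap<String, String> spec : specs) {
                client.addPartition(spec);
            }
        }
    }
}

This is the same create-use-close sequence the Spark-side change below adopts with try/finally.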
paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java (10 additions, 0 deletions)
@@ -262,6 +262,16 @@ void alterTable(Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
      */
     default void invalidateTable(Identifier identifier) {}
 
+    /**
+     * Create the partition of the specified table.
+     *
+     * @param identifier path of the table to create partition
+     * @param partitionSpec the partition to be created
+     * @throws TableNotExistException if the table does not exist
+     */
+    void createPartition(Identifier identifier, Map<String, String> partitionSpec)
+            throws TableNotExistException;
+
     /**
      * Drop the partition of the specified table.
      *
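With createPartition now part of the Catalog interface, callers no longer need to downcast to AbstractCatalog. A usage sketch under stated assumptions: a Hive-backed catalog, a table created with 'metastore.partitioned-table' = 'true', and an illustrative metastore URI, warehouse path, and table name:

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class CreatePartitionExample {

    public static void main(String[] args) throws Exception {
        Map<String, String> conf = new HashMap<>();
        conf.put("metastore", "hive");              // assumed: Hive-backed catalog
        conf.put("uri", "thrift://localhost:9083"); // illustrative metastore address
        conf.put("warehouse", "hdfs:///paimon/warehouse");
        try (Catalog catalog =
                CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(conf)))) {
            // Keys must match the table's partition columns.
            Map<String, String> spec = new LinkedHashMap<>();
            spec.put("dt", "2024-10-16");
            // Any Catalog implementation may now serve this call; it throws
            // UnsupportedOperationException when the table is not a
            // metastore-partitioned table or no metastore client factory exists.
            catalog.createPartition(Identifier.create("my_db", "my_table"), spec);
        }
    }
}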
@@ -140,14 +140,10 @@ public Path getTableLocation(Identifier identifier) {
         return wrapped.getTableLocation(identifier);
     }
 
     @Override
     public void createPartition(Identifier identifier, Map<String, String> partitions)
             throws TableNotExistException {
-        if (wrapped instanceof AbstractCatalog) {
-            ((AbstractCatalog) wrapped).createPartition(identifier, partitions);
-        } else {
-            throw new UnsupportedOperationException(
-                    "Only Support HiveCatalog in create partition!");
-        }
+        wrapped.createPartition(identifier, partitions);
     }
 
     @Override
@@ -101,7 +101,6 @@
 
 import javax.annotation.Nullable;
 
-import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1156,10 +1155,8 @@ public final void createPartition(
 
         try {
             Identifier identifier = toIdentifier(tablePath);
-            Method func =
-                    catalog.getClass().getMethod("createPartition", Identifier.class, Map.class);
-            func.invoke(catalog, identifier, partitionSpec.getPartitionSpec());
-        } catch (Exception e) {
+            catalog.createPartition(identifier, partitionSpec.getPartitionSpec());
+        } catch (Catalog.TableNotExistException e) {
             throw new CatalogException(e);
         }
     }
@@ -310,23 +310,6 @@ private Map<String, String> convertToProperties(Database database) {
         return properties;
     }
 
-    @Override
-    public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
-            throws TableNotExistException {
-        try {
-            TableSchema tableSchema = getDataTableSchema(identifier);
-            metastoreClient =
-                    new HiveMetastoreClient(
-                            new Identifier(identifier.getDatabaseName(), identifier.getTableName()),
-                            tableSchema,
-                            clients);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        super.createPartition(identifier, partitionSpec);
-    }
-
     @Override
     public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
             throws TableNotExistException {
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.catalog.Identifier
 import org.apache.paimon.metastore.MetastoreClient
 import org.apache.paimon.operation.FileStoreCommit
 import org.apache.paimon.table.FileStoreTable
@@ -32,7 +31,8 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
 import org.apache.spark.sql.types.StructType
 
-import java.util.{LinkedHashMap, Map => JMap, Objects, UUID}
+import java.util
+import java.util.{Map => JMap, Objects, UUID}
 
 import scala.collection.JavaConverters._
@@ -43,7 +43,8 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
 
   override lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(partitionRowType)
 
-  override def dropPartitions(internalRows: Array[InternalRow]): Boolean = {
+  private def toPaimonPartitions(
+      rows: Array[InternalRow]): Array[java.util.LinkedHashMap[String, String]] = {
     table match {
       case fileStoreTable: FileStoreTable =>
         val rowConverter = CatalystTypeConverters
@@ -53,12 +54,20 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
           partitionRowType,
           table.partitionKeys().asScala.toArray)
 
-        val partitions = internalRows.map {
+        rows.map {
           r =>
             rowDataPartitionComputer
              .generatePartValues(new SparkRow(partitionRowType, rowConverter(r).asInstanceOf[Row]))
-              .asInstanceOf[JMap[String, String]]
         }
+      case _ =>
+        throw new UnsupportedOperationException("Only FileStoreTable supports partitions.")
+    }
+  }
+
+  override def dropPartitions(rows: Array[InternalRow]): Boolean = {
+    table match {
+      case fileStoreTable: FileStoreTable =>
+        val partitions = toPaimonPartitions(rows).map(_.asInstanceOf[JMap[String, String]])
         val commit: FileStoreCommit = fileStoreTable.store.newCommit(UUID.randomUUID.toString)
         try {
           commit.dropPartitions(partitions.toSeq.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER)
@@ -114,35 +123,24 @@
   }
 
   override def createPartitions(
-      internalRows: Array[InternalRow],
+      rows: Array[InternalRow],
       maps: Array[JMap[String, String]]): Unit = {
     table match {
       case fileStoreTable: FileStoreTable =>
-        val rowConverter = CatalystTypeConverters
-          .createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema))
-        val rowDataPartitionComputer = new InternalRowPartitionComputer(
-          fileStoreTable.coreOptions().partitionDefaultName(),
-          partitionRowType,
-          table.partitionKeys().asScala.toArray)
-        val partitions = internalRows.map {
-          r =>
-            rowDataPartitionComputer
-              .generatePartValues(new SparkRow(partitionRowType, rowConverter(r).asInstanceOf[Row]))
-              .asInstanceOf[JMap[String, String]]
+        val partitions = toPaimonPartitions(rows)
+        val metastoreFactory = fileStoreTable.catalogEnvironment().metastoreClientFactory()
+        if (metastoreFactory == null) {
+          throw new UnsupportedOperationException(
+            "The table must have metastore to create partition.")
         }
-        val metastoreClient: MetastoreClient =
-          fileStoreTable.catalogEnvironment().metastoreClientFactory().create
-        partitions.foreach {
-          partition =>
-            metastoreClient.addPartition(partition.asInstanceOf[LinkedHashMap[String, String]])
+        val metastoreClient: MetastoreClient = metastoreFactory.create
+        try {
+          partitions.foreach(metastoreClient.addPartition)
+        } finally {
+          metastoreClient.close()
        }
      case _ =>
        throw new UnsupportedOperationException("Only FileStoreTable supports create partitions.")
    }
  }
-
-  def getIdentifierFromTableName(tableName: String): Identifier = {
-    val name: Array[String] = tableName.split("\\.")
-    new Identifier(name.apply(0), name.apply(1))
-  }
 }
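For Spark users, the trait above is what backs ALTER TABLE ... ADD PARTITION: Spark routes the statement to SupportsAtomicPartitionManagement.createPartitions, which now looks up the metastore client factory on the table's catalog environment rather than requiring a particular catalog class (and still throws if the table has no metastore). A sketch of that path; the catalog name, warehouse, and table are illustrative:

import org.apache.spark.sql.SparkSession;

public class SparkAddPartitionExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                .config("spark.sql.catalog.paimon.warehouse", "/tmp/paimon-warehouse")
                .getOrCreate();
        // Spark translates this DDL into createPartitions on PaimonPartitionManagement.
        spark.sql("ALTER TABLE paimon.my_db.my_table ADD PARTITION (dt = '2024-10-16')");
        spark.stop();
    }
}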