From 6e9c40a0baa4d293b48a8194492e9b6eb9fe2d05 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 7 Nov 2024 15:34:34 +0800 Subject: [PATCH] add partitions --- .../metastore/AddPartitionCommitCallback.java | 36 ++++++++++++++----- .../paimon/metastore/MetastoreClient.java | 14 ++++++++ .../paimon/hive/HiveMetastoreClient.java | 36 +++++++++++++++++++ 3 files changed, 77 insertions(+), 9 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java index 4f7d3d554ae2..44a021da9a9e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java @@ -30,7 +30,10 @@ import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder; import java.time.Duration; +import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** A {@link CommitCallback} to add newly created partitions to metastore. */ public class AddPartitionCommitCallback implements CommitCallback { @@ -52,19 +55,21 @@ public AddPartitionCommitCallback(MetastoreClient client) { @Override public void call(List committedEntries, Snapshot snapshot) { - committedEntries.stream() - .filter(e -> FileKind.ADD.equals(e.kind())) - .map(ManifestEntry::partition) - .distinct() - .forEach(this::addPartition); + Set partitions = + committedEntries.stream() + .filter(e -> FileKind.ADD.equals(e.kind())) + .map(ManifestEntry::partition) + .collect(Collectors.toSet()); + addPartitions(partitions); } @Override public void retry(ManifestCommittable committable) { - committable.fileCommittables().stream() - .map(CommitMessage::partition) - .distinct() - .forEach(this::addPartition); + Set partitions = + committable.fileCommittables().stream() + .map(CommitMessage::partition) + .collect(Collectors.toSet()); + addPartitions(partitions); } private void addPartition(BinaryRow partition) { @@ -81,6 +86,19 @@ private void addPartition(BinaryRow partition) { } } + private void addPartitions(Set partitions) { + try { + List filteredPartitions = new ArrayList<>(); + for (BinaryRow partition : partitions) { + if (!cache.get(partition, () -> false)) filteredPartitions.add(partition); + } + client.addPartitions(filteredPartitions); + filteredPartitions.forEach(partition -> cache.put(partition, true)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override public void close() throws Exception { client.close(); diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java index ac12bfc73490..60e28c59f45d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; /** @@ -32,8 +33,21 @@ public interface MetastoreClient extends AutoCloseable { void addPartition(BinaryRow partition) throws Exception; + default void addPartitions(List partitions) throws Exception { + for (BinaryRow partition : partitions) { + addPartition(partition); + } + } + void addPartition(LinkedHashMap partitionSpec) throws Exception; + default void addPartitionsSpec(List> partitionSpecsList) + throws Exception { + for (LinkedHashMap partitionSpecs : partitionSpecsList) { + addPartition(partitionSpecs); + } + } + void deletePartition(LinkedHashMap partitionSpec) throws Exception; void markDone(LinkedHashMap partitionSpec) throws Exception; diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index 5856515bb866..d355fe94002a 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -41,6 +41,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** {@link MetastoreClient} for Hive tables. */ public class HiveMetastoreClient implements MetastoreClient { @@ -81,6 +82,14 @@ public void addPartition(BinaryRow partition) throws Exception { addPartition(partitionComputer.generatePartValues(partition)); } + @Override + public void addPartitions(List partitions) throws Exception { + addPartitionsSpec( + partitions.stream() + .map(partitionComputer::generatePartValues) + .collect(Collectors.toList())); + } + @Override public void addPartition(LinkedHashMap partitionSpec) throws Exception { List partitionValues = new ArrayList<>(partitionSpec.values()); @@ -113,6 +122,33 @@ public void addPartition(LinkedHashMap partitionSpec) throws Exc } } + @Override + public void addPartitionsSpec(List> partitionSpecsList) + throws Exception { + String databaseName = identifier.getDatabaseName(); + String tableName = identifier.getTableName(); + ArrayList hivePartitions = new ArrayList<>(); + int currentTime = (int) (System.currentTimeMillis() / 1000); + for (LinkedHashMap partitionSpec : partitionSpecsList) { + List partitionValues = new ArrayList<>(partitionSpec.values()); + StorageDescriptor newSd = new StorageDescriptor(sd); + newSd.setLocation( + sd.getLocation() + + "/" + + PartitionPathUtils.generatePartitionPath(partitionSpec)); + Partition hivePartition = new Partition(); + hivePartition.setDbName(databaseName); + hivePartition.setTableName(tableName); + hivePartition.setValues(partitionValues); + hivePartition.setSd(newSd); + hivePartition.setCreateTime(currentTime); + hivePartition.setLastAccessTime(currentTime); + hivePartitions.add(hivePartition); + } + + clients.execute(client -> client.add_partitions(hivePartitions, true, false)); + } + @Override public void alterPartition( LinkedHashMap partitionSpec,