From 8c35b11043c63d15ac9abaaa5a5d79a611dc8e7b Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 9 Dec 2024 19:59:38 +0800 Subject: [PATCH] [hive] Make HiveMetastoreClient.addPartition thread safe --- .../metastore/AddPartitionCommitCallback.java | 14 ----------- .../paimon/hive/HiveMetastoreClient.java | 25 ++++++++----------- 2 files changed, 10 insertions(+), 29 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java index 06002161a68e..599f88e512c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java @@ -72,20 +72,6 @@ public void retry(ManifestCommittable committable) { addPartitions(partitions); } - private void addPartition(BinaryRow partition) { - try { - boolean added = cache.get(partition, () -> false); - if (added) { - return; - } - - client.addPartition(partition); - cache.put(partition, true); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - private void addPartitions(Set partitions) { try { List newPartitions = new ArrayList<>(); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index cb70e0191145..3793c86f8269 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; @@ -92,21 +93,15 @@ public void addPartitions(List partitions) throws Exception { @Override public void addPartition(LinkedHashMap partitionSpec) throws Exception { - List partitionValues = new ArrayList<>(partitionSpec.values()); - try { - clients.execute( - client -> - client.getPartition( - identifier.getDatabaseName(), - identifier.getTableName(), - partitionValues)); - // do nothing if the partition already exists - } catch (NoSuchObjectException e) { - // partition not found, create new partition - Partition hivePartition = - toHivePartition(partitionSpec, (int) (System.currentTimeMillis() / 1000)); - clients.execute(client -> client.add_partition(hivePartition)); - } + Partition hivePartition = + toHivePartition(partitionSpec, (int) (System.currentTimeMillis() / 1000)); + clients.execute( + client -> { + try { + client.add_partition(hivePartition); + } catch (AlreadyExistsException ignore) { + } + }); } @Override