From 8791c5d9817d2c38d6f3422a4aba799ebf403846 Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Wed, 29 May 2024 10:34:34 +0800 Subject: [PATCH] [hive] Fix flink not synchronizing to hive metastore when deleting table partitions using hive catalog (#3411) --- .../paimon/table/PrimaryKeyTableUtils.java | 8 +------- .../org/apache/paimon/hive/HiveCatalog.java | 20 +++++++++++++++++++ .../paimon/hive/HiveCatalogITCaseBase.java | 13 ++++++++++++ 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java index 572e488c6b51..b3dbbdd296c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java @@ -45,13 +45,7 @@ public static RowType addKeyNamePrefix(RowType type) { public static List addKeyNamePrefix(List keyFields) { return keyFields.stream() - .map( - f -> - new DataField( - f.id(), - KEY_FIELD_PREFIX + f.name(), - f.type(), - f.description())) + .map(f -> f.newName(KEY_FIELD_PREFIX + f.name())) .collect(Collectors.toList()); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 25cb8784d407..6a1bb9d0acd1 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -73,6 +73,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -268,6 +269,25 @@ public Map loadDatabasePropertiesImpl(String name) { } } + @Override + public void dropPartition(Identifier identifier, Map partitionSpec) + throws TableNotExistException { + TableSchema tableSchema = getDataTableSchema(identifier); + if (!tableSchema.partitionKeys().isEmpty() + && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { + try { + // Do not close client, it is for HiveCatalog + @SuppressWarnings("resource") + HiveMetastoreClient metastoreClient = + new HiveMetastoreClient(identifier, tableSchema, client); + metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + super.dropPartition(identifier, partitionSpec); + } + private Map convertToProperties(Database database) { Map properties = new HashMap<>(database.getParameters()); if (database.getLocationUri() != null) { diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index ac140d9496a3..cfff7f38c940 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -970,6 +970,19 @@ public void testAddPartitionsToMetastoreForUnpartitionedTable() throws Exception .containsExactlyInAnyOrder("1\t10", "2\t20"); } + @Test + public void testDropPartitionsToMetastore() throws Exception { + prepareTestAddPartitionsToMetastore(); + + // drop partition + tEnv.executeSql( + "ALTER TABLE t DROP PARTITION (ptb = '1a', pta = 1), PARTITION (ptb = '1b', pta = 1)") + .await(); + assertThat(hiveShell.executeQuery("show partitions t")) + .containsExactlyInAnyOrder( + "ptb=2a/pta=2", "ptb=2b/pta=2", "ptb=3a/pta=3", "ptb=3b/pta=3"); + } + @Test public void testAddPartitionsForTag() throws Exception { tEnv.executeSql(