diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 4ed7c54d8a74f..c02b11435bc7d 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -20,6 +20,7 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.branch.TableBranch;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -40,6 +41,7 @@
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -168,11 +170,13 @@ public Optional lockContext() {

     @Override
     public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
+        Identifier tableIdentifier =
+                new Identifier(identifier.getDatabaseName(), identifier.getTableName());
         try {
             return Optional.of(
                     new HiveMetastoreClient.Factory(
-                            identifier,
-                            getDataTableSchema(identifier),
+                            tableIdentifier,
+                            getDataTableSchema(tableIdentifier),
                             hiveConf,
                             clientClassName,
                             options));
@@ -284,18 +288,33 @@ public Map<String, String> loadDatabasePropertiesImpl(String name)
         }
     }

+    private Map<String, String> convertToProperties(Database database) {
+        Map<String, String> properties = new HashMap<>(database.getParameters());
+        if (database.getLocationUri() != null) {
+            properties.put(DB_LOCATION_PROP, database.getLocationUri());
+        }
+        if (database.getDescription() != null) {
+            properties.put(COMMENT_PROP, database.getDescription());
+        }
+        return properties;
+    }
+
     @Override
     public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
             throws TableNotExistException {
         TableSchema tableSchema = getDataTableSchema(identifier);
         if (!tableSchema.partitionKeys().isEmpty()
-                && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
-
+                && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()
+                && !partitionExistsInOtherBranches(identifier, partitionSpec)) {
             try {
                 // Do not close client, it is for HiveCatalog
                 @SuppressWarnings("resource")
                 HiveMetastoreClient metastoreClient =
-                        new HiveMetastoreClient(identifier, tableSchema, clients);
+                        new HiveMetastoreClient(
+                                new Identifier(
+                                        identifier.getDatabaseName(), identifier.getTableName()),
+                                tableSchema,
+                                clients);
                 metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec));
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -304,15 +323,38 @@ && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
         super.dropPartition(identifier, partitionSpec);
     }

-    private Map<String, String> convertToProperties(Database database) {
-        Map<String, String> properties = new HashMap<>(database.getParameters());
-        if (database.getLocationUri() != null) {
-            properties.put(DB_LOCATION_PROP, database.getLocationUri());
-        }
-        if (database.getDescription() != null) {
-            properties.put(COMMENT_PROP, database.getDescription());
+    private boolean partitionExistsInOtherBranches(
+            Identifier identifier, Map<String, String> partitionSpec)
+            throws TableNotExistException {
+        FileStoreTable mainTable =
+                (FileStoreTable)
+                        getTable(
+                                new Identifier(
+                                        identifier.getDatabaseName(), identifier.getTableName()));
+        List<String> branchNames =
+                mainTable.branchManager().branches().stream()
+                        .map(TableBranch::getBranchName)
+                        .collect(Collectors.toList());
+        branchNames.add(DEFAULT_MAIN_BRANCH);
+
+        for (String branchName : branchNames) {
+            if (branchName.equals(identifier.getBranchNameOrDefault())) {
+                continue;
+            }
+
+            FileStoreTable table =
+                    (FileStoreTable)
+                            getTable(
+                                    new Identifier(
+                                            identifier.getDatabaseName(),
+                                            identifier.getTableName(),
+                                            branchName,
+                                            null));
+            if (!table.newScan().withPartitionFilter(partitionSpec).plan().splits().isEmpty()) {
+                return true;
+            }
         }
-        return properties;
+        return false;
     }

     @Override
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index e0e19b9ba7b19..fb504f5594e44 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -485,11 +485,48 @@ public void testFlinkCreateBranchAndHiveRead() throws Exception {
                 .containsExactlyInAnyOrder("3\tx3", "3\tx33", "4\tx4");
     }

+    @Test
+    public void testDropPartitionFromBranch() throws Exception {
+        tEnv.executeSql(
+                        "CREATE TABLE t ( pt INT, v STRING ) PARTITIONED BY (pt) "
+                                + "WITH ( 'file.format' = 'avro', 'metastore.partitioned-table' = 'true' )")
+                .await();
+        tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
+
+        tEnv.executeSql("INSERT INTO t VALUES (1, 'apple'), (2, 'banana'), (4, 'mango')").await();
+        tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'cat'), (3, 'dog'), (4, 'lion')")
+                .await();
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=1", "pt=2", "pt=3", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 1)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=1", "pt=2", "pt=3", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 3)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=1", "pt=2", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 1)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=2", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 4)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=2", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 4)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")).containsExactlyInAnyOrder("pt=2");
+
+        tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 2)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")).isEmpty();
+    }
+
     @Test
     public void testFallbackBranchRead() throws Exception {
         tEnv.executeSql(
                         "CREATE TABLE t ( pt INT, a INT, b STRING ) PARTITIONED BY (pt) "
-                                + "WITH ( 'file.format' = 'avro' )")
+                                + "WITH ( 'file.format' = 'avro', 'metastore.partitioned-table' = 'true' )")
                 .await();
         tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
         tEnv.executeSql(
@@ -510,9 +547,11 @@ public void testFallbackBranchRead() throws Exception {
                         Row.of(1, 20, "banana"),
                         Row.of(2, 10, "lion"),
                         Row.of(2, 20, "wolf"));
-        assertThat(hiveShell.executeQuery("SELECT * FROM t"))
+        assertThat(hiveShell.executeQuery("SELECT pt, a, b FROM t"))
                 .containsExactlyInAnyOrder(
"1\t10\tapple", "1\t20\tbanana", "2\t10\tlion", "2\t20\twolf"); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) + .containsExactlyInAnyOrder("pt=1", "pt=2"); } /**