Skip to content

Commit

Permalink
[hive] Partition is dropped from metastore only when all branches do …
Browse files Browse the repository at this point in the history
…not contain this partition
  • Loading branch information
tsreaper committed Aug 1, 2024
1 parent 59143c1 commit b74e440
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
Expand All @@ -40,6 +41,7 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
Expand Down Expand Up @@ -168,11 +170,13 @@ public Optional<CatalogLockContext> lockContext() {

@Override
public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
Identifier tableIdentifier =
new Identifier(identifier.getDatabaseName(), identifier.getTableName());
try {
return Optional.of(
new HiveMetastoreClient.Factory(
identifier,
getDataTableSchema(identifier),
tableIdentifier,
getDataTableSchema(tableIdentifier),
hiveConf,
clientClassName,
options));
Expand Down Expand Up @@ -284,18 +288,33 @@ public Map<String, String> loadDatabasePropertiesImpl(String name)
}
}

private Map<String, String> convertToProperties(Database database) {
Map<String, String> properties = new HashMap<>(database.getParameters());
if (database.getLocationUri() != null) {
properties.put(DB_LOCATION_PROP, database.getLocationUri());
}
if (database.getDescription() != null) {
properties.put(COMMENT_PROP, database.getDescription());
}
return properties;
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
TableSchema tableSchema = getDataTableSchema(identifier);
if (!tableSchema.partitionKeys().isEmpty()
&& new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {

&& new CoreOptions(tableSchema.options()).partitionedTableInMetastore()
&& !partitionExistsInOtherBranches(identifier, partitionSpec)) {
try {
// Do not close client, it is for HiveCatalog
@SuppressWarnings("resource")
HiveMetastoreClient metastoreClient =
new HiveMetastoreClient(identifier, tableSchema, clients);
new HiveMetastoreClient(
new Identifier(
identifier.getDatabaseName(), identifier.getTableName()),
tableSchema,
clients);
metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -304,15 +323,38 @@ && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
super.dropPartition(identifier, partitionSpec);
}

private Map<String, String> convertToProperties(Database database) {
Map<String, String> properties = new HashMap<>(database.getParameters());
if (database.getLocationUri() != null) {
properties.put(DB_LOCATION_PROP, database.getLocationUri());
}
if (database.getDescription() != null) {
properties.put(COMMENT_PROP, database.getDescription());
private boolean partitionExistsInOtherBranches(
Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
FileStoreTable mainTable =
(FileStoreTable)
getTable(
new Identifier(
identifier.getDatabaseName(), identifier.getTableName()));
List<String> branchNames =
mainTable.branchManager().branches().stream()
.map(TableBranch::getBranchName)
.collect(Collectors.toList());
branchNames.add(DEFAULT_MAIN_BRANCH);

for (String branchName : branchNames) {
if (branchName.equals(identifier.getBranchNameOrDefault())) {
continue;
}

FileStoreTable table =
(FileStoreTable)
getTable(
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(),
branchName,
null));
if (!table.newScan().withPartitionFilter(partitionSpec).plan().splits().isEmpty()) {
return true;
}
}
return properties;
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,48 @@ public void testFlinkCreateBranchAndHiveRead() throws Exception {
.containsExactlyInAnyOrder("3\tx3", "3\tx33", "4\tx4");
}

@Test
public void testDropPartitionFromBranch() throws Exception {
tEnv.executeSql(
"CREATE TABLE t ( pt INT, v STRING ) PARTITIONED BY (pt) "
+ "WITH ( 'file.format' = 'avro', 'metastore.partitioned-table' = 'true' )")
.await();
tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();

tEnv.executeSql("INSERT INTO t VALUES (1, 'apple'), (2, 'banana'), (4, 'mango')").await();
tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'cat'), (3, 'dog'), (4, 'lion')")
.await();
assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
.containsExactlyInAnyOrder("pt=1", "pt=2", "pt=3", "pt=4");

tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 1)");
assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
.containsExactlyInAnyOrder("pt=1", "pt=2", "pt=3", "pt=4");

tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 3)");
assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
.containsExactlyInAnyOrder("pt=1", "pt=2", "pt=4");

tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 1)");
assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
.containsExactlyInAnyOrder("pt=2", "pt=4");

tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 4)");
assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
.containsExactlyInAnyOrder("pt=2", "pt=4");

tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 4)");
assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")).containsExactlyInAnyOrder("pt=2");

tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 2)");
assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")).isEmpty();
}

@Test
public void testFallbackBranchRead() throws Exception {
tEnv.executeSql(
"CREATE TABLE t ( pt INT, a INT, b STRING ) PARTITIONED BY (pt) "
+ "WITH ( 'file.format' = 'avro' )")
+ "WITH ( 'file.format' = 'avro', 'metastore.partitioned-table' = 'true' )")
.await();
tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
tEnv.executeSql(
Expand All @@ -510,9 +547,11 @@ public void testFallbackBranchRead() throws Exception {
Row.of(1, 20, "banana"),
Row.of(2, 10, "lion"),
Row.of(2, 20, "wolf"));
assertThat(hiveShell.executeQuery("SELECT * FROM t"))
assertThat(hiveShell.executeQuery("SELECT pt, a, b FROM t"))
.containsExactlyInAnyOrder(
"1\t10\tapple", "1\t20\tbanana", "2\t10\tlion", "2\t20\twolf");
assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
.containsExactlyInAnyOrder("pt=1", "pt=2");
}

/**
Expand Down

0 comments on commit b74e440

Please sign in to comment.