From d4d85c87a835e9b0951ea6fb77f1db478847418b Mon Sep 17 00:00:00 2001 From: tsreaper Date: Mon, 5 Aug 2024 18:51:31 +0800 Subject: [PATCH] [hive] Partition is dropped from metastore only when all branches do not contain this partition (#3867) --- .../org/apache/paimon/branch/TableBranch.java | 66 -------------- .../paimon/table/system/BranchesTable.java | 87 ++++++++++++++++--- .../apache/paimon/utils/BranchManager.java | 56 +----------- .../apache/paimon/flink/BranchSqlITCase.java | 39 +++------ .../org/apache/paimon/hive/HiveCatalog.java | 66 +++++++++++--- .../paimon/hive/HiveCatalogITCaseBase.java | 53 ++++++++++- 6 files changed, 196 insertions(+), 171 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java diff --git a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java deleted file mode 100644 index 9b5b478fc4d02..0000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.branch; - -/** {@link TableBranch} has branch relevant information for table. */ -public class TableBranch { - private final String branchName; - private final String createdFromTag; - private final Long createdFromSnapshot; - - private final long createTime; - - public TableBranch(String branchName, Long createdFromSnapshot, long createTime) { - this.branchName = branchName; - this.createdFromTag = null; - this.createdFromSnapshot = createdFromSnapshot; - this.createTime = createTime; - } - - public TableBranch(String branchName, long createTime) { - this.branchName = branchName; - this.createdFromTag = null; - this.createdFromSnapshot = null; - this.createTime = createTime; - } - - public TableBranch( - String branchName, String createdFromTag, Long createdFromSnapshot, long createTime) { - this.branchName = branchName; - this.createdFromTag = createdFromTag; - this.createdFromSnapshot = createdFromSnapshot; - this.createTime = createTime; - } - - public String getBranchName() { - return branchName; - } - - public String getCreatedFromTag() { - return createdFromTag; - } - - public Long getCreatedFromSnapshot() { - return createdFromSnapshot; - } - - public long getCreateTime() { - return createTime; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index 2e3f5e90a361c..ec1608d959906 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -18,7 +18,7 @@ package org.apache.paimon.table.system; -import org.apache.paimon.branch.TableBranch; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -28,6 +28,8 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.ReadonlyTable; @@ -42,21 +44,33 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.SerializationUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.SortedMap; +import java.util.stream.Collectors; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; +import static org.apache.paimon.utils.BranchManager.BRANCH_PREFIX; +import static org.apache.paimon.utils.BranchManager.branchPath; +import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** A {@link Table} for showing branches of table. */ public class BranchesTable implements ReadonlyTable { @@ -189,25 +203,78 @@ public RecordReader createReader(Split split) { if (!(split instanceof BranchesSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } + Path location = ((BranchesSplit) split).location; FileStoreTable table = FileStoreTableFactory.create(fileIO, location); - List branches = table.branchManager().branches(); - Iterator rows = Iterators.transform(branches.iterator(), this::toRow); + Iterator rows; + try { + rows = branches(table).iterator(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + if (projection != null) { rows = Iterators.transform( rows, row -> ProjectedRow.from(projection).replaceRow(row)); } + return new IteratorRecordReader<>(rows); } - private InternalRow toRow(TableBranch branch) { - return GenericRow.of( - BinaryString.fromString(branch.getBranchName()), - BinaryString.fromString(branch.getCreatedFromTag()), - branch.getCreatedFromSnapshot(), - Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(branch.getCreateTime()))); + private List branches(FileStoreTable table) throws IOException { + BranchManager branchManager = table.branchManager(); + SchemaManager schemaManager = new SchemaManager(fileIO, table.location()); + + List> paths = + listVersionedDirectories(fileIO, branchManager.branchDirectory(), BRANCH_PREFIX) + .map(status -> Pair.of(status.getPath(), status.getModificationTime())) + .collect(Collectors.toList()); + List result = new ArrayList<>(); + + for (Pair path : paths) { + String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length()); + String basedTag = null; + Long basedSnapshotId = null; + long creationTime = path.getRight(); + + Optional tableSchema = + schemaManager.copyWithBranch(branchName).latest(); + if (tableSchema.isPresent()) { + FileStoreTable branchTable = + FileStoreTableFactory.create( + fileIO, new Path(branchPath(table.location(), branchName))); + SortedMap> snapshotTags = + branchTable.tagManager().tags(); + Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId(); + if (snapshotTags.isEmpty()) { + // create based on snapshotId + basedSnapshotId = earliestSnapshotId; + } else { + Snapshot snapshot = snapshotTags.firstKey(); + if (Objects.equals(earliestSnapshotId, snapshot.id())) { + // create based on tag + List tags = snapshotTags.get(snapshot); + checkArgument(tags.size() == 1); + basedTag = tags.get(0); + basedSnapshotId = snapshot.id(); + } else { + // create based on snapshotId + basedSnapshotId = earliestSnapshotId; + } + } + } + + result.add( + GenericRow.of( + BinaryString.fromString(branchName), + BinaryString.fromString(basedTag), + basedSnapshotId, + Timestamp.fromLocalDateTime( + DateTimeUtils.toLocalDateTime(creationTime)))); + } + + return result; } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 6ff8d4c2a2e09..af598587c79fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -19,25 +19,17 @@ package org.apache.paimon.utils; import org.apache.paimon.Snapshot; -import org.apache.paimon.branch.TableBranch; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; import java.util.List; -import java.util.Optional; -import java.util.PriorityQueue; -import java.util.SortedMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -315,51 +307,11 @@ public boolean branchExists(String branchName) { } /** Get all branches for the table. */ - public List branches() { + public List branches() { try { - List> paths = - listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX) - .map(status -> Pair.of(status.getPath(), status.getModificationTime())) - .collect(Collectors.toList()); - PriorityQueue pq = - new PriorityQueue<>(Comparator.comparingLong(TableBranch::getCreateTime)); - for (Pair path : paths) { - String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length()); - Optional tableSchema = - schemaManager.copyWithBranch(branchName).latest(); - if (!tableSchema.isPresent()) { - // Support empty branch. - pq.add(new TableBranch(branchName, path.getValue())); - continue; - } - FileStoreTable branchTable = - FileStoreTableFactory.create( - fileIO, new Path(branchPath(tablePath, branchName))); - SortedMap> snapshotTags = branchTable.tagManager().tags(); - Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId(); - if (snapshotTags.isEmpty()) { - // Create based on snapshotId. - pq.add(new TableBranch(branchName, earliestSnapshotId, path.getValue())); - } else { - Snapshot snapshot = snapshotTags.firstKey(); - if (earliestSnapshotId == snapshot.id()) { - List tags = snapshotTags.get(snapshot); - checkArgument(tags.size() == 1); - pq.add( - new TableBranch( - branchName, tags.get(0), snapshot.id(), path.getValue())); - } else { - // Create based on snapshotId. - pq.add(new TableBranch(branchName, earliestSnapshotId, path.getValue())); - } - } - } - - List branches = new ArrayList<>(pq.size()); - while (!pq.isEmpty()) { - branches.add(pq.poll()); - } - return branches; + return listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX) + .map(status -> status.getPath().getName().substring(BRANCH_PREFIX.length())) + .collect(Collectors.toList()); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index d005dc4e50a89..33aca03b862c9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -18,8 +18,6 @@ package org.apache.paimon.flink; -import org.apache.paimon.branch.TableBranch; -import org.apache.paimon.catalog.Catalog; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.SnapshotManager; @@ -137,8 +135,7 @@ public void testCreateBranchFromTag() throws Exception { } @Test - public void testCreateBranchFromSnapshot() throws Catalog.TableNotExistException { - + public void testCreateBranchFromSnapshot() throws Exception { sql( "CREATE TABLE T (" + " pt INT" @@ -158,12 +155,8 @@ public void testCreateBranchFromSnapshot() throws Catalog.TableNotExistException sql("CALL sys.create_branch('default.T', 'test', 1)"); sql("CALL sys.create_branch('default.T', 'test2', 2)"); - FileStoreTable table = paimonTable("T"); - - assertThat( - table.branchManager().branches().stream() - .map(TableBranch::getCreatedFromSnapshot)) - .containsExactlyInAnyOrder(1L, 2L); + assertThat(collectResult("SELECT created_from_snapshot FROM `T$branches`")) + .containsExactlyInAnyOrder("+I[1]", "+I[2]"); assertThat(paimonTable("T$branch_test").snapshotManager().snapshotExists(1)) .isEqualTo(true); @@ -223,25 +216,17 @@ public void testDeleteBranchTable() throws Exception { sql("CALL sys.create_branch('default.T', 'test', 1)"); sql("CALL sys.create_branch('default.T', 'test2', 2)"); - FileStoreTable table = paimonTable("T"); - - assertThat( - table.branchManager().branches().stream() - .map(TableBranch::getCreatedFromSnapshot)) - .containsExactlyInAnyOrder(1L, 2L); - - assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName)) - .containsExactlyInAnyOrder("test", "test2"); + assertThat(collectResult("SELECT branch_name, created_from_snapshot FROM `T$branches`")) + .containsExactlyInAnyOrder("+I[test, 1]", "+I[test2, 2]"); sql("CALL sys.delete_branch('default.T', 'test')"); - assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName)) - .containsExactlyInAnyOrder("test2"); + assertThat(collectResult("SELECT branch_name, created_from_snapshot FROM `T$branches`")) + .containsExactlyInAnyOrder("+I[test2, 2]"); } @Test - public void testBranchManagerGetBranchSnapshotsList() - throws Catalog.TableNotExistException, IOException { + public void testBranchManagerGetBranchSnapshotsList() throws Exception { sql( "CREATE TABLE T (" + " pt INT" @@ -263,10 +248,8 @@ public void testBranchManagerGetBranchSnapshotsList() sql("CALL sys.create_branch('default.T', 'test2', 2)"); sql("CALL sys.create_branch('default.T', 'test3', 3)"); - assertThat( - table.branchManager().branches().stream() - .map(TableBranch::getCreatedFromSnapshot)) - .containsExactlyInAnyOrder(1L, 2L, 3L); + assertThat(collectResult("SELECT created_from_snapshot FROM `T$branches`")) + .containsExactlyInAnyOrder("+I[1]", "+I[2]", "+I[3]"); } @Test @@ -370,7 +353,7 @@ public void testFallbackBranchBatchRead() throws Exception { } @Test - public void testDifferentRowTypes() throws Exception { + public void testDifferentRowTypes() { sql( "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); sql("CALL sys.create_branch('default.t', 'pk')"); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 7156fbba233c0..c5473446bdf96 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -40,6 +40,8 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.TableType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -168,11 +170,13 @@ public Optional lockContext() { @Override public Optional metastoreClientFactory(Identifier identifier) { + Identifier tableIdentifier = + new Identifier(identifier.getDatabaseName(), identifier.getTableName()); try { return Optional.of( new HiveMetastoreClient.Factory( - identifier, - getDataTableSchema(identifier), + tableIdentifier, + getDataTableSchema(tableIdentifier), hiveConf, clientClassName, options)); @@ -284,18 +288,33 @@ public Map loadDatabasePropertiesImpl(String name) } } + private Map convertToProperties(Database database) { + Map properties = new HashMap<>(database.getParameters()); + if (database.getLocationUri() != null) { + properties.put(DB_LOCATION_PROP, database.getLocationUri()); + } + if (database.getDescription() != null) { + properties.put(COMMENT_PROP, database.getDescription()); + } + return properties; + } + @Override public void dropPartition(Identifier identifier, Map partitionSpec) throws TableNotExistException { TableSchema tableSchema = getDataTableSchema(identifier); if (!tableSchema.partitionKeys().isEmpty() - && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { - + && new CoreOptions(tableSchema.options()).partitionedTableInMetastore() + && !partitionExistsInOtherBranches(identifier, partitionSpec)) { try { // Do not close client, it is for HiveCatalog @SuppressWarnings("resource") HiveMetastoreClient metastoreClient = - new HiveMetastoreClient(identifier, tableSchema, clients); + new HiveMetastoreClient( + new Identifier( + identifier.getDatabaseName(), identifier.getTableName()), + tableSchema, + clients); metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec)); } catch (Exception e) { throw new RuntimeException(e); @@ -304,15 +323,36 @@ && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { super.dropPartition(identifier, partitionSpec); } - private Map convertToProperties(Database database) { - Map properties = new HashMap<>(database.getParameters()); - if (database.getLocationUri() != null) { - properties.put(DB_LOCATION_PROP, database.getLocationUri()); - } - if (database.getDescription() != null) { - properties.put(COMMENT_PROP, database.getDescription()); + private boolean partitionExistsInOtherBranches( + Identifier identifier, Map partitionSpec) + throws TableNotExistException { + FileStoreTable mainTable = + (FileStoreTable) + getTable( + new Identifier( + identifier.getDatabaseName(), identifier.getTableName())); + List branchNames = new ArrayList<>(mainTable.branchManager().branches()); + branchNames.add(DEFAULT_MAIN_BRANCH); + + for (String branchName : branchNames) { + if (branchName.equals(identifier.getBranchNameOrDefault())) { + continue; + } + + Optional branchSchema = + tableSchemaInFileSystem(mainTable.location(), branchName); + if (!branchSchema.isPresent()) { + continue; + } + + FileStoreTable table = + FileStoreTableFactory.create( + mainTable.fileIO(), mainTable.location(), branchSchema.get()); + if (!table.newScan().withPartitionFilter(partitionSpec).listPartitions().isEmpty()) { + return true; + } } - return properties; + return false; } @Override diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index e0e19b9ba7b19..3dd8cb2518172 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -485,11 +485,58 @@ public void testFlinkCreateBranchAndHiveRead() throws Exception { .containsExactlyInAnyOrder("3\tx3", "3\tx33", "4\tx4"); } + @Test + public void testDropPartitionFromBranch() throws Exception { + testDropPartitionFromBranchImpl(); + } + + @Test + @LocationInProperties + public void testDropPartitionFromBranchLocationInProperties() throws Exception { + testDropPartitionFromBranchImpl(); + } + + private void testDropPartitionFromBranchImpl() throws Exception { + tEnv.executeSql( + "CREATE TABLE t ( pt INT, v STRING ) PARTITIONED BY (pt) " + + "WITH ( 'file.format' = 'avro', 'metastore.partitioned-table' = 'true' )") + .await(); + tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await(); + + tEnv.executeSql("INSERT INTO t VALUES (1, 'apple'), (2, 'banana'), (4, 'mango')").await(); + tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'cat'), (3, 'dog'), (4, 'lion')") + .await(); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) + .containsExactlyInAnyOrder("pt=1", "pt=2", "pt=3", "pt=4"); + + tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 1)"); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) + .containsExactlyInAnyOrder("pt=1", "pt=2", "pt=3", "pt=4"); + + tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 3)"); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) + .containsExactlyInAnyOrder("pt=1", "pt=2", "pt=4"); + + tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 1)"); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) + .containsExactlyInAnyOrder("pt=2", "pt=4"); + + tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 4)"); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) + .containsExactlyInAnyOrder("pt=2", "pt=4"); + + tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 4)"); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")).containsExactlyInAnyOrder("pt=2"); + + tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 2)"); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")).isEmpty(); + } + @Test public void testFallbackBranchRead() throws Exception { tEnv.executeSql( "CREATE TABLE t ( pt INT, a INT, b STRING ) PARTITIONED BY (pt) " - + "WITH ( 'file.format' = 'avro' )") + + "WITH ( 'file.format' = 'avro', 'metastore.partitioned-table' = 'true' )") .await(); tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await(); tEnv.executeSql( @@ -510,9 +557,11 @@ public void testFallbackBranchRead() throws Exception { Row.of(1, 20, "banana"), Row.of(2, 10, "lion"), Row.of(2, 20, "wolf")); - assertThat(hiveShell.executeQuery("SELECT * FROM t")) + assertThat(hiveShell.executeQuery("SELECT pt, a, b FROM t")) .containsExactlyInAnyOrder( "1\t10\tapple", "1\t20\tbanana", "2\t10\tlion", "2\t20\twolf"); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) + .containsExactlyInAnyOrder("pt=1", "pt=2"); } /**