diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index db6909295556..3fdefe6cacaa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -417,7 +417,7 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist
                                 lockFactory().orElse(null),
                                 lockContext().orElse(null),
                                 identifier),
-                        metastoreClientFactory(identifier, tableMeta.schema).orElse(null)));
+                        metastoreClientFactory(identifier).orElse(null)));
         CoreOptions options = table.coreOptions();
         if (options.type() == TableType.OBJECT_TABLE) {
             String objectLocation = options.objectLocation();
@@ -485,8 +485,7 @@ protected abstract TableSchema getDataTableSchema(Identifier identifier)
             throws TableNotExistException;
 
     /** Get metastore client factory for the table specified by {@code identifier}. */
-    public Optional<MetastoreClient.Factory> metastoreClientFactory(
-            Identifier identifier, TableSchema schema) {
+    public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
         return Optional.empty();
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 599f88e512c0..26fb9ed48db2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -25,6 +25,7 @@
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 
 import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
 import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
@@ -48,9 +49,12 @@ public class AddPartitionCommitCallback implements CommitCallback {
                     .build();
 
     private final MetastoreClient client;
+    private final InternalRowPartitionComputer partitionComputer;
 
-    public AddPartitionCommitCallback(MetastoreClient client) {
+    public AddPartitionCommitCallback(
+            MetastoreClient client, InternalRowPartitionComputer partitionComputer) {
         this.client = client;
+        this.partitionComputer = partitionComputer;
     }
 
     @Override
@@ -81,7 +85,10 @@ private void addPartitions(Set<BinaryRow> partitions) {
                 }
             }
             if (!newPartitions.isEmpty()) {
-                client.addPartitions(newPartitions);
+                client.addPartitions(
+                        newPartitions.stream()
+                                .map(partitionComputer::generatePartValues)
+                                .collect(Collectors.toList()));
                 newPartitions.forEach(partition -> cache.put(partition, true));
             }
         } catch (Exception e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
index 70efe68e83f3..31bb521e88d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
@@ -49,7 +49,7 @@ public void notifyDeletion(String tagName) {
         LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
         partitionSpec.put(partitionField, tagName);
         try {
-            client.deletePartition(partitionSpec);
+            client.dropPartition(partitionSpec);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index 75f7af5abbdc..ccf5f3853873 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -18,12 +18,9 @@
 
 package org.apache.paimon.metastore;
 
-import org.apache.paimon.data.BinaryRow;
-
 import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * A metastore client related to a table. All methods of this interface operate on the same specific
@@ -31,32 +28,18 @@
  */
 public interface MetastoreClient extends AutoCloseable {
 
-    void addPartition(BinaryRow partition) throws Exception;
-
-    default void addPartitions(List<BinaryRow> partitions) throws Exception {
-        for (BinaryRow partition : partitions) {
-            addPartition(partition);
-        }
-    }
+    void addPartition(LinkedHashMap<String, String> partition) throws Exception;
 
-    void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception;
+    void addPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception;
 
-    default void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
-            throws Exception {
-        for (LinkedHashMap<String, String> partitionSpecs : partitionSpecsList) {
-            addPartition(partitionSpecs);
-        }
-    }
+    void dropPartition(LinkedHashMap<String, String> partition) throws Exception;
 
-    void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception;
+    void dropPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception;
 
-    void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception;
+    void markPartitionDone(LinkedHashMap<String, String> partition) throws Exception;
 
     default void alterPartition(
-            LinkedHashMap<String, String> partitionSpec,
-            Map<String, String> parameters,
-            long modifyTime,
-            boolean ignoreIfNotExist)
+            LinkedHashMap<String, String> partition, PartitionStats partitionStats)
             throws Exception {
         throw new UnsupportedOperationException();
     }
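
The interface is now keyed entirely on partition specs (LinkedHashMap<String, String>) instead of BinaryRow, so implementations only ever see resolved name/value pairs. As a minimal sketch of a conforming implementation, mirroring the anonymous test doubles later in this diff (the class name and in-memory set are illustrative, not part of this patch):

    import org.apache.paimon.metastore.MetastoreClient;

    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Set;

    /** Illustrative in-memory client; real implementations talk to an external metastore. */
    class InMemoryMetastoreClient implements MetastoreClient {

        private final Set<LinkedHashMap<String, String>> partitions = new HashSet<>();

        @Override
        public void addPartition(LinkedHashMap<String, String> partition) {
            partitions.add(partition);
        }

        @Override
        public void addPartitions(List<LinkedHashMap<String, String>> newPartitions) {
            newPartitions.forEach(this::addPartition);
        }

        @Override
        public void dropPartition(LinkedHashMap<String, String> partition) {
            partitions.remove(partition);
        }

        @Override
        public void dropPartitions(List<LinkedHashMap<String, String>> toDrop) {
            toDrop.forEach(this::dropPartition);
        }

        @Override
        public void markPartitionDone(LinkedHashMap<String, String> partition) {
            // no "done" notion in this sketch; the Hive client fires a partition event here
        }

        @Override
        public void close() {}
    }

alterPartition keeps a default implementation that throws UnsupportedOperationException, so only clients that report statistics (such as the Hive client below) need to override it.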
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java b/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java
new file mode 100644
index 000000000000..eacc400f52c3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+/** Statistic for partition. */
+public interface PartitionStats {
+
+    long numFiles();
+
+    long totalSize();
+
+    long numRows();
+
+    long lastUpdateTimeMillis();
+
+    static PartitionStats create(
+            long numFiles, long totalSize, long numRows, long lastUpdateTimeMillis) {
+        return new PartitionStats() {
+
+            @Override
+            public long numFiles() {
+                return numFiles;
+            }
+
+            @Override
+            public long totalSize() {
+                return totalSize;
+            }
+
+            @Override
+            public long numRows() {
+                return numRows;
+            }
+
+            @Override
+            public long lastUpdateTimeMillis() {
+                return lastUpdateTimeMillis;
+            }
+
+            @Override
+            public String toString() {
+                return String.format(
+                        "numFiles: %s, totalSize: %s, numRows: %s, lastUpdateTimeMillis: %s",
+                        numFiles, totalSize, numRows, lastUpdateTimeMillis);
+            }
+        };
+    }
+}
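
The factory method above is the whole construction story for PartitionStats; a usage sketch, assuming an open MetastoreClient named client and a single-key partition (names and values are illustrative, chosen to mirror PartitionStatisticsReporterTest further down):

    LinkedHashMap<String, String> spec = new LinkedHashMap<>();
    spec.put("dt", "20230101");
    // 1 file, 591 bytes, 1 row; timestamps are epoch milliseconds
    PartitionStats stats = PartitionStats.create(1, 591, 1, System.currentTimeMillis());
    client.alterPartition(spec, stats);

Note that toString() is part of the contract the tests rely on: PartitionStatisticsReporterTest asserts against the formatted string rather than a parameter map.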
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index d432a37dfd9c..68ef8a123746 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -167,15 +167,13 @@ private List<Map<String, String>> doExpire(
     }
 
     private void deleteMetastorePartitions(List<Map<String, String>> partitions) {
-        if (metastoreClient != null) {
-            partitions.forEach(
-                    partition -> {
-                        try {
-                            metastoreClient.deletePartition(new LinkedHashMap<>(partition));
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+        if (metastoreClient != null && partitions.size() > 0) {
+            try {
+                metastoreClient.dropPartitions(
+                        partitions.stream().map(LinkedHashMap::new).collect(Collectors.toList()));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
index a5ebe34051c1..8cc1c93ba937 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
@@ -39,7 +39,7 @@ public MarkPartitionDoneEventAction(MetastoreClient metastoreClient) {
     public void markDone(String partition) throws Exception {
         LinkedHashMap<String, String> partitionSpec =
                 extractPartitionSpecFromPath(new Path(partition));
-        metastoreClient.markDone(partitionSpec);
+        metastoreClient.markPartitionDone(partitionSpec);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 57966d24ce47..7e008698c4fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -61,6 +61,7 @@
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SimpleFileReader;
@@ -469,7 +470,15 @@ protected List<CommitCallback> createCommitCallbacks(String commitUser) {
         if (options.partitionedTableInMetastore()
                 && metastoreClientFactory != null
                 && !tableSchema.partitionKeys().isEmpty()) {
-            callbacks.add(new AddPartitionCommitCallback(metastoreClientFactory.create()));
+            InternalRowPartitionComputer partitionComputer =
+                    new InternalRowPartitionComputer(
+                            options.partitionDefaultName(),
+                            tableSchema.logicalPartitionType(),
+                            tableSchema.partitionKeys().toArray(new String[0]),
+                            options.legacyPartitionName());
+            callbacks.add(
+                    new AddPartitionCommitCallback(
+                            metastoreClientFactory.create(), partitionComputer));
         }
 
         TagPreview tagPreview = TagPreview.create(options);
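
This call site illustrates the conversion pattern the patch repeats everywhere: the BinaryRow-to-spec translation that used to live inside HiveMetastoreClient now happens in the caller through InternalRowPartitionComputer. A condensed sketch of the pattern, assuming a TableSchema tableSchema, CoreOptions options, a List<BinaryRow> partitions, and a MetastoreClient client in scope:

    // Build the computer exactly as AbstractFileStoreTable does above.
    InternalRowPartitionComputer partitionComputer =
            new InternalRowPartitionComputer(
                    options.partitionDefaultName(),
                    tableSchema.logicalPartitionType(),
                    tableSchema.partitionKeys().toArray(new String[0]),
                    options.legacyPartitionName());
    // Translate in-memory partition rows to name/value specs for the metastore.
    List<LinkedHashMap<String, String>> specs =
            partitions.stream()
                    .map(partitionComputer::generatePartValues)
                    .collect(Collectors.toList());
    client.addPartitions(specs);

The same four constructor arguments recur verbatim in HiveCatalog#repairTable below.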
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 931bac59c756..893fe1bf5762 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -27,8 +27,12 @@
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -54,6 +58,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -61,9 +66,11 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
+import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
 import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
 import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
 import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
+import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -75,12 +82,54 @@ public class PartitionExpireTest {
 
     private Path path;
     private FileStoreTable table;
+    private List<LinkedHashMap<String, String>> deletedPartitions;
 
     @BeforeEach
     public void beforeEach() {
         path = new Path(tempDir.toUri());
     }
 
+    private void newTable() {
+        LocalFileIO fileIO = LocalFileIO.create();
+        Options options = new Options();
+        options.set(PATH, path.toString());
+        Path tablePath = CoreOptions.path(options);
+        String branchName = CoreOptions.branch(options.toMap());
+        TableSchema tableSchema = new SchemaManager(fileIO, tablePath, branchName).latest().get();
+        deletedPartitions = new ArrayList<>();
+        MetastoreClient.Factory factory =
+                () ->
+                        new MetastoreClient() {
+                            @Override
+                            public void addPartition(LinkedHashMap<String, String> partition) {}
+
+                            @Override
+                            public void addPartitions(
+                                    List<LinkedHashMap<String, String>> partitions) {}
+
+                            @Override
+                            public void dropPartition(LinkedHashMap<String, String> partition) {
+                                deletedPartitions.add(partition);
+                            }
+
+                            @Override
+                            public void dropPartitions(
+                                    List<LinkedHashMap<String, String>> partitions) {
+                                deletedPartitions.addAll(partitions);
+                            }
+
+                            @Override
+                            public void markPartitionDone(
+                                    LinkedHashMap<String, String> partition) {
+                                throw new UnsupportedOperationException();
+                            }
+
+                            @Override
+                            public void close() {}
+                        };
+        CatalogEnvironment env = new CatalogEnvironment(null, null, Lock.emptyFactory(), factory);
+        table = FileStoreTableFactory.create(fileIO, path, tableSchema, env);
+    }
+
     @Test
     public void testNonPartitionedTable() {
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
@@ -108,7 +157,7 @@ public void testIllegalPartition() throws Exception {
                         emptyList(),
                         Collections.emptyMap(),
                         ""));
-        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        newTable();
         write("20230101", "11");
         write("abcd", "12");
         write("20230101", "12");
@@ -129,9 +178,9 @@ public void test() throws Exception {
                         RowType.of(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE).getFields(),
                         singletonList("f0"),
                         emptyList(),
-                        Collections.emptyMap(),
+                        Collections.singletonMap(METASTORE_PARTITIONED_TABLE.key(), "true"),
                         ""));
-        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        newTable();
 
         write("20230101", "11");
         write("20230101", "12");
@@ -156,6 +205,12 @@ public void test() throws Exception {
 
         expire.expire(date(8), Long.MAX_VALUE);
         assertThat(read()).isEmpty();
+
+        assertThat(deletedPartitions)
+                .containsExactlyInAnyOrder(
+                        new LinkedHashMap<>(Collections.singletonMap("f0", "20230101")),
+                        new LinkedHashMap<>(Collections.singletonMap("f0", "20230103")),
+                        new LinkedHashMap<>(Collections.singletonMap("f0", "20230105")));
     }
 
     @Test
@@ -169,7 +224,7 @@ public void testFilterCommittedAfterExpiring() throws Exception {
                         Collections.emptyMap(),
                         ""));
 
-        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        newTable();
         // disable compaction and snapshot expiration
         table = table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true"));
         String commitUser = UUID.randomUUID().toString();
@@ -243,7 +298,7 @@ public void testDeleteExpiredPartition() throws Exception {
                         emptyList(),
                         Collections.emptyMap(),
                         ""));
-        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        newTable();
         table = newExpireTable();
 
         List<CommitMessage> commitMessages = write("20230101", "11");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
index b75889d567ee..ced37726f1eb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
@@ -35,15 +36,9 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
-import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
-import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
-import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
-import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
 import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
 
 /** Action to report the table statistic from the latest snapshot to HMS. */
@@ -51,8 +46,6 @@ public class PartitionStatisticsReporter implements Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(PartitionStatisticsReporter.class);
 
-    private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
-
     private final MetastoreClient metastoreClient;
     private final SnapshotReader snapshotReader;
     private final SnapshotManager snapshotManager;
@@ -64,7 +57,7 @@ public PartitionStatisticsReporter(FileStoreTable table, MetastoreClient client)
         this.snapshotManager = table.snapshotManager();
     }
 
-    public void report(String partition, long modifyTime) throws Exception {
+    public void report(String partition, long modifyTimeMillis) throws Exception {
         Snapshot snapshot = snapshotManager.latestSnapshot();
         if (snapshot != null) {
             LinkedHashMap<String, String> partitionSpec =
@@ -88,19 +81,11 @@ public void report(String partition, long modifyTime) throws Exception {
                     totalSize += fileMeta.fileSize();
                 }
             }
-            Map<String, String> statistic = new HashMap<>();
-            statistic.put(NUM_FILES_PROP, String.valueOf(fileCount));
-            statistic.put(TOTAL_SIZE_PROP, String.valueOf(totalSize));
-            statistic.put(NUM_ROWS_PROP, String.valueOf(rowCount));
-
-            String modifyTimeSeconds = String.valueOf(modifyTime / 1000);
-            statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
-
-            // just for being compatible with hive metastore
-            statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
-
-            LOG.info("alter partition {} with statistic {}.", partitionSpec, statistic);
-            metastoreClient.alterPartition(partitionSpec, statistic, modifyTime, true);
+            PartitionStats partitionStats =
+                    PartitionStats.create(fileCount, totalSize, rowCount, modifyTimeMillis);
+            LOG.info("alter partition {} with statistic {}.", partitionSpec, partitionStats);
+            metastoreClient.alterPartition(partitionSpec, partitionStats);
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
index fca5dcf0ed69..3bdbdd20ad3e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
@@ -18,15 +18,15 @@
 
 package org.apache.paimon.flink.sink.partition;
 
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
 import org.apache.paimon.partition.actions.AddDonePartitionAction;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -41,39 +41,41 @@ public void test() throws Exception {
         Set<String> donePartitions = new HashSet<>();
         MetastoreClient metastoreClient =
                 new MetastoreClient() {
+
                     @Override
-                    public void addPartition(BinaryRow partition) {
-                        throw new UnsupportedOperationException();
+                    public void addPartition(LinkedHashMap<String, String> partition) {
+                        donePartitions.add(generatePartitionPath(partition));
+                    }
+
+                    @Override
+                    public void addPartitions(List<LinkedHashMap<String, String>> partitions) {
+                        partitions.forEach(this::addPartition);
                     }
 
                     @Override
-                    public void addPartition(LinkedHashMap<String, String> partitionSpec) {
-                        donePartitions.add(generatePartitionPath(partitionSpec));
+                    public void dropPartition(LinkedHashMap<String, String> partition) {
+                        throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void deletePartition(LinkedHashMap<String, String> partitionSpec) {
+                    public void dropPartitions(List<LinkedHashMap<String, String>> partitions) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void markDone(LinkedHashMap<String, String> partitionSpec)
-                            throws Exception {
+                    public void markPartitionDone(LinkedHashMap<String, String> partitions) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
                     public void alterPartition(
                             LinkedHashMap<String, String> partitionSpec,
-                            Map<String, String> parameters,
-                            long modifyTime,
-                            boolean ignoreIfNotExist)
-                            throws Exception {
+                            PartitionStats partitionStats) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void close() throws Exception {
+                    public void close() {
                         closed.set(true);
                     }
                 };
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
index 142a0c32f781..0f761efa2278 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
@@ -18,12 +18,12 @@
 
 package org.apache.paimon.flink.sink.partition;
 
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
@@ -35,7 +35,6 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.PartitionPathUtils;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
 
@@ -86,47 +85,47 @@ public void testReportAction() throws Exception {
         BatchTableCommit committer = table.newBatchWriteBuilder().newCommit();
         committer.commit(messages);
         AtomicBoolean closed = new AtomicBoolean(false);
-        Map<String, Map<String, String>> partitionParams = Maps.newHashMap();
+        Map<String, PartitionStats> partitionParams = Maps.newHashMap();
         MetastoreClient client =
                 new MetastoreClient() {
+
+                    @Override
+                    public void addPartition(LinkedHashMap<String, String> partition) {
+                        throw new UnsupportedOperationException();
+                    }
+
                     @Override
-                    public void addPartition(BinaryRow partition) throws Exception {
+                    public void addPartitions(List<LinkedHashMap<String, String>> partitions) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void addPartition(LinkedHashMap<String, String> partitionSpec)
-                            throws Exception {
+                    public void dropPartition(LinkedHashMap<String, String> partition) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void deletePartition(LinkedHashMap<String, String> partitionSpec)
-                            throws Exception {
+                    public void dropPartitions(List<LinkedHashMap<String, String>> partitions) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void markDone(LinkedHashMap<String, String> partitionSpec)
-                            throws Exception {
+                    public void markPartitionDone(LinkedHashMap<String, String> partitionSpec) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
                     public void alterPartition(
                             LinkedHashMap<String, String> partitionSpec,
-                            Map<String, String> parameters,
-                            long modifyTime,
-                            boolean ignoreIfNotExist)
-                            throws Exception {
+                            PartitionStats partitionStats) {
                         partitionParams.put(
                                 PartitionPathUtils.generatePartitionPath(partitionSpec),
-                                parameters);
+                                partitionStats);
                     }
 
                     @Override
-                    public void close() throws Exception {
+                    public void close() {
                         closed.set(true);
                     }
                 };
@@ -135,19 +134,9 @@ public void close() throws Exception {
         long time = 1729598544974L;
         action.report("c1=a/", time);
         Assertions.assertThat(partitionParams).containsKey("c1=a/");
-        Assertions.assertThat(partitionParams.get("c1=a/"))
+        Assertions.assertThat(partitionParams.get("c1=a/").toString())
                 .isEqualTo(
-                        ImmutableMap.of(
-                                "numFiles",
-                                "1",
-                                "totalSize",
-                                "591",
-                                "numRows",
-                                "1",
-                                "lastUpdateTime",
-                                String.valueOf(time / 1000),
-                                "transient_lastDdlTime",
-                                String.valueOf(time / 1000)));
+                        "numFiles: 1, totalSize: 591, numRows: 1, lastUpdateTimeMillis: 1729598544974");
         action.close();
         Assertions.assertThat(closed).isTrue();
     }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 5744ac894d12..f5ae504850ca 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -27,7 +27,6 @@
 import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.pool.CachedClientPool;
@@ -48,6 +47,7 @@
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.view.View;
@@ -191,13 +191,12 @@ public Optional<CatalogLockContext> lockContext() {
     }
 
     @Override
-    public Optional<MetastoreClient.Factory> metastoreClientFactory(
-            Identifier identifier, TableSchema schema) {
+    public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
         Identifier tableIdentifier =
                 new Identifier(identifier.getDatabaseName(), identifier.getTableName());
         return Optional.of(
                 new HiveMetastoreClient.Factory(
-                        tableIdentifier, schema, hiveConf, clientClassName, options));
+                        tableIdentifier, hiveConf, clientClassName, options));
     }
 
     @Override
@@ -350,9 +349,8 @@ && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()
                             new HiveMetastoreClient(
                                     new Identifier(
                                             identifier.getDatabaseName(), identifier.getTableName()),
-                                    tableSchema,
                                     clients);
-            metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec));
+            metastoreClient.dropPartition(new LinkedHashMap<>(partitionSpec));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -610,7 +608,7 @@ public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier)
                                     lockFactory().orElse(null),
                                     lockContext().orElse(null),
                                     identifier),
-                            metastoreClientFactory(identifier, tableMeta.schema()).orElse(null)));
+                            metastoreClientFactory(identifier).orElse(null)));
         } catch (TableNotExistException ignore) {
         }
 
@@ -968,14 +966,19 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
             // repair partitions
             if (!tableSchema.partitionKeys().isEmpty() && !newTable.getPartitionKeys().isEmpty()) {
                 // Do not close client, it is for HiveCatalog
+                CoreOptions options = new CoreOptions(tableSchema.options());
+                InternalRowPartitionComputer partitionComputer =
+                        new InternalRowPartitionComputer(
+                                options.partitionDefaultName(),
+                                tableSchema.logicalPartitionType(),
+                                tableSchema.partitionKeys().toArray(new String[0]),
+                                options.legacyPartitionName());
                 @SuppressWarnings("resource")
-                HiveMetastoreClient metastoreClient =
-                        new HiveMetastoreClient(identifier, tableSchema, clients);
-                List<BinaryRow> partitions =
-                        getTable(identifier).newReadBuilder().newScan().listPartitions();
-                for (BinaryRow partition : partitions) {
-                    metastoreClient.addPartition(partition);
-                }
+                HiveMetastoreClient metastoreClient = new HiveMetastoreClient(identifier, clients);
+                metastoreClient.addPartitions(
+                        getTable(identifier).newReadBuilder().newScan().listPartitions().stream()
+                                .map(partitionComputer::generatePartValues)
+                                .collect(Collectors.toList()));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 885fa463e5a7..f7be538c259d 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -18,15 +18,12 @@
 
 package org.apache.paimon.hive;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.hive.pool.CachedClientPool;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.PartitionPathUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -39,34 +36,30 @@
 import org.apache.thrift.TException;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
+
 /** {@link MetastoreClient} for Hive tables. */
 public class HiveMetastoreClient implements MetastoreClient {
 
+    private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
+
     private final Identifier identifier;
-    private final InternalRowPartitionComputer partitionComputer;
 
     private final ClientPool<IMetaStoreClient, TException> clients;
     private final StorageDescriptor sd;
 
-    HiveMetastoreClient(
-            Identifier identifier,
-            TableSchema schema,
-            ClientPool<IMetaStoreClient, TException> clients)
+    HiveMetastoreClient(Identifier identifier, ClientPool<IMetaStoreClient, TException> clients)
             throws TException, InterruptedException {
         this.identifier = identifier;
-        CoreOptions options = new CoreOptions(schema.options());
-        this.partitionComputer =
-                new InternalRowPartitionComputer(
-                        options.partitionDefaultName(),
-                        schema.logicalPartitionType(),
-                        schema.partitionKeys().toArray(new String[0]),
-                        options.legacyPartitionName());
-
         this.clients = clients;
         this.sd =
                 this.clients
@@ -79,22 +72,9 @@ public class HiveMetastoreClient implements MetastoreClient {
     }
 
     @Override
-    public void addPartition(BinaryRow partition) throws Exception {
-        addPartition(partitionComputer.generatePartValues(partition));
-    }
-
-    @Override
-    public void addPartitions(List<BinaryRow> partitions) throws Exception {
-        addPartitionsSpec(
-                partitions.stream()
-                        .map(partitionComputer::generatePartValues)
-                        .collect(Collectors.toList()));
-    }
-
-    @Override
-    public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
+    public void addPartition(LinkedHashMap<String, String> partition) throws Exception {
         Partition hivePartition =
-                toHivePartition(partitionSpec, (int) (System.currentTimeMillis() / 1000));
+                toHivePartition(partition, (int) (System.currentTimeMillis() / 1000));
         clients.execute(
                 client -> {
                     try {
@@ -105,11 +85,10 @@ public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exc
     }
 
     @Override
-    public void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
-            throws Exception {
+    public void addPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception {
         int currentTime = (int) (System.currentTimeMillis() / 1000);
         List<Partition> hivePartitions =
-                partitionSpecsList.stream()
+                partitions.stream()
                         .map(partitionSpec -> toHivePartition(partitionSpec, currentTime))
                        .collect(Collectors.toList());
         clients.execute(client -> client.add_partitions(hivePartitions, true, false));
@@ -117,43 +96,45 @@ public void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecs
 
     @Override
     public void alterPartition(
-            LinkedHashMap<String, String> partitionSpec,
-            Map<String, String> parameters,
-            long modifyTime,
-            boolean ignoreIfNotExist)
+            LinkedHashMap<String, String> partition, PartitionStats partitionStats)
             throws Exception {
-        List<String> partitionValues = new ArrayList<>(partitionSpec.values());
-        int currentTime = (int) (modifyTime / 1000);
-        Partition hivePartition;
+        List<String> partitionValues = new ArrayList<>(partition.values());
+
+        Map<String, String> statistic = new HashMap<>();
+        statistic.put(NUM_FILES_PROP, String.valueOf(partitionStats.numFiles()));
+        statistic.put(TOTAL_SIZE_PROP, String.valueOf(partitionStats.totalSize()));
+        statistic.put(NUM_ROWS_PROP, String.valueOf(partitionStats.numRows()));
+
+        String modifyTimeSeconds = String.valueOf(partitionStats.lastUpdateTimeMillis() / 1000);
+        statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+        // just for being compatible with hive metastore
+        statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
         try {
-            hivePartition =
+            Partition hivePartition =
                     clients.run(
                             client ->
                                     client.getPartition(
                                             identifier.getDatabaseName(),
                                             identifier.getObjectName(),
                                             partitionValues));
+            hivePartition.setValues(partitionValues);
+            hivePartition.setLastAccessTime((int) (partitionStats.lastUpdateTimeMillis() / 1000));
+            hivePartition.getParameters().putAll(statistic);
+            clients.execute(
+                    client ->
+                            client.alter_partition(
+                                    identifier.getDatabaseName(),
+                                    identifier.getObjectName(),
+                                    hivePartition));
         } catch (NoSuchObjectException e) {
-            if (ignoreIfNotExist) {
-                return;
-            } else {
-                throw e;
-            }
+            // do nothing if the partition not exists
         }
-
-        hivePartition.setValues(partitionValues);
-        hivePartition.setLastAccessTime(currentTime);
-        hivePartition.getParameters().putAll(parameters);
-        clients.execute(
-                client ->
-                        client.alter_partition(
-                                identifier.getDatabaseName(),
-                                identifier.getObjectName(),
-                                hivePartition));
     }
 
     @Override
-    public void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
+    public void dropPartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
         List<String> partitionValues = new ArrayList<>(partitionSpec.values());
         try {
             clients.execute(
@@ -169,7 +150,14 @@ public void deletePartition(LinkedHashMap<String, String> partitionSpec) throws
     }
 
     @Override
-    public void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception {
+    public void dropPartitions(List<LinkedHashMap<String, String>> partitions) throws Exception {
+        for (LinkedHashMap<String, String> partition : partitions) {
+            dropPartition(partition);
+        }
+    }
+
+    @Override
+    public void markPartitionDone(LinkedHashMap<String, String> partitionSpec) throws Exception {
         try {
             clients.execute(
                     client ->
@@ -213,19 +201,13 @@ public static class Factory implements MetastoreClient.Factory {
         private static final long serialVersionUID = 1L;
 
         private final Identifier identifier;
-        private final TableSchema schema;
         private final SerializableHiveConf hiveConf;
         private final String clientClassName;
         private final Options options;
 
         public Factory(
-                Identifier identifier,
-                TableSchema schema,
-                HiveConf hiveConf,
-                String clientClassName,
-                Options options) {
+                Identifier identifier, HiveConf hiveConf, String clientClassName, Options options) {
             this.identifier = identifier;
-            this.schema = schema;
             this.hiveConf = new SerializableHiveConf(hiveConf);
             this.clientClassName = clientClassName;
             this.options = options;
@@ -236,7 +218,7 @@ public MetastoreClient create() {
             HiveConf conf = hiveConf.conf();
             try {
                 return new HiveMetastoreClient(
-                        identifier, schema, new CachedClientPool(conf, options, clientClassName));
+                        identifier, new CachedClientPool(conf, options, clientClassName));
             } catch (TException e) {
                 throw new RuntimeException(
                         "Can not get table " + identifier + " info from metastore.", e);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 840f1341a69d..c385f243ae66 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -78,7 +78,7 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
         // sync to metastore with delete partitions
         if (clientFactory != null && fileStoreTable.coreOptions().partitionedTableInMetastore()) {
           metastoreClient = clientFactory.create()
-          toPaimonPartitions(rows).foreach(metastoreClient.deletePartition)
+          metastoreClient.dropPartitions(toPaimonPartitions(rows).toSeq.asJava)
        }
       } finally {
         commit.close()
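
With the TableSchema parameter gone, a catalog can hand out serializable client factories from an identifier alone. An end-to-end sketch of the new call shape (identifier, hiveConf, clientClassName, and options as they are used in HiveCatalog above; the partition values are illustrative):

    MetastoreClient.Factory factory =
            new HiveMetastoreClient.Factory(identifier, hiveConf, clientClassName, options);
    // MetastoreClient extends AutoCloseable and close() may throw,
    // so the enclosing method must declare or handle Exception.
    try (MetastoreClient client = factory.create()) {
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        spec.put("dt", "20230101");
        client.addPartition(spec);
    }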