diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index d7447c37dd79..14457a3698de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -24,11 +24,11 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -59,6 +59,7 @@ import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.CoreOptions.createCommitUser; +import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; @@ -193,9 +194,8 @@ public void dropPartition(Identifier identifier, Map partitionSp } @Override - public List listPartitions(Identifier identifier) - throws TableNotExistException { - return getTable(identifier).newReadBuilder().newScan().listPartitionEntries(); + public List listPartitions(Identifier identifier) throws TableNotExistException { + return listPartitionsFromFileSystem(getTable(identifier)); } protected abstract void createDatabaseImpl(String name, Map properties); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 
34e53f32f267..4796276972b9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -19,9 +19,9 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -61,7 +61,7 @@ public class CachingCatalog extends DelegateCatalog { @Nullable protected final SegmentsCache manifestCache; // partition cache will affect data latency - @Nullable protected final Cache> partitionCache; + @Nullable protected final Cache> partitionCache; public CachingCatalog(Catalog wrapped) { this( @@ -130,7 +130,7 @@ public CachingCatalog( .executor(Runnable::run) .expireAfterAccess(expirationInterval) .weigher( - (Weigher>) + (Weigher>) (identifier, v) -> v.size()) .maximumWeight(cachedPartitionMaxNum) .ticker(ticker) @@ -281,13 +281,12 @@ private void putTableCache(Identifier identifier, Table table) { } @Override - public List listPartitions(Identifier identifier) - throws TableNotExistException { + public List listPartitions(Identifier identifier) throws TableNotExistException { if (partitionCache == null) { return wrapped.listPartitions(identifier); } - List result = partitionCache.getIfPresent(identifier); + List result = partitionCache.getIfPresent(identifier); if (result == null) { result = wrapped.listPartitions(identifier); partitionCache.put(identifier, result); @@ -321,7 +320,7 @@ public void invalidateTable(Identifier identifier) { */ public void refreshPartitions(Identifier identifier) throws TableNotExistException { if (partitionCache != null) { - List result = wrapped.listPartitions(identifier); + List result = wrapped.listPartitions(identifier); 
partitionCache.put(identifier, result); } } @@ -341,8 +340,7 @@ public CacheSizes estimatedCacheSizes() { } long partitionCacheSize = 0L; if (partitionCache != null) { - for (Map.Entry> entry : - partitionCache.asMap().entrySet()) { + for (Map.Entry> entry : partitionCache.asMap().entrySet()) { partitionCacheSize += entry.getValue().size(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 37ea6fa5e203..70daba4186f8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -20,7 +20,7 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; @@ -255,12 +255,12 @@ void dropPartition(Identifier identifier, Map partitions) throws TableNotExistException, PartitionNotExistException; /** - * Get PartitionEntry of all partitions of the table. + * Get Partition of all partitions of the table. * * @param identifier path of the table to list partitions * @throws TableNotExistException if the table does not exist */ - List listPartitions(Identifier identifier) throws TableNotExistException; + List listPartitions(Identifier identifier) throws TableNotExistException; /** * Modify an existing table from a {@link SchemaChange}. 
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index 043da0504d7f..301c7136e95e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -19,10 +19,19 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.InternalRowPartitionComputer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; +import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME; import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; @@ -60,4 +69,27 @@ public static String table(String path) { public static Map tableDefaultOptions(Map options) { return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX); } + + public static List listPartitionsFromFileSystem(Table table) { + Options options = Options.fromMap(table.options()); + InternalRowPartitionComputer computer = + new InternalRowPartitionComputer( + options.get(PARTITION_DEFAULT_NAME), + table.rowType(), + table.partitionKeys().toArray(new String[0]), + options.get(PARTITION_GENERATE_LEGCY_NAME)); + List partitionEntries = + table.newReadBuilder().newScan().listPartitionEntries(); + List partitions = new ArrayList<>(partitionEntries.size()); + for (PartitionEntry entry : partitionEntries) { + partitions.add( + new Partition( + computer.generatePartValues(entry.partition()), + entry.recordCount(), + entry.fileSizeInBytes(), + 
entry.fileCount(), + entry.lastFileCreationTime())); + } + return partitions; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 968f00cfcae5..e2d1a94cfaff 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -19,7 +19,7 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; @@ -165,8 +165,7 @@ public void dropPartition(Identifier identifier, Map partitions) } @Override - public List listPartitions(Identifier identifier) - throws TableNotExistException { + public List listPartitions(Identifier identifier) throws TableNotExistException { return wrapped.listPartitions(identifier); } diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java index ccf5f3853873..f24049eca9bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java @@ -18,6 +18,8 @@ package org.apache.paimon.metastore; +import org.apache.paimon.partition.Partition; + import java.io.Serializable; import java.util.LinkedHashMap; import java.util.List; @@ -38,9 +40,7 @@ public interface MetastoreClient extends AutoCloseable { void markPartitionDone(LinkedHashMap partition) throws Exception; - default void alterPartition( - LinkedHashMap partition, PartitionStats partitionStats) - throws Exception { + default void alterPartition(Partition partition) throws Exception { throw new UnsupportedOperationException(); } diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java b/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java deleted file mode 100644 index eacc400f52c3..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.metastore; - -/** Statistic for partition. 
*/ -public interface PartitionStats { - - long numFiles(); - - long totalSize(); - - long numRows(); - - long lastUpdateTimeMillis(); - - static PartitionStats create( - long numFiles, long totalSize, long numRows, long lastUpdateTimeMillis) { - return new PartitionStats() { - - @Override - public long numFiles() { - return numFiles; - } - - @Override - public long totalSize() { - return totalSize; - } - - @Override - public long numRows() { - return numRows; - } - - @Override - public long lastUpdateTimeMillis() { - return lastUpdateTimeMillis; - } - - @Override - public String toString() { - return String.format( - "numFiles: %s, totalSize: %s, numRows: %s, lastUpdateTimeMillis: %s", - numFiles, totalSize, numRows, lastUpdateTimeMillis); - } - }; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java new file mode 100644 index 000000000000..b13082fb4430 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.partition; + +import org.apache.paimon.annotation.Public; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; + +/** Entry representing a partition. */ +@JsonIgnoreProperties(ignoreUnknown = true) +@Public +public class Partition implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final String FIELD_SPEC = "spec"; + public static final String FIELD_RECORD_COUNT = "recordCount"; + public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes"; + public static final String FIELD_FILE_COUNT = "fileCount"; + public static final String FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime"; + + @JsonProperty(FIELD_SPEC) + private final Map spec; + + @JsonProperty(FIELD_RECORD_COUNT) + private final long recordCount; + + @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) + private final long fileSizeInBytes; + + @JsonProperty(FIELD_FILE_COUNT) + private final long fileCount; + + @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) + private final long lastFileCreationTime; + + @JsonCreator + public Partition( + @JsonProperty(FIELD_SPEC) Map spec, + @JsonProperty(FIELD_RECORD_COUNT) long recordCount, + @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes, + @JsonProperty(FIELD_FILE_COUNT) long fileCount, + @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime) { + this.spec = spec; + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + this.fileCount = fileCount; + this.lastFileCreationTime = lastFileCreationTime; + } + + @JsonGetter(FIELD_SPEC) + public Map spec() { + return 
spec; + } + + @JsonGetter(FIELD_RECORD_COUNT) + public long recordCount() { + return recordCount; + } + + @JsonGetter(FIELD_FILE_SIZE_IN_BYTES) + public long fileSizeInBytes() { + return fileSizeInBytes; + } + + @JsonGetter(FIELD_FILE_COUNT) + public long fileCount() { + return fileCount; + } + + @JsonGetter(FIELD_LAST_FILE_CREATION_TIME) + public long lastFileCreationTime() { + return lastFileCreationTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Partition that = (Partition) o; + return recordCount == that.recordCount + && fileSizeInBytes == that.fileSizeInBytes + && fileCount == that.fileCount + && lastFileCreationTime == that.lastFileCreationTime + && Objects.equals(spec, that.spec); + } + + @Override + public int hashCode() { + return Objects.hash(spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime); + } + + @Override + public String toString() { + return "{" + + "spec=" + + spec + + ", recordCount=" + + recordCount + + ", fileSizeInBytes=" + + fileSizeInBytes + + ", fileCount=" + + fileCount + + ", lastFileCreationTime=" + + lastFileCreationTime + + '}'; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 8b53bef8486b..b60481520902 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -23,9 +23,9 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.auth.AuthSession; import org.apache.paimon.rest.auth.CredentialsProvider; import 
org.apache.paimon.rest.auth.CredentialsProviderFactory; @@ -251,8 +251,7 @@ public void dropPartition(Identifier identifier, Map partitions) throws TableNotExistException, PartitionNotExistException {} @Override - public List listPartitions(Identifier identifier) - throws TableNotExistException { + public List listPartitions(Identifier identifier) throws TableNotExistException { throw new UnsupportedOperationException(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index fee6d1433143..c028fa7421d5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -21,9 +21,9 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; @@ -245,12 +245,12 @@ public void testPartitionCache() throws Exception { Collections.emptyMap(), ""); catalog.createTable(tableIdent, schema, false); - List partitionEntryList = catalog.listPartitions(tableIdent); + List partitionEntryList = catalog.listPartitions(tableIdent); assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent); catalog.invalidateTable(tableIdent); catalog.refreshPartitions(tableIdent); assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent); - List partitionEntryListFromCache = + List partitionEntryListFromCache = catalog.partitionCache().getIfPresent(tableIdent); assertThat(partitionEntryListFromCache).isNotNull(); assertThat(partitionEntryListFromCache).containsAll(partitionEntryList); diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java index 1d4a9b0e8a58..0eaf23a1a28d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java @@ -18,8 +18,8 @@ package org.apache.paimon.catalog; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.partition.Partition; import org.apache.paimon.table.Table; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; @@ -56,7 +56,7 @@ public Cache tableCache() { return tableCache; } - public Cache> partitionCache() { + public Cache> partitionCache() { partitionCache.cleanUp(); return partitionCache; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java index ced37726f1eb..84542af4768b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java @@ -22,7 +22,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.metastore.MetastoreClient; -import org.apache.paimon.metastore.PartitionStats; +import org.apache.paimon.partition.Partition; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ScanMode; @@ -82,10 +82,10 @@ public void report(String partition, long modifyTimeMillis) throws Exception { } } - PartitionStats partitionStats = - PartitionStats.create(fileCount, totalSize, 
rowCount, modifyTimeMillis); + Partition partitionStats = + new Partition(partitionSpec, rowCount, totalSize, fileCount, modifyTimeMillis); LOG.info("alter partition {} with statistic {}.", partitionSpec, partitionStats); - metastoreClient.alterPartition(partitionSpec, partitionStats); + metastoreClient.alterPartition(partitionStats); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java index 3bdbdd20ad3e..3c5cd2f8e927 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.sink.partition; import org.apache.paimon.metastore.MetastoreClient; -import org.apache.paimon.metastore.PartitionStats; +import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.actions.AddDonePartitionAction; import org.junit.jupiter.api.Test; @@ -68,9 +68,7 @@ public void markPartitionDone(LinkedHashMap partitions) { } @Override - public void alterPartition( - LinkedHashMap partitionSpec, - PartitionStats partitionStats) { + public void alterPartition(Partition partition) { throw new UnsupportedOperationException(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java index 0f761efa2278..3c01772d6d3b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java +++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java @@ -23,7 +23,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.metastore.MetastoreClient; -import org.apache.paimon.metastore.PartitionStats; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; @@ -85,7 +85,7 @@ public void testReportAction() throws Exception { BatchTableCommit committer = table.newBatchWriteBuilder().newCommit(); committer.commit(messages); AtomicBoolean closed = new AtomicBoolean(false); - Map partitionParams = Maps.newHashMap(); + Map partitionParams = Maps.newHashMap(); MetastoreClient client = new MetastoreClient() { @@ -116,12 +116,12 @@ public void markPartitionDone(LinkedHashMap partitionSpec) { } @Override - public void alterPartition( - LinkedHashMap partitionSpec, - PartitionStats partitionStats) { + public void alterPartition(Partition partition) { partitionParams.put( - PartitionPathUtils.generatePartitionPath(partitionSpec), - partitionStats); + PartitionPathUtils.generatePartitionPath( + partition.spec(), + table.rowType().project(table.partitionKeys())), + partition); } @Override @@ -136,7 +136,7 @@ public void close() { Assertions.assertThat(partitionParams).containsKey("c1=a/"); Assertions.assertThat(partitionParams.get("c1=a/").toString()) .isEqualTo( - "numFiles: 1, totalSize: 591, numRows: 1, lastUpdateTimeMillis: 1729598544974"); + "{spec={c1=a}, recordCount=1, fileSizeInBytes=591, fileCount=1, lastFileCreationTime=1729598544974}"); action.close(); Assertions.assertThat(closed).isTrue(); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index 
0661988648f4..755b2df2069f 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -23,13 +23,13 @@ import org.apache.paimon.client.ClientPool; import org.apache.paimon.hive.pool.CachedClientPool; import org.apache.paimon.metastore.MetastoreClient; -import org.apache.paimon.metastore.PartitionStats; import org.apache.paimon.options.Options; import org.apache.paimon.utils.PartitionPathUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; @@ -57,6 +57,7 @@ public class HiveMetastoreClient implements MetastoreClient { private final Identifier identifier; private final ClientPool clients; + private final List partitionKeys; private final StorageDescriptor sd; private final String dataFilePath; @@ -69,6 +70,10 @@ public class HiveMetastoreClient implements MetastoreClient { client -> client.getTable( identifier.getDatabaseName(), identifier.getTableName())); + this.partitionKeys = + table.getPartitionKeys().stream() + .map(FieldSchema::getName) + .collect(Collectors.toList()); this.sd = table.getSd(); this.dataFilePath = table.getParameters().containsKey(CoreOptions.DATA_FILE_PATH_DIRECTORY.key()) @@ -103,17 +108,17 @@ public void addPartitions(List> partitions) throws } @Override - public void alterPartition( - LinkedHashMap partition, PartitionStats partitionStats) - throws Exception { - List partitionValues = new ArrayList<>(partition.values()); + public void alterPartition(org.apache.paimon.partition.Partition partition) throws Exception { + Map spec = 
partition.spec(); + List partitionValues = + partitionKeys.stream().map(spec::get).collect(Collectors.toList()); Map statistic = new HashMap<>(); - statistic.put(NUM_FILES_PROP, String.valueOf(partitionStats.numFiles())); - statistic.put(TOTAL_SIZE_PROP, String.valueOf(partitionStats.totalSize())); - statistic.put(NUM_ROWS_PROP, String.valueOf(partitionStats.numRows())); + statistic.put(NUM_FILES_PROP, String.valueOf(partition.fileCount())); + statistic.put(TOTAL_SIZE_PROP, String.valueOf(partition.fileSizeInBytes())); + statistic.put(NUM_ROWS_PROP, String.valueOf(partition.recordCount())); - String modifyTimeSeconds = String.valueOf(partitionStats.lastUpdateTimeMillis() / 1000); + String modifyTimeSeconds = String.valueOf(partition.lastFileCreationTime() / 1000); statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds); // just for being compatible with hive metastore @@ -128,7 +133,7 @@ public void alterPartition( identifier.getObjectName(), partitionValues)); hivePartition.setValues(partitionValues); - hivePartition.setLastAccessTime((int) (partitionStats.lastUpdateTimeMillis() / 1000)); + hivePartition.setLastAccessTime((int) (partition.lastFileCreationTime() / 1000)); hivePartition.getParameters().putAll(statistic); clients.execute( client ->