diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 7fbb777e3310..444299f6fdab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -57,6 +57,11 @@ public interface Catalog extends AutoCloseable {
     String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
     String DB_LOCATION_PROP = "location";
     String DB_SUFFIX = ".db";
+    String NUM_ROWS_PROP = "numRows";
+    String NUM_FILES_PROP = "numFiles";
+    String TOTAL_SIZE_PROP = "totalSize";
+    String LAST_UPDATE_TIME_PROP = "lastUpdateTime";
+    String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
 
     /** Warehouse root path containing all database directories in this catalog. */
     String warehouse();
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index c9cafce321dd..ac12bfc73490 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -38,11 +38,13 @@ public interface MetastoreClient extends AutoCloseable {
 
     void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception;
 
-    void alterPartition(
+    default void alterPartition(
             LinkedHashMap<String, String> partitionSpec,
             Map<String, String> parameters,
             long modifyTime)
-            throws Exception;
+            throws Exception {
+        throw new UnsupportedOperationException();
+    }
 
     /** Factory to create {@link MetastoreClient}. */
     interface Factory extends Serializable {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index ad624b5602e3..bfcdfdb55f37 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -139,6 +139,10 @@
 import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_REFRESH_STATUS;
 import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_SNAPSHOT;
 import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
 import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
 import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
 import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
@@ -160,12 +164,8 @@ public class FlinkCatalog extends AbstractCatalog {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class);
 
-    public static final String NUM_ROWS_KEY = "numRows";
-    public static final String LAST_UPDATE_TIME_KEY = "lastUpdateTime";
-    public static final String TOTAL_SIZE_KEY = "totalSize";
-    public static final String NUM_FILES_KEY = "numFiles";
-
-    private final ClassLoader classLoader;
+    private final ClassLoader classLoader;
+
     private final Catalog catalog;
     private final String name;
     private final boolean logStoreAutoRegister;
@@ -1198,11 +1198,11 @@ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec
             // This was already filtered by the expected partition.
             PartitionEntry partitionEntry = partitionEntries.get(0);
             Map<String, String> properties = new HashMap<>();
-            properties.put(NUM_ROWS_KEY, String.valueOf(partitionEntry.recordCount()));
+            properties.put(NUM_ROWS_PROP, String.valueOf(partitionEntry.recordCount()));
             properties.put(
-                    LAST_UPDATE_TIME_KEY, String.valueOf(partitionEntry.lastFileCreationTime()));
-            properties.put(NUM_FILES_KEY, String.valueOf(partitionEntry.fileCount()));
-            properties.put(TOTAL_SIZE_KEY, String.valueOf(partitionEntry.fileSizeInBytes()));
+                    LAST_UPDATE_TIME_PROP, String.valueOf(partitionEntry.lastFileCreationTime()));
+            properties.put(NUM_FILES_PROP, String.valueOf(partitionEntry.fileCount()));
+            properties.put(TOTAL_SIZE_PROP, String.valueOf(partitionEntry.fileSizeInBytes()));
             return new CatalogPartitionImpl(properties, "");
         } catch (TableNotPartitionedException | TableNotExistException e) {
             throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
index 9cb776340116..e6b6c49834c2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
@@ -41,6 +41,10 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.catalog.Catalog.HIVE_LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
 import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
 
 /** Action to report the table statistic from the latest snapshot to HMS. */
@@ -92,12 +96,10 @@ public void report(String partition, long modifyTime) throws Exception {
                 }
             }
             Map<String, String> statistic = new HashMap<>();
-            // refer to org.apache.hadoop.hive.common.StatsSetupConst
-            statistic.put("numFiles", String.valueOf(fileCount));
-            statistic.put("totalSize", String.valueOf(totalSize));
-            statistic.put("numRows", String.valueOf(rowCount));
-            // refer to org.apache.hadoop.hive.metastore.api.hive_metastoreConstants
-            statistic.put("transient_lastDdlTime", String.valueOf(modifyTime / 1000));
+            statistic.put(NUM_FILES_PROP, String.valueOf(fileCount));
+            statistic.put(TOTAL_SIZE_PROP, String.valueOf(totalSize));
+            statistic.put(NUM_ROWS_PROP, String.valueOf(rowCount));
+            statistic.put(HIVE_LAST_UPDATE_TIME_PROP, String.valueOf(modifyTime / 1000));
 
             LOG.info("alter partition {} with statistic {}.", partition, statistic);
             metastoreClient.alterPartition(partitionSpec, statistic, modifyTime);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 975c6a49007f..b614b5953843 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -42,10 +42,10 @@
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
-import static org.apache.paimon.flink.FlinkCatalog.LAST_UPDATE_TIME_KEY;
-import static org.apache.paimon.flink.FlinkCatalog.NUM_FILES_KEY;
-import static org.apache.paimon.flink.FlinkCatalog.NUM_ROWS_KEY;
-import static org.apache.paimon.flink.FlinkCatalog.TOTAL_SIZE_KEY;
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
 import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -554,10 +554,10 @@ void testPKTableGetPartition() throws Exception {
         assertThat(partitionPropertiesMap1)
                 .allSatisfy(
                         (par, properties) -> {
-                            assertThat(properties.get(NUM_ROWS_KEY)).isEqualTo("2");
-                            assertThat(properties.get(LAST_UPDATE_TIME_KEY)).isNotBlank();
-                            assertThat(properties.get(NUM_FILES_KEY)).isEqualTo("1");
-                            assertThat(properties.get(TOTAL_SIZE_KEY)).isNotBlank();
+                            assertThat(properties.get(NUM_ROWS_PROP)).isEqualTo("2");
+                            assertThat(properties.get(LAST_UPDATE_TIME_PROP)).isNotBlank();
+                            assertThat(properties.get(NUM_FILES_PROP)).isEqualTo("1");
+                            assertThat(properties.get(TOTAL_SIZE_PROP)).isNotBlank();
                         });
         // update p1 data
         sql("UPDATE PK_T SET word = 'c' WHERE id = 2");
@@ -589,8 +589,8 @@ void testNonPKTableGetPartition() throws Exception {
         assertThat(partitionPropertiesMap1)
                 .allSatisfy(
                         (par, properties) -> {
-                            assertThat(properties.get(NUM_ROWS_KEY)).isEqualTo("1");
-                            assertThat(properties.get(LAST_UPDATE_TIME_KEY)).isNotBlank();
+                            assertThat(properties.get(NUM_ROWS_PROP)).isEqualTo("1");
+                            assertThat(properties.get(LAST_UPDATE_TIME_PROP)).isNotBlank();
                         });
 
         // append data to p1
@@ -1063,13 +1063,13 @@ private static void assertPartitionUpdateTo(
             Long expectedNumFiles) {
         Map<String, String> newPartitionProperties = newProperties.get(partition);
         Map<String, String> oldPartitionProperties = oldProperties.get(partition);
-        assertThat(newPartitionProperties.get(NUM_ROWS_KEY))
+        assertThat(newPartitionProperties.get(NUM_ROWS_PROP))
                 .isEqualTo(String.valueOf(expectedNumRows));
-        assertThat(Long.valueOf(newPartitionProperties.get(LAST_UPDATE_TIME_KEY)))
-                .isGreaterThan(Long.valueOf(oldPartitionProperties.get(LAST_UPDATE_TIME_KEY)));
-        assertThat(newPartitionProperties.get(NUM_FILES_KEY))
+        assertThat(Long.valueOf(newPartitionProperties.get(LAST_UPDATE_TIME_PROP)))
+                .isGreaterThan(Long.valueOf(oldPartitionProperties.get(LAST_UPDATE_TIME_PROP)));
+        assertThat(newPartitionProperties.get(NUM_FILES_PROP))
                 .isEqualTo(String.valueOf(expectedNumFiles));
-        assertThat(Long.valueOf(newPartitionProperties.get(TOTAL_SIZE_KEY)))
-                .isGreaterThan(Long.valueOf(oldPartitionProperties.get(TOTAL_SIZE_KEY)));
+        assertThat(Long.valueOf(newPartitionProperties.get(TOTAL_SIZE_PROP)))
+                .isGreaterThan(Long.valueOf(oldPartitionProperties.get(TOTAL_SIZE_PROP)));
     }
 }
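
For context, a minimal usage sketch (not part of the patch) showing how the relocated statistic keys and the new default alterPartition interact. The helper class and method names below are hypothetical; only the Catalog constants and the MetastoreClient#alterPartition signature from the diff are assumed.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.metastore.MetastoreClient;

/** Hypothetical helper mirroring HmsReporter#report with the shared statistic keys. */
public class PartitionStatsExample {

    public static void reportStats(
            MetastoreClient client,
            LinkedHashMap<String, String> partitionSpec,
            long rowCount,
            long fileCount,
            long totalSize,
            long modifyTime)
            throws Exception {
        Map<String, String> statistic = new HashMap<>();
        // The keys now live on Catalog instead of FlinkCatalog.
        statistic.put(Catalog.NUM_ROWS_PROP, String.valueOf(rowCount));
        statistic.put(Catalog.NUM_FILES_PROP, String.valueOf(fileCount));
        statistic.put(Catalog.TOTAL_SIZE_PROP, String.valueOf(totalSize));
        // Hive records the last DDL time in seconds, hence the division (as in HmsReporter).
        statistic.put(Catalog.HIVE_LAST_UPDATE_TIME_PROP, String.valueOf(modifyTime / 1000));

        // MetastoreClient implementations that do not override the new default
        // alterPartition will throw UnsupportedOperationException here.
        client.alterPartition(partitionSpec, statistic, modifyTime);
    }
}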