[core] Adjust MetastoreClient new method and extract string constants
JingsongLi committed Oct 30, 2024
1 parent 0d064f9 commit 31c13f0
Showing 5 changed files with 42 additions and 33 deletions.
@@ -57,6 +57,11 @@ public interface Catalog extends AutoCloseable {
String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
String DB_LOCATION_PROP = "location";
String DB_SUFFIX = ".db";
+String NUM_ROWS_PROP = "numRows";
+String NUM_FILES_PROP = "numFiles";
+String TOTAL_SIZE_PROP = "totalSize";
+String LAST_UPDATE_TIME_PROP = "lastUpdateTime";
+String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";

/** Warehouse root path containing all database directories in this catalog. */
String warehouse();
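
Note: the five constants added above centralize the partition-statistic property keys that are used later in this commit by FlinkCatalog and by the Hive statistics reporter. As a minimal, illustrative sketch (not part of the commit; the helper class, method, and parameter names are hypothetical), a catalog implementation could assemble a partition property map from them like this:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.catalog.Catalog;

// Hypothetical helper, not from this commit: builds a partition property map
// keyed by the new Catalog constants.
public class PartitionStatsSketch {

    public static Map<String, String> partitionProperties(
            long rowCount, long fileCount, long totalSizeInBytes, long lastUpdateTimeMillis) {
        Map<String, String> properties = new HashMap<>();
        properties.put(Catalog.NUM_ROWS_PROP, String.valueOf(rowCount));
        properties.put(Catalog.NUM_FILES_PROP, String.valueOf(fileCount));
        properties.put(Catalog.TOTAL_SIZE_PROP, String.valueOf(totalSizeInBytes));
        properties.put(Catalog.LAST_UPDATE_TIME_PROP, String.valueOf(lastUpdateTimeMillis));
        return properties;
    }
}
```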
@@ -38,11 +38,13 @@ public interface MetastoreClient extends AutoCloseable {

void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception;

-void alterPartition(
+default void alterPartition(
LinkedHashMap<String, String> partitionSpec,
Map<String, String> parameters,
long modifyTime)
-throws Exception;
+throws Exception {
+throw new UnsupportedOperationException();
+}

/** Factory to create {@link MetastoreClient}. */
interface Factory extends Serializable {
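
Note: because alterPartition now has a default body that throws UnsupportedOperationException, MetastoreClient implementations that cannot update partition metadata no longer need to override it. A minimal caller-side sketch under that assumption (the helper class and method names are hypothetical; the MetastoreClient package is assumed to be org.apache.paimon.metastore):

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.paimon.metastore.MetastoreClient;

// Hypothetical helper, not from this commit: treats partition-statistics reporting
// as optional, since a client that keeps the default alterPartition implementation
// throws UnsupportedOperationException.
public class OptionalPartitionStatsSketch {

    public static void tryAlterPartition(
            MetastoreClient client,
            LinkedHashMap<String, String> partitionSpec,
            Map<String, String> statistic,
            long modifyTime)
            throws Exception {
        try {
            client.alterPartition(partitionSpec, statistic, modifyTime);
        } catch (UnsupportedOperationException e) {
            // This client does not support altering partitions; skip statistics reporting.
        }
    }
}
```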
@@ -139,6 +139,10 @@
import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_REFRESH_STATUS;
import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_SNAPSHOT;
import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
@@ -160,12 +164,8 @@
public class FlinkCatalog extends AbstractCatalog {

private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class);
-public static final String NUM_ROWS_KEY = "numRows";
-public static final String LAST_UPDATE_TIME_KEY = "lastUpdateTime";
-public static final String TOTAL_SIZE_KEY = "totalSize";
-public static final String NUM_FILES_KEY = "numFiles";
-private final ClassLoader classLoader;

+private final ClassLoader classLoader;
private final Catalog catalog;
private final String name;
private final boolean logStoreAutoRegister;
@@ -1198,11 +1198,11 @@ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec
// This was already filtered by the expected partition.
PartitionEntry partitionEntry = partitionEntries.get(0);
Map<String, String> properties = new HashMap<>();
-properties.put(NUM_ROWS_KEY, String.valueOf(partitionEntry.recordCount()));
+properties.put(NUM_ROWS_PROP, String.valueOf(partitionEntry.recordCount()));
properties.put(
-LAST_UPDATE_TIME_KEY, String.valueOf(partitionEntry.lastFileCreationTime()));
-properties.put(NUM_FILES_KEY, String.valueOf(partitionEntry.fileCount()));
-properties.put(TOTAL_SIZE_KEY, String.valueOf(partitionEntry.fileSizeInBytes()));
+LAST_UPDATE_TIME_PROP, String.valueOf(partitionEntry.lastFileCreationTime()));
+properties.put(NUM_FILES_PROP, String.valueOf(partitionEntry.fileCount()));
+properties.put(TOTAL_SIZE_PROP, String.valueOf(partitionEntry.fileSizeInBytes()));
return new CatalogPartitionImpl(properties, "");
} catch (TableNotPartitionedException | TableNotExistException e) {
throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
@@ -41,6 +41,10 @@
import java.util.List;
import java.util.Map;

+import static org.apache.paimon.catalog.Catalog.HIVE_LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;

/** Action to report the table statistic from the latest snapshot to HMS. */
@@ -92,12 +96,10 @@ public void report(String partition, long modifyTime) throws Exception {
}
}
Map<String, String> statistic = new HashMap<>();
-// refer to org.apache.hadoop.hive.common.StatsSetupConst
-statistic.put("numFiles", String.valueOf(fileCount));
-statistic.put("totalSize", String.valueOf(totalSize));
-statistic.put("numRows", String.valueOf(rowCount));
-// refer to org.apache.hadoop.hive.metastore.api.hive_metastoreConstants
-statistic.put("transient_lastDdlTime", String.valueOf(modifyTime / 1000));
+statistic.put(NUM_FILES_PROP, String.valueOf(fileCount));
+statistic.put(TOTAL_SIZE_PROP, String.valueOf(totalSize));
+statistic.put(NUM_ROWS_PROP, String.valueOf(rowCount));
+statistic.put(HIVE_LAST_UPDATE_TIME_PROP, String.valueOf(modifyTime / 1000));

LOG.info("alter partition {} with statistic {}.", partition, statistic);
metastoreClient.alterPartition(partitionSpec, statistic, modifyTime);
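
Note: Hive's transient_lastDdlTime property is an epoch timestamp in seconds, while modifyTime here is presumably in milliseconds, hence the division by 1000 above. A tiny illustrative helper (hypothetical name, not part of the commit):

```java
// Hypothetical helper, not from this commit: converts a millisecond timestamp to the
// epoch seconds expected by Hive's transient_lastDdlTime property.
public class HiveDdlTimeSketch {

    public static String fromMillis(long modifyTimeMillis) {
        return String.valueOf(modifyTimeMillis / 1000);
    }
}
```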
@@ -42,10 +42,10 @@
import java.util.stream.Collectors;

import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
-import static org.apache.paimon.flink.FlinkCatalog.LAST_UPDATE_TIME_KEY;
-import static org.apache.paimon.flink.FlinkCatalog.NUM_FILES_KEY;
-import static org.apache.paimon.flink.FlinkCatalog.NUM_ROWS_KEY;
-import static org.apache.paimon.flink.FlinkCatalog.TOTAL_SIZE_KEY;
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -554,10 +554,10 @@ void testPKTableGetPartition() throws Exception {
assertThat(partitionPropertiesMap1)
.allSatisfy(
(par, properties) -> {
-assertThat(properties.get(NUM_ROWS_KEY)).isEqualTo("2");
-assertThat(properties.get(LAST_UPDATE_TIME_KEY)).isNotBlank();
-assertThat(properties.get(NUM_FILES_KEY)).isEqualTo("1");
-assertThat(properties.get(TOTAL_SIZE_KEY)).isNotBlank();
+assertThat(properties.get(NUM_ROWS_PROP)).isEqualTo("2");
+assertThat(properties.get(LAST_UPDATE_TIME_PROP)).isNotBlank();
+assertThat(properties.get(NUM_FILES_PROP)).isEqualTo("1");
+assertThat(properties.get(TOTAL_SIZE_PROP)).isNotBlank();
});
// update p1 data
sql("UPDATE PK_T SET word = 'c' WHERE id = 2");
@@ -589,8 +589,8 @@ void testNonPKTableGetPartition() throws Exception {
assertThat(partitionPropertiesMap1)
.allSatisfy(
(par, properties) -> {
-assertThat(properties.get(NUM_ROWS_KEY)).isEqualTo("1");
-assertThat(properties.get(LAST_UPDATE_TIME_KEY)).isNotBlank();
+assertThat(properties.get(NUM_ROWS_PROP)).isEqualTo("1");
+assertThat(properties.get(LAST_UPDATE_TIME_PROP)).isNotBlank();
});

// append data to p1
@@ -1063,13 +1063,13 @@ private static void assertPartitionUpdateTo(
Long expectedNumFiles) {
Map<String, String> newPartitionProperties = newProperties.get(partition);
Map<String, String> oldPartitionProperties = oldProperties.get(partition);
-assertThat(newPartitionProperties.get(NUM_ROWS_KEY))
+assertThat(newPartitionProperties.get(NUM_ROWS_PROP))
.isEqualTo(String.valueOf(expectedNumRows));
-assertThat(Long.valueOf(newPartitionProperties.get(LAST_UPDATE_TIME_KEY)))
-.isGreaterThan(Long.valueOf(oldPartitionProperties.get(LAST_UPDATE_TIME_KEY)));
-assertThat(newPartitionProperties.get(NUM_FILES_KEY))
+assertThat(Long.valueOf(newPartitionProperties.get(LAST_UPDATE_TIME_PROP)))
+.isGreaterThan(Long.valueOf(oldPartitionProperties.get(LAST_UPDATE_TIME_PROP)));
+assertThat(newPartitionProperties.get(NUM_FILES_PROP))
.isEqualTo(String.valueOf(expectedNumFiles));
-assertThat(Long.valueOf(newPartitionProperties.get(TOTAL_SIZE_KEY)))
-.isGreaterThan(Long.valueOf(oldPartitionProperties.get(TOTAL_SIZE_KEY)));
+assertThat(Long.valueOf(newPartitionProperties.get(TOTAL_SIZE_PROP)))
+.isGreaterThan(Long.valueOf(oldPartitionProperties.get(TOTAL_SIZE_PROP)));
}
}
