diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index ea5dd5dfb163..4e12c88bcdec 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -86,6 +86,12 @@
             <td>Duration</td>
             <td>Set a time duration when a partition has no new data after this time duration, mark the done status to indicate that the data is ready.</td>
         </tr>
+        <tr>
+            <td><h5>partition.idle-time-to-report-statistic</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>Set a time duration: when a partition has had no new data for this duration, its statistics are reported to the Hive metastore (HMS).</td>
+        </tr>
         <tr>
             <td><h5>partition.time-interval</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index de185155d08e..c9cafce321dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -22,6 +22,7 @@
import java.io.Serializable;
import java.util.LinkedHashMap;
+import java.util.Map;
/**
* A metastore client related to a table. All methods of this interface operate on the same specific
@@ -37,6 +38,12 @@ public interface MetastoreClient extends AutoCloseable {
     void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception;
+    void alterPartition(
+            LinkedHashMap<String, String> partitionSpec,
+            Map<String, String> parameters,
+            long modifyTime)
+            throws Exception;
+
/** Factory to create {@link MetastoreClient}. */
interface Factory extends Serializable {
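For orientation, a Hive-backed client could satisfy the new contract roughly as in the sketch below. This is not part of the patch: the `client`, `database`, and `table` fields of the surrounding class are assumptions; only `getPartition`, `alter_partition`, and the Thrift `Partition` accessors are standard Hive metastore API.

```java
// Illustrative sketch only, not part of this patch. Assumes a Hive-backed
// MetastoreClient implementation holding an IMetaStoreClient plus the
// database and table names it operates on.
@Override
public void alterPartition(
        LinkedHashMap<String, String> partitionSpec,
        Map<String, String> parameters,
        long modifyTime)
        throws Exception {
    // Hive identifies a partition by its ordered list of partition values.
    List<String> partitionValues = new ArrayList<>(partitionSpec.values());
    Partition partition = client.getPartition(database, table, partitionValues);
    // Merge the new parameters (e.g. statistics) into the existing ones.
    Map<String, String> current = partition.getParameters();
    if (current == null) {
        partition.setParameters(new HashMap<>(parameters));
    } else {
        current.putAll(parameters);
    }
    partition.setLastAccessTime((int) (modifyTime / 1000));
    client.alter_partition(database, table, partition);
}
```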
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 2c12b70a2493..73b00460190e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -355,6 +355,14 @@ public class FlinkConnectorOptions {
"You can specify time interval for partition, for example, "
+ "daily partition is '1 d', hourly partition is '1 h'.");
+    public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
+            key("partition.idle-time-to-report-statistic")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "Set a time duration: when a partition has had no new data for this duration, "
+                                    + "its statistics are reported to the Hive metastore (HMS).");
+
     public static final ConfigOption<String> CLUSTERING_COLUMNS =
key("sink.clustering.by-columns")
.stringType()
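A minimal sketch of setting the new option programmatically, assuming a plain Paimon `Options` instance (how the options reach the sink is elided; the class name here is hypothetical):

```java
import java.time.Duration;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;

public class ReportStatisticConfigExample {
    public static void main(String[] args) {
        // Sketch: raise the idle threshold from the default 1 h to 2 h.
        Options options = new Options();
        options.set(
                FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC,
                Duration.ofHours(2));
        System.out.println(
                options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC));
    }
}
```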
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 106000aa85e5..d237f4da56cf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -21,7 +21,6 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.sink.partition.PartitionListeners;
-import org.apache.paimon.flink.sink.partition.PartitionMarkDone;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
@@ -44,7 +43,6 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittable>
     public void commit(List<ManifestCommittable> committables)
             throws IOException, InterruptedException {
commit.commitMultiple(committables, false);
calcNumBytesAndRecordsOut(committables);
- if (partitionMarkDone != null) {
- partitionMarkDone.notifyCommittable(committables);
- }
partitionListeners.notifyCommittable(committables);
}
@@ -129,9 +113,6 @@ public void commit(List<ManifestCommittable> committables)
     public int filterAndCommit(
             List<ManifestCommittable> globalCommittables, boolean checkAppendFiles) {
int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles);
- if (partitionMarkDone != null) {
- partitionMarkDone.notifyCommittable(globalCommittables);
- }
partitionListeners.notifyCommittable(globalCommittables);
return committed;
}
@@ -139,9 +120,6 @@ public int filterAndCommit(
@Override
     public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables) {
try {
- if (partitionMarkDone != null) {
- partitionMarkDone.snapshotState();
- }
partitionListeners.snapshotState();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -157,9 +135,6 @@ public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables)
@Override
public void close() throws Exception {
commit.close();
- if (partitionMarkDone != null) {
- partitionMarkDone.close();
- }
partitionListeners.close();
}
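`StoreCommitter` now funnels everything through `PartitionListeners`. The `PartitionListener` interface itself does not appear in this excerpt; judging from the call sites above and from `PartitionMarkDone implements PartitionListener` below, it needs roughly this shape (a sketch, not the actual source file):

```java
package org.apache.paimon.flink.sink.partition;

import org.apache.paimon.manifest.ManifestCommittable;

import java.io.Closeable;
import java.util.List;

/** Sketch of the listener contract implied by the call sites in StoreCommitter. */
public interface PartitionListener extends Closeable {

    // Called after successful (filter-and-)commits with the committed metadata.
    void notifyCommittable(List<ManifestCommittable> committables);

    // Called when the sink operator snapshots its state.
    void snapshotState() throws Exception;
}
```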
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
new file mode 100644
index 000000000000..9cb776340116
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+
+/** Reports partition statistics from the latest snapshot to the Hive metastore (HMS). */
+public class HmsReporter implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HmsReporter.class);
+
+ private final MetastoreClient metastoreClient;
+ private final SnapshotReader snapshotReader;
+ private final SnapshotManager snapshotManager;
+
+ public HmsReporter(FileStoreTable table, MetastoreClient client) {
+        this.metastoreClient =
+                Preconditions.checkNotNull(client, "the metastore client is null");
+ this.snapshotReader = table.newSnapshotReader();
+ this.snapshotManager = table.snapshotManager();
+ }
+
+ public void report(String partition, long modifyTime) throws Exception {
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot != null) {
+            LinkedHashMap<String, String> partitionSpec =
+                    extractPartitionSpecFromPath(new Path(partition));
+            List<DataSplit> splits =
+ new ArrayList<>(
+ snapshotReader
+ .withMode(ScanMode.ALL)
+ .withPartitionFilter(partitionSpec)
+ .withSnapshot(snapshot)
+ .read()
+ .dataSplits());
+ long rowCount = 0;
+ long totalSize = 0;
+ long fileCount = 0;
+ for (DataSplit split : splits) {
+                List<DataFileMeta> fileMetas = split.dataFiles();
+ rowCount += split.rowCount();
+ fileCount += fileMetas.size();
+ for (DataFileMeta fileMeta : fileMetas) {
+ totalSize += fileMeta.fileSize();
+ }
+
+ if (split.deletionFiles().isPresent()) {
+ fileCount += split.deletionFiles().get().size();
+ totalSize +=
+ split.deletionFiles().get().stream()
+ .map(DeletionFile::length)
+ .reduce(0L, Long::sum);
+ }
+ }
+            Map<String, String> statistic = new HashMap<>();
+ // refer to org.apache.hadoop.hive.common.StatsSetupConst
+ statistic.put("numFiles", String.valueOf(fileCount));
+ statistic.put("totalSize", String.valueOf(totalSize));
+ statistic.put("numRows", String.valueOf(rowCount));
+ // refer to org.apache.hadoop.hive.metastore.api.hive_metastoreConstants
+ statistic.put("transient_lastDdlTime", String.valueOf(modifyTime / 1000));
+
+ LOG.info("alter partition {} with statistic {}.", partition, statistic);
+ metastoreClient.alterPartition(partitionSpec, statistic, modifyTime);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ metastoreClient.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
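A usage sketch, assuming `table` is a `FileStoreTable` whose catalog supplies a `MetastoreClient.Factory` (both obtained elsewhere). `report` takes a partition path and the partition's last modification time; note that `HmsReporter.close()` also closes the underlying metastore client:

```java
// Sketch: table and metastoreClientFactory are assumed to come from the catalog.
MetastoreClient client = metastoreClientFactory.create();
try (HmsReporter reporter = new HmsReporter(table, client)) {
    // Partition path plus the partition's last modification time.
    reporter.report("dt=2024-01-01", System.currentTimeMillis());
}
```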
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index 776de9216e51..dbdf77601480 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -18,18 +18,17 @@
package org.apache.paimon.flink.sink.partition;
+import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.IOUtils;
-import org.apache.flink.api.common.state.OperatorStateStore;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-/** Partition collector. */
+/** Partition listeners. */
public class PartitionListeners implements Closeable {
     private final List<PartitionListener> listeners;
@@ -55,13 +54,19 @@ public void close() throws IOException {
IOUtils.closeAllQuietly(listeners);
}
- public static PartitionListeners create(
- boolean isStreaming,
- boolean isRestored,
- OperatorStateStore stateStore,
- FileStoreTable table)
+ public static PartitionListeners create(Committer.Context context, FileStoreTable table)
throws Exception {
+        List<PartitionListener> listeners = new ArrayList<>();
+
+ ReportHmsListener.create(context.isRestored(), context.stateStore(), table)
+ .ifPresent(listeners::add);
+ PartitionMarkDone.create(
+ context.streamingCheckpointEnabled(),
+ context.isRestored(),
+ context.stateStore(),
+ table)
+ .ifPresent(listeners::add);
- return new PartitionListeners(new ArrayList<>());
+ return new PartitionListeners(listeners);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index d0825bcdb752..8714e0006e7b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -33,28 +33,25 @@
import org.apache.flink.api.common.state.OperatorStateStore;
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
/** Mark partition done. */
-public class PartitionMarkDone implements Closeable {
+public class PartitionMarkDone implements PartitionListener {
private final InternalRowPartitionComputer partitionComputer;
private final PartitionMarkDoneTrigger trigger;
     private final List<PartitionMarkDoneAction> actions;
private final boolean waitCompaction;
- @Nullable
- public static PartitionMarkDone create(
+    public static Optional<PartitionMarkDone> create(
boolean isStreaming,
boolean isRestored,
OperatorStateStore stateStore,
@@ -64,7 +61,7 @@ public static PartitionMarkDone create(
Options options = coreOptions.toConfiguration();
if (disablePartitionMarkDone(isStreaming, table, options)) {
- return null;
+ return Optional.empty();
}
InternalRowPartitionComputer partitionComputer =
@@ -87,7 +84,8 @@ public static PartitionMarkDone create(
&& (coreOptions.deletionVectorsEnabled()
|| coreOptions.mergeEngine() == MergeEngine.FIRST_ROW);
- return new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction);
+ return Optional.of(
+ new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction));
}
private static boolean disablePartitionMarkDone(
@@ -116,6 +114,7 @@ public PartitionMarkDone(
this.waitCompaction = waitCompaction;
}
+ @Override
     public void notifyCommittable(List<ManifestCommittable> committables) {
Set partitions = new HashSet<>();
boolean endInput = false;
@@ -153,6 +152,7 @@ public static void markDone(List<String> partitions, List<PartitionMarkDoneAction> actions)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
new file mode 100644
+    private static final ListStateDescriptor<Map<String, Long>> PENDING_REPORT_STATE_DESC =
+            new ListStateDescriptor<>(
+                    "pending-report-hms-partition",
+                    new MapSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE));
+
+    private final InternalRowPartitionComputer partitionComputer;
+    private final HmsReporter hmsReporter;
+    private final ListState<Map<String, Long>>
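The patch is truncated at this point. Based on the fields above (a per-partition last-update map checkpointed through `PENDING_REPORT_STATE_DESC`), the listener plausibly buffers commit times per partition and reports a partition once it has been idle longer than `partition.idle-time-to-report-statistic`. A sketch of that flush logic under those assumptions (field and method names are guesses, not the actual code):

```java
// Sketch under stated assumptions; the real ReportHmsListener body is not shown above.
private final Map<String, Long> pendingPartitions = new HashMap<>();

private void reportIdlePartitions(long idleTimeMillis) throws Exception {
    long now = System.currentTimeMillis();
    Iterator<Map.Entry<String, Long>> iterator = pendingPartitions.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<String, Long> entry = iterator.next();
        // A partition is "idle" once no commit has touched it for idleTimeMillis.
        if (now - entry.getValue() > idleTimeMillis) {
            hmsReporter.report(entry.getKey(), entry.getValue());
            iterator.remove();
        }
    }
}
```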