diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index ea5dd5dfb163..4e12c88bcdec 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -86,6 +86,12 @@
             <td>Duration</td>
             <td>Set a time duration when a partition has no new data after this time duration, mark the done status to indicate that the data is ready.</td>
         </tr>
+        <tr>
+            <td><h5>partition.idle-time-to-report-statistic</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>Set a time duration: when a partition has no new data after this duration, its statistics will be reported to HMS.</td>
+        </tr>
         <tr>
             <td><h5>partition.time-interval</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index de185155d08e..c9cafce321dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -22,6 +22,7 @@
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
+import java.util.Map;
 
 /**
  * A metastore client related to a table. All methods of this interface operate on the same specific
@@ -37,6 +38,12 @@ public interface MetastoreClient extends AutoCloseable {
 
     void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception;
 
+    void alterPartition(
+            LinkedHashMap<String, String> partitionSpec,
+            Map<String, String> parameters,
+            long modifyTime)
+            throws Exception;
+
     /** Factory to create {@link MetastoreClient}. */
     interface Factory extends Serializable {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 2c12b70a2493..73b00460190e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -355,6 +355,14 @@ public class FlinkConnectorOptions {
                             "You can specify time interval for partition, for example, "
                                     + "daily partition is '1 d', hourly partition is '1 h'.");
 
+    public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
+            key("partition.idle-time-to-report-statistic")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "Set a time duration: when a partition has no new data after this duration, "
+                                    + "its statistics will be reported to HMS.");
+
     public static final ConfigOption<String> CLUSTERING_COLUMNS =
             key("sink.clustering.by-columns")
                     .stringType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 9ae53f401325..d237f4da56cf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -20,7 +20,7 @@
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
-import org.apache.paimon.flink.sink.partition.PartitionMarkDone;
+import org.apache.paimon.flink.sink.partition.PartitionListeners;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.FileStoreTable;
@@ -43,7 +43,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittab
 
     private final TableCommitImpl commit;
     @Nullable private final CommitterMetrics committerMetrics;
-    @Nullable private final PartitionMarkDone partitionMarkDone;
+    private final PartitionListeners partitionListeners;
 
     public StoreCommitter(FileStoreTable table, TableCommit commit, Context context) {
         this.commit = (TableCommitImpl) commit;
@@ -58,12 +58,7 @@ public StoreCommitter(FileStoreTable table, TableCommit commit, Context context
         }
 
         try {
-            this.partitionMarkDone =
-                    PartitionMarkDone.create(
-                            context.streamingCheckpointEnabled(),
-                            context.isRestored(),
-                            context.stateStore(),
-                            table);
+            this.partitionListeners = PartitionListeners.create(context, table);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -104,29 +99,23 @@ public void commit(List<ManifestCommittable> committables)
             throws IOException, InterruptedException {
         commit.commitMultiple(committables, false);
         calcNumBytesAndRecordsOut(committables);
-        if (partitionMarkDone != null) {
-            partitionMarkDone.notifyCommittable(committables);
-        }
+        partitionListeners.notifyCommittable(committables);
     }
 
     @Override
     public int filterAndCommit(
             List<ManifestCommittable> globalCommittables, boolean checkAppendFiles) {
         int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles);
-        if (partitionMarkDone != null) {
-            partitionMarkDone.notifyCommittable(globalCommittables);
-        }
+        partitionListeners.notifyCommittable(globalCommittables);
         return committed;
     }
 
     @Override
     public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables) {
-        if (partitionMarkDone != null) {
-            try {
-                partitionMarkDone.snapshotState();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+        try {
+            partitionListeners.snapshotState();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
 
         Map<Long, List<Committable>> grouped = new HashMap<>();
@@ -146,9 +135,7 @@ public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> co
     @Override
     public void close() throws Exception {
         commit.close();
-        if (partitionMarkDone != null) {
-            partitionMarkDone.close();
-        }
+        partitionListeners.close();
     }
 
     private void calcNumBytesAndRecordsOut(List<Committable> committables) {
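Reviewer note: with the plumbing above in place, enabling the feature is plain table configuration. A minimal sketch of turning it on from a Flink job — the catalog name, table name, and the 30-minute threshold are illustrative, not part of this patch:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableStatisticReport {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Assumes a Paimon catalog backed by a Hive metastore is registered as `paimon_hive`.
        tEnv.executeSql("USE CATALOG paimon_hive");
        // The table must be partitioned and registered as a partitioned table in the
        // metastore ('metastore.partitioned-table' = 'true'); otherwise ReportHmsListener
        // bails out in create() and no statistics are reported.
        tEnv.executeSql(
                "ALTER TABLE orders SET ("
                        + "'partition.idle-time-to-report-statistic' = '30 min'"
                        + ")");
    }
}
```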
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
new file mode 100644
index 000000000000..9cb776340116
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+
+/** Action to report the table statistic from the latest snapshot to HMS. */
+public class HmsReporter implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HmsReporter.class);
+
+    private final MetastoreClient metastoreClient;
+    private final SnapshotReader snapshotReader;
+    private final SnapshotManager snapshotManager;
+
+    public HmsReporter(FileStoreTable table, MetastoreClient client) {
+        this.metastoreClient =
+                Preconditions.checkNotNull(client, "the metastore client factory is null");
+        this.snapshotReader = table.newSnapshotReader();
+        this.snapshotManager = table.snapshotManager();
+    }
+
+    public void report(String partition, long modifyTime) throws Exception {
+        Snapshot snapshot = snapshotManager.latestSnapshot();
+        if (snapshot != null) {
+            LinkedHashMap<String, String> partitionSpec =
+                    extractPartitionSpecFromPath(new Path(partition));
+            List<DataSplit> splits =
+                    new ArrayList<>(
+                            snapshotReader
+                                    .withMode(ScanMode.ALL)
+                                    .withPartitionFilter(partitionSpec)
+                                    .withSnapshot(snapshot)
+                                    .read()
+                                    .dataSplits());
+            long rowCount = 0;
+            long totalSize = 0;
+            long fileCount = 0;
+            for (DataSplit split : splits) {
+                List<DataFileMeta> fileMetas = split.dataFiles();
+                rowCount += split.rowCount();
+                fileCount += fileMetas.size();
+                for (DataFileMeta fileMeta : fileMetas) {
+                    totalSize += fileMeta.fileSize();
+                }
+
+                if (split.deletionFiles().isPresent()) {
+                    fileCount += split.deletionFiles().get().size();
+                    totalSize +=
+                            split.deletionFiles().get().stream()
+                                    .map(DeletionFile::length)
+                                    .reduce(0L, Long::sum);
+                }
+            }
+            Map<String, String> statistic = new HashMap<>();
+            // refer to org.apache.hadoop.hive.common.StatsSetupConst
+            statistic.put("numFiles", String.valueOf(fileCount));
+            statistic.put("totalSize", String.valueOf(totalSize));
+            statistic.put("numRows", String.valueOf(rowCount));
+            // refer to org.apache.hadoop.hive.metastore.api.hive_metastoreConstants
+            statistic.put("transient_lastDdlTime", String.valueOf(modifyTime / 1000));
+
+            LOG.info("alter partition {} with statistic {}.", partition, statistic);
+            metastoreClient.alterPartition(partitionSpec, statistic, modifyTime);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            metastoreClient.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
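Reviewer note: for ad-hoc testing outside the sink, the reporter can be driven by hand using only the API introduced above. A minimal sketch, assuming a `FileStoreTable` and `MetastoreClient` are already constructed (see `HmsReporterTest` below for a full end-to-end setup):

```java
import org.apache.paimon.flink.sink.partition.HmsReporter;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.table.FileStoreTable;

public class HmsReporterUsage {
    /** Hypothetical helper: report one partition's statistics, then release the client. */
    static void reportOnce(FileStoreTable table, MetastoreClient client, String partitionPath)
            throws Exception {
        // HmsReporter owns the client; close() closes it.
        HmsReporter reporter = new HmsReporter(table, client);
        try {
            // partitionPath is a partition directory name such as "dt=20241022/".
            reporter.report(partitionPath, System.currentTimeMillis());
        } finally {
            reporter.close();
        }
    }
}
```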
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
new file mode 100644
index 000000000000..65d25fbc0271
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.manifest.ManifestCommittable;
+
+import java.io.Closeable;
+import java.util.List;
+
+/** The partition listener. */
+public interface PartitionListener extends Closeable {
+
+    void notifyCommittable(List<ManifestCommittable> committables);
+
+    void snapshotState() throws Exception;
+}
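Reviewer note: the interface is small enough that further implementations are easy to imagine. A minimal sketch of the contract — a stateless listener that only logs commit identifiers; this class is illustrative and not part of the PR:

```java
import org.apache.paimon.flink.sink.partition.PartitionListener;
import org.apache.paimon.manifest.ManifestCommittable;

import java.io.IOException;
import java.util.List;

public class LoggingPartitionListener implements PartitionListener {

    @Override
    public void notifyCommittable(List<ManifestCommittable> committables) {
        // Called by StoreCommitter after every successful commit.
        committables.forEach(c -> System.out.println("committed: " + c.identifier()));
    }

    @Override
    public void snapshotState() {
        // Stateless listener: nothing to persist on checkpoint.
    }

    @Override
    public void close() throws IOException {}
}
```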
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
new file mode 100644
index 000000000000..dbdf77601480
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.IOUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Partition listeners. */
+public class PartitionListeners implements Closeable {
+
+    private final List<PartitionListener> listeners;
+
+    private PartitionListeners(List<PartitionListener> listeners) {
+        this.listeners = listeners;
+    }
+
+    public void notifyCommittable(List<ManifestCommittable> committables) {
+        for (PartitionListener trigger : listeners) {
+            trigger.notifyCommittable(committables);
+        }
+    }
+
+    public void snapshotState() throws Exception {
+        for (PartitionListener trigger : listeners) {
+            trigger.snapshotState();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOUtils.closeAllQuietly(listeners);
+    }
+
+    public static PartitionListeners create(Committer.Context context, FileStoreTable table)
+            throws Exception {
+        List<PartitionListener> listeners = new ArrayList<>();
+
+        ReportHmsListener.create(context.isRestored(), context.stateStore(), table)
+                .ifPresent(listeners::add);
+        PartitionMarkDone.create(
+                        context.streamingCheckpointEnabled(),
+                        context.isRestored(),
+                        context.stateStore(),
+                        table)
+                .ifPresent(listeners::add);
+
+        return new PartitionListeners(listeners);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index d0825bcdb752..8714e0006e7b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -33,28 +33,25 @@
 
 import org.apache.flink.api.common.state.OperatorStateStore;
 
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
 
 /** Mark partition done. */
-public class PartitionMarkDone implements Closeable {
+public class PartitionMarkDone implements PartitionListener {
 
     private final InternalRowPartitionComputer partitionComputer;
     private final PartitionMarkDoneTrigger trigger;
     private final List<PartitionMarkDoneAction> actions;
     private final boolean waitCompaction;
 
-    @Nullable
-    public static PartitionMarkDone create(
+    public static Optional<PartitionMarkDone> create(
             boolean isStreaming,
             boolean isRestored,
             OperatorStateStore stateStore,
@@ -64,7 +61,7 @@ public static PartitionMarkDone create(
         Options options = coreOptions.toConfiguration();
 
         if (disablePartitionMarkDone(isStreaming, table, options)) {
-            return null;
+            return Optional.empty();
         }
 
         InternalRowPartitionComputer partitionComputer =
@@ -87,7 +84,8 @@ public static PartitionMarkDone create(
                 && (coreOptions.deletionVectorsEnabled()
                         || coreOptions.mergeEngine() == MergeEngine.FIRST_ROW);
 
-        return new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction);
+        return Optional.of(
+                new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction));
     }
 
     private static boolean disablePartitionMarkDone(
@@ -116,6 +114,7 @@ public PartitionMarkDone(
         this.waitCompaction = waitCompaction;
     }
 
+    @Override
     public void notifyCommittable(List<ManifestCommittable> committables) {
         Set<BinaryRow> partitions = new HashSet<>();
         boolean endInput = false;
@@ -153,6 +152,7 @@ public static void markDone(List<String> partitions, List<PartitionMarkDoneActi
         }
     }
 
+    @Override
     public void snapshotState() throws Exception {
         trigger.snapshotState();
     }
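Reviewer note: returning `Optional` from the `create()` factories is what lets `PartitionListeners.create` compose listeners without null checks. A condensed, self-contained view of the pattern — all names here are hypothetical stand-ins, not PR code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class OptionalFactoryPattern {
    interface Listener {}

    // Each factory decides for itself whether it applies to the table/job.
    static Optional<Listener> maybeMarkDone() { return Optional.empty(); }
    static Optional<Listener> maybeHmsReport() { return Optional.of(new Listener() {}); }

    public static void main(String[] args) {
        List<Listener> listeners = new ArrayList<>();
        maybeMarkDone().ifPresent(listeners::add);   // skipped: not applicable
        maybeHmsReport().ifPresent(listeners::add);  // registered
        System.out.println(listeners.size());        // prints 1
    }
}
```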
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
new file mode 100644
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.PartitionPathUtils;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Report the partition statistics to hms once the partition is idle. */
+public class ReportHmsListener implements PartitionListener {
+
+    private static final ListStateDescriptor<Map<String, Long>> PENDING_REPORT_STATE_DESC =
+            new ListStateDescriptor<>(
+                    "pending-report-hms-partition",
+                    new MapSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE));
+
+    private final InternalRowPartitionComputer partitionComputer;
+    private final HmsReporter hmsReporter;
+    private final ListState<Map<String, Long>> pendingPartitionsState;
+    private final Map<String, Long> pendingPartitions;
+    private final long idleTime;
+
+    private ReportHmsListener(
+            InternalRowPartitionComputer partitionComputer,
+            HmsReporter hmsReporter,
+            OperatorStateStore store,
+            boolean isRestored,
+            long idleTime)
+            throws Exception {
+        this.partitionComputer = partitionComputer;
+        this.hmsReporter = hmsReporter;
+        this.pendingPartitionsState = store.getListState(PENDING_REPORT_STATE_DESC);
+        this.pendingPartitions = new HashMap<>();
+        if (isRestored) {
+            Iterator<Map<String, Long>> it = pendingPartitionsState.get().iterator();
+            if (it.hasNext()) {
+                Map<String, Long> state = it.next();
+                pendingPartitions.putAll(state);
+            }
+        }
+        this.idleTime = idleTime;
+    }
+
+    public void notifyCommittable(List<ManifestCommittable> committables) {
+        Set<String> partition = new HashSet<>();
+        boolean endInput = false;
+        for (ManifestCommittable committable : committables) {
+            for (CommitMessage commitMessage : committable.fileCommittables()) {
+                CommitMessageImpl message = (CommitMessageImpl) commitMessage;
+                if (!message.newFilesIncrement().isEmpty()
+                        || !message.compactIncrement().isEmpty()) {
+                    partition.add(
+                            PartitionPathUtils.generatePartitionPath(
+                                    partitionComputer.generatePartValues(message.partition())));
+                }
+            }
+            if (committable.identifier() == Long.MAX_VALUE) {
+                endInput = true;
+            }
+        }
+        // append to map
+        long current = System.currentTimeMillis();
+        partition.forEach(p -> pendingPartitions.put(p, current));
+
+        try {
+            Map<String, Long> partitions = reportPartition(endInput);
+            for (Map.Entry<String, Long> entry : partitions.entrySet()) {
+                hmsReporter.report(entry.getKey(), entry.getValue());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Map<String, Long> reportPartition(boolean endInput) {
+        if (endInput) {
+            return pendingPartitions;
+        }
+
+        Iterator<Map.Entry<String, Long>> iterator = pendingPartitions.entrySet().iterator();
+        Map<String, Long> result = new HashMap<>();
+        long current = System.currentTimeMillis();
+        while (iterator.hasNext()) {
+            Map.Entry<String, Long> entry = iterator.next();
+            if (current - entry.getValue() > idleTime) {
+                result.put(entry.getKey(), entry.getValue());
+                iterator.remove();
+            }
+        }
+
+        return result;
+    }
+
+    public void snapshotState() throws Exception {
+        pendingPartitionsState.update(Collections.singletonList(pendingPartitions));
+    }
+
+    public static Optional<ReportHmsListener> create(
+            boolean isRestored, OperatorStateStore stateStore, FileStoreTable table)
+            throws Exception {
+
+        CoreOptions coreOptions = table.coreOptions();
+        Options options = coreOptions.toConfiguration();
+        if (options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis()
+                <= 0) {
+            return Optional.empty();
+        }
+
+        if (table.partitionKeys().isEmpty()) {
+            return Optional.empty();
+        }
+
+        if (!coreOptions.partitionedTableInMetastore()) {
+            return Optional.empty();
+        }
+
+        if (table.catalogEnvironment().metastoreClientFactory() == null) {
+            return Optional.empty();
+        }
+
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        coreOptions.partitionDefaultName(),
+                        table.schema().logicalPartitionType(),
+                        table.partitionKeys().toArray(new String[0]),
+                        coreOptions.legacyPartitionName());
+
+        return Optional.of(
+                new ReportHmsListener(
+                        partitionComputer,
+                        new HmsReporter(
+                                table,
+                                table.catalogEnvironment().metastoreClientFactory().create()),
+                        stateStore,
+                        isRestored,
+                        options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
+                                .toMillis()));
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (hmsReporter != null) {
+            hmsReporter.close();
+        }
+    }
+}
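Reviewer note: the bookkeeping in `ReportHmsListener.reportPartition` boils down to a timestamp map with an idle threshold — partitions touched by a commit get a fresh timestamp, and only partitions idle longer than the threshold are drained for reporting (or all of them at end of input). A standalone restatement of that logic, for illustration only:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class IdlePartitionTracker {
    private final Map<String, Long> pending = new HashMap<>();
    private final long idleMillis;

    public IdlePartitionTracker(long idleMillis) {
        this.idleMillis = idleMillis;
    }

    /** Record that a commit wrote to this partition, resetting its idle clock. */
    public void touch(String partition) {
        pending.put(partition, System.currentTimeMillis());
    }

    /** Remove and return partitions that have been idle longer than the threshold. */
    public Map<String, Long> drainIdle() {
        Map<String, Long> idle = new HashMap<>();
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now - e.getValue() > idleMillis) {
                idle.put(e.getKey(), e.getValue());
                it.remove();
            }
        }
        return idle;
    }
}
```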
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
index 5338b3886001..19c22d137c7f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
@@ -26,6 +26,7 @@
 
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -61,6 +62,15 @@ public void markDone(LinkedHashMap<String, String> partitionSpec)
                         throw new UnsupportedOperationException();
                     }
 
+                    @Override
+                    public void alterPartition(
+                            LinkedHashMap<String, String> partitionSpec,
+                            Map<String, String> parameters,
+                            long modifyTime)
+                            throws Exception {
+                        throw new UnsupportedOperationException();
+                    }
+
                     @Override
                     public void close() throws Exception {
                         closed.set(true);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java
new file mode 100644
index 000000000000..0050f3083a8e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.PartitionPathUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** Test for {@link HmsReporter}. */
+public class HmsReporterTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testReportAction() throws Exception {
+        Path tablePath = new Path(tempDir.toString(), "table");
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
+        Schema schema =
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "c1", DataTypes.STRING()),
+                                new DataField(1, "c2", DataTypes.STRING()),
+                                new DataField(2, "c3", DataTypes.STRING())),
+                        Collections.singletonList("c1"),
+                        Collections.emptyList(),
+                        Maps.newHashMap(),
+                        "");
+        schemaManager.createTable(schema);
+
+        FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+        BatchTableWrite writer = table.newBatchWriteBuilder().newWrite();
+        writer.write(
+                GenericRow.of(
+                        BinaryString.fromString("a"),
+                        BinaryString.fromString("a"),
+                        BinaryString.fromString("a")));
+        writer.write(
+                GenericRow.of(
+                        BinaryString.fromString("b"),
+                        BinaryString.fromString("a"),
+                        BinaryString.fromString("a")));
+        List<CommitMessage> messages = writer.prepareCommit();
+        BatchTableCommit committer = table.newBatchWriteBuilder().newCommit();
+        committer.commit(messages);
+        AtomicBoolean closed = new AtomicBoolean(false);
+        Map<String, Map<String, String>> partitionParams = Maps.newHashMap();
+
+        MetastoreClient client =
+                new MetastoreClient() {
+                    @Override
+                    public void addPartition(BinaryRow partition) throws Exception {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public void addPartition(LinkedHashMap<String, String> partitionSpec)
+                            throws Exception {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public void deletePartition(LinkedHashMap<String, String> partitionSpec)
+                            throws Exception {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public void markDone(LinkedHashMap<String, String> partitionSpec)
+                            throws Exception {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public void alterPartition(
+                            LinkedHashMap<String, String> partitionSpec,
+                            Map<String, String> parameters,
+                            long modifyTime)
+                            throws Exception {
+                        partitionParams.put(
+                                PartitionPathUtils.generatePartitionPath(partitionSpec),
+                                parameters);
+                    }
+
+                    @Override
+                    public void close() throws Exception {
+                        closed.set(true);
+                    }
+                };
+
+        HmsReporter action = new HmsReporter(table, client);
+        long time = 1729598544974L;
+        action.report("c1=a/", time);
+        Assertions.assertThat(partitionParams).containsKey("c1=a/");
+        Assertions.assertThat(partitionParams.get("c1=a/"))
+                .isEqualTo(
+                        ImmutableMap.of(
+                                "numFiles",
+                                "1",
+                                "totalSize",
+                                "591",
+                                "numRows",
+                                "1",
+                                "transient_lastDdlTime",
+                                String.valueOf(time / 1000)));
+        action.close();
+        Assertions.assertThat(closed).isTrue();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index ecef6d337a6b..f0f4596c61bb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -86,7 +86,7 @@ private void innerTest(boolean deletionVectors) throws Exception {
         Path location = catalog.getTableLocation(identifier);
         Path successFile = new Path(location, "a=0/_SUCCESS");
         PartitionMarkDone markDone =
-                PartitionMarkDone.create(false, false, new MockOperatorStateStore(), table);
+                PartitionMarkDone.create(false, false, new MockOperatorStateStore(), table).get();
 
         notifyCommits(markDone, true);
 
         assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 6ec3b086db4d..5856515bb866 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -40,6 +40,7 @@
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /** {@link MetastoreClient} for Hive tables. */
 public class HiveMetastoreClient implements MetastoreClient {
@@ -112,6 +113,32 @@ public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exc
         }
     }
 
+    @Override
+    public void alterPartition(
+            LinkedHashMap<String, String> partitionSpec,
+            Map<String, String> parameters,
+            long modifyTime)
+            throws Exception {
+        List<String> partitionValues = new ArrayList<>(partitionSpec.values());
+        int currentTime = (int) (modifyTime / 1000);
+        Partition hivePartition =
+                clients.run(
+                        client ->
+                                client.getPartition(
+                                        identifier.getDatabaseName(),
+                                        identifier.getObjectName(),
+                                        partitionValues));
+        hivePartition.setValues(partitionValues);
+        hivePartition.setLastAccessTime(currentTime);
+        hivePartition.getParameters().putAll(parameters);
+        clients.execute(
+                client ->
+                        client.alter_partition(
+                                identifier.getDatabaseName(),
+                                identifier.getObjectName(),
+                                hivePartition));
+    }
+
     @Override
     public void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
        List<String> partitionValues = new ArrayList<>(partitionSpec.values());
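Reviewer note: to sanity-check what `alterPartition` wrote, the partition can be read back with a plain Hive metastore client and its parameters inspected. A sketch — the database, table, and partition value below are hypothetical:

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;

import java.util.Arrays;

public class VerifyPartitionStats {
    public static void main(String[] args) throws Exception {
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        Partition p = client.getPartition("mydb", "mytbl", Arrays.asList("20241022"));
        // Expect numFiles / totalSize / numRows / transient_lastDdlTime to be present,
        // and lastAccessTime to reflect the modifyTime passed to alterPartition.
        System.out.println(p.getParameters());
        client.close();
    }
}
```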