From c1ebb2e739b609181fbd29586d18aec24fdf6c4a Mon Sep 17 00:00:00 2001 From: YanZhangN Date: Mon, 1 Jul 2024 11:22:57 +0800 Subject: [PATCH] [flink] add MarkPartitionDoneEventAction to metastore (#3620) --- .../paimon/metastore/MetastoreClient.java | 2 + .../MarkPartitionDoneEventAction.java | 52 +++++++++++++++++++ .../partition/PartitionMarkDoneAction.java | 3 ++ .../partition/AddDonePartitionActionTest.java | 6 +++ .../paimon/hive/HiveMetastoreClient.java | 18 +++++++ .../paimon/hive/HiveCatalogITCaseBase.java | 30 ++++++++++- 6 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java index 9247d49232c5..de185155d08e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java @@ -35,6 +35,8 @@ public interface MetastoreClient extends AutoCloseable { void deletePartition(LinkedHashMap partitionSpec) throws Exception; + void markDone(LinkedHashMap partitionSpec) throws Exception; + /** Factory to create {@link MetastoreClient}. */ interface Factory extends Serializable { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java new file mode 100644 index 000000000000..a6dc0bd1e364 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.partition; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.metastore.MetastoreClient; + +import java.io.IOException; +import java.util.LinkedHashMap; + +import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath; + +/** A {@link PartitionMarkDoneAction} which add mark "PartitionEventType.LOAD_DONE". */ +public class MarkPartitionDoneEventAction implements PartitionMarkDoneAction { + private final MetastoreClient metastoreClient; + + public MarkPartitionDoneEventAction(MetastoreClient metastoreClient) { + this.metastoreClient = metastoreClient; + } + + @Override + public void markDone(String partition) throws Exception { + LinkedHashMap partitionSpec = + extractPartitionSpecFromPath(new Path(partition)); + metastoreClient.markDone(partitionSpec); + } + + @Override + public void close() throws IOException { + try { + metastoreClient.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java index 7dde9ee835e0..2177f7d0fb6c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java @@ -50,6 +50,9 @@ static List createActions( case "done-partition": return new AddDonePartitionAction( createMetastoreClient(fileStoreTable, options)); + case "mark-event": + return new MarkPartitionDoneEventAction( + createMetastoreClient(fileStoreTable, options)); default: throw new UnsupportedOperationException(action); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java index 4fc31e41dc1f..cff4479d99f5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java @@ -54,6 +54,12 @@ public void deletePartition(LinkedHashMap partitionSpec) { throw new UnsupportedOperationException(); } + @Override + public void markDone(LinkedHashMap partitionSpec) + throws Exception { + throw new UnsupportedOperationException(); + } + @Override public void close() throws Exception { closed.set(true); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index e2771c0e6012..6f3473e8eac7 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import java.util.ArrayList; @@ -105,11 +106,28 @@ public void deletePartition(LinkedHashMap partitionSpec) throws } } + @Override + public void markDone(LinkedHashMap partitionSpec) throws Exception { + try { + client.markPartitionForEvent( + identifier.getDatabaseName(), + identifier.getObjectName(), + partitionSpec, + PartitionEventType.LOAD_DONE); + } catch (NoSuchObjectException e) { + // do nothing if the partition not exists + } + } + @Override public void close() throws Exception { client.close(); } + public IMetaStoreClient client() { + return this.client; + } + /** Factory to create {@link HiveMetastoreClient}. */ public static class Factory implements MetastoreClient.Factory { diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index a73510df42e6..4692f31c65ad 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -25,8 +25,11 @@ import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.hive.annotation.Minio; import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner; +import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.privilege.NoPrivilegeException; import org.apache.paimon.s3.MinioTestContainer; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.IOUtils; import com.klarna.hiverunner.HiveShell; @@ -47,7 +50,9 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.junit.Rule; import org.junit.Test; import org.junit.jupiter.api.function.Executable; @@ -76,7 +81,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** IT cases for using Paimon {@link HiveCatalog} together with Paimon Hive connector. */ @RunWith(PaimonEmbeddedHiveRunner.class) @@ -1155,7 +1162,7 @@ public void testMarkDone() throws Exception { + "'partition.idle-time-to-done'='1 s'," + "'partition.time-interval'='1 d'," + "'metastore.partitioned-table'='true'," - + "'partition.mark-done-action'='done-partition,success-file'" + + "'partition.mark-done-action'='done-partition,success-file,mark-event'" + ")") .await(); @@ -1164,7 +1171,28 @@ public void testMarkDone() throws Exception { tEnv.executeSql("INSERT INTO mark_done_t1 VALUES (5, '20240501')").await(); + // check event. + Catalog catalog = + ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog(); + Identifier identifier = new Identifier("test_db", "mark_done_t2"); + Table table = catalog.getTable(identifier); + assertThat(table instanceof FileStoreTable); + FileStoreTable fileStoreTable = (FileStoreTable) table; + MetastoreClient.Factory metastoreClientFactory = + fileStoreTable.catalogEnvironment().metastoreClientFactory(); + HiveMetastoreClient metastoreClient = (HiveMetastoreClient) metastoreClientFactory.create(); + IMetaStoreClient hmsClient = metastoreClient.client(); + Map partitionSpec = Collections.singletonMap("dt", "20240501"); + // LOAD_DONE event is not marked by now. + assertFalse( + hmsClient.isPartitionMarkedForEvent( + "test_db", "mark_done_t2", partitionSpec, PartitionEventType.LOAD_DONE)); + Thread.sleep(10 * 1000); + // after sleep, LOAD_DONE event should be marked. + assertTrue( + hmsClient.isPartitionMarkedForEvent( + "test_db", "mark_done_t2", partitionSpec, PartitionEventType.LOAD_DONE)); assertThat(hiveShell.executeQuery("SHOW PARTITIONS mark_done_t2")) .containsExactlyInAnyOrder("dt=20240501", "dt=20240501.done");