From 671b3adb7e7f346670ba35cd6a3ec16a91b91f1f Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Thu, 26 Sep 2024 20:27:39 +0800 Subject: [PATCH] fix test failed by adding support for flink 1.18 --- .../procedure/ExpirePartitionsProcedure.java | 111 ++++++++++++++++++ .../ProcedurePositionalArgumentsITCase.java | 5 +- .../ExpirePartitionsProcedureITCase.java | 2 +- 3 files changed, 116 insertions(+), 2 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java new file mode 100644 index 000000000000..c0e5a65c49ef --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.FileStore; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.operation.PartitionExpire; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.TimeUtils; + +import org.apache.flink.table.procedure.ProcedureContext; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; + +/** A procedure to expire partitions. */ +public class ExpirePartitionsProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "expire_partitions"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call( + ProcedureContext procedureContext, + String tableId, + String expirationTime, + String timestampFormatter, + String timestampPattern, + String expireStrategy) + throws Catalog.TableNotExistException { + return call( + procedureContext, + tableId, + expirationTime, + timestampFormatter, + timestampPattern, + expireStrategy, + null); + } + + public String[] call( + ProcedureContext procedureContext, + String tableId, + String expirationTime, + String timestampFormatter, + String timestampPattern, + String expireStrategy, + Integer maxExpires) + throws Catalog.TableNotExistException { + FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); + FileStore fileStore = fileStoreTable.store(); + Map map = new HashMap<>(); + map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy); + map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter); + map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern); + + PartitionExpire partitionExpire = + new PartitionExpire( + TimeUtils.parseDuration(expirationTime), + Duration.ofMillis(0L), + createPartitionExpireStrategy( + CoreOptions.fromMap(map), fileStore.partitionType()), + fileStore.newScan(), + fileStore.newCommit(""), + Optional.ofNullable( + fileStoreTable + .catalogEnvironment() + .metastoreClientFactory()) + .map(MetastoreClient.Factory::create) + .orElse(null)); + if (maxExpires != null) { + partitionExpire.withMaxExpires(maxExpires); + } + List> expired = partitionExpire.expire(Long.MAX_VALUE); + return expired == null || expired.isEmpty() + ? new String[] {"No expired partitions."} + : expired.stream() + .map( + x -> { + String r = x.toString(); + return r.substring(1, r.length() - 1); + }) + .toArray(String[]::new); + } +} diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java index da2eaef0bd49..a48de667bf3d 100644 --- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java +++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java @@ -244,7 +244,10 @@ public void testExpirePartitionsProcedure() throws Exception { sql("INSERT INTO T VALUES ('1', '2024-06-01')"); sql("INSERT INTO T VALUES ('2', '9024-06-01')"); assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", "2:9024-06-01"); - sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')"); + assertThat( + sql( + "CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')")) + .containsExactly(Row.of("dt=2024-06-01")); assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01"); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java index 243c71c5821a..bc2e84902f35 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java @@ -399,7 +399,7 @@ public void testSortAndLimitExpirePartition() throws Exception { "dt=2024-06-02, hm=02:00"); assertThat(read(table, consumerReadResult)) - .containsExactly("4:2024-06-03:01:00", "Never-expire:9999-09-09:99:99"); + .containsExactlyInAnyOrder("4:2024-06-03:01:00", "Never-expire:9999-09-09:99:99"); } /** Return a list of expired partitions. */