diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java new file mode 100644 index 000000000000..0a0972e3c839 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.flink.procedure.ExpireTagsProcedure; + +import org.apache.flink.table.procedure.DefaultProcedureContext; + +import java.util.Map; + +/** Expire tags action for Flink. */ +public class ExpireTagsAction extends ActionBase { + + private final String table; + private final String expirationTime; + + public ExpireTagsAction( + String warehouse, + String table, + String expirationTime, + Map catalogConfig) { + super(warehouse, catalogConfig); + this.table = table; + this.expirationTime = expirationTime; + } + + @Override + public void run() throws Exception { + ExpireTagsProcedure expireTagsProcedure = new ExpireTagsProcedure(); + expireTagsProcedure.withCatalog(catalog); + expireTagsProcedure.call(new DefaultProcedureContext(env), table, expirationTime); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java new file mode 100644 index 000000000000..4fa4459a0dbe --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; +import java.util.Optional; + +/** Factory to create {@link ExpireTagsAction}. */ +public class ExpireTagsActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "expire_tags"; + + private static final String EXPIRATION_TIME = "expiration_time"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + String warehouse = params.get(WAREHOUSE); + String table = params.get(TABLE); + String expirationTime = params.get(EXPIRATION_TIME); + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + + ExpireTagsAction expireTagsAction = + new ExpireTagsAction(warehouse, table, expirationTime, catalogConfig); + return Optional.of(expireTagsAction); + } + + @Override + public void printHelp() { + System.out.println("Action \"expire_tags\" expire tags by time."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " expire_tags --warehouse " + + "--table " + + "[--expiration_time ]"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 268cb4f86cd5..416476e6b8a8 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -25,6 +25,7 @@ org.apache.paimon.flink.action.CreateTagActionFactory org.apache.paimon.flink.action.CreateTagFromTimestampActionFactory org.apache.paimon.flink.action.CreateTagFromWatermarkActionFactory org.apache.paimon.flink.action.DeleteTagActionFactory +org.apache.paimon.flink.action.ExpireTagsActionFactory org.apache.paimon.flink.action.ResetConsumerActionFactory org.apache.paimon.flink.action.MigrateTableActionFactory org.apache.paimon.flink.action.MigrateFileActionFactory diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionITTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionITTest.java new file mode 100644 index 000000000000..88572a6240bb --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionITTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.table.FileStoreTable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; + +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for {@link ExpireTagsAction}. */ +public class ExpireTagsActionITTest extends ActionITCaseBase { + + @BeforeEach + public void setUp() { + init(warehouse); + } + + @Test + public void testExpireTags() throws Exception { + bEnv.executeSql( + "CREATE TABLE T (id STRING, name STRING," + + " PRIMARY KEY (id) NOT ENFORCED)" + + " WITH ('bucket'='1', 'write-only'='true')"); + + FileStoreTable table = getFileStoreTable("T"); + + // generate 5 snapshots + for (int i = 1; i <= 5; i++) { + bEnv.executeSql("INSERT INTO T VALUES ('" + i + "', '" + i + "')").await(); + } + assertThat(table.snapshotManager().snapshotCount()).isEqualTo(5); + + bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-1', 1)").await(); + bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-2', 2, '1h')").await(); + bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-3', 3, '1h')").await(); + assertThat(table.tagManager().tags().size()).isEqualTo(3); + + createAction( + ExpireTagsAction.class, + "expire_tags", + "--warehouse", + warehouse, + "--table", + database + ".T") + .run(); + // no tags expired + assertThat(table.tagManager().tags().size()).isEqualTo(3); + + bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-4', 4, '1s')").await(); + bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-5', 5, '1s')").await(); + assertThat(table.tagManager().tags().size()).isEqualTo(5); + + Thread.sleep(2000); + createAction( + ExpireTagsAction.class, + "expire_tags", + "--warehouse", + warehouse, + "--table", + database + ".T") + .run(); + // tag-4,tag-5 expires + assertThat(table.tagManager().tags().size()).isEqualTo(3); + assertThat(table.tagManager().tagExists("tag-4")).isFalse(); + assertThat(table.tagManager().tagExists("tag-5")).isFalse(); + + // tag-3 as the base expiration_time + LocalDateTime expirationTime = table.tagManager().tag("tag-3").getTagCreateTime(); + java.sql.Timestamp timestamp = + new java.sql.Timestamp( + Timestamp.fromLocalDateTime(expirationTime).getMillisecond()); + createAction( + ExpireTagsAction.class, + "expire_tags", + "--warehouse", + warehouse, + "--table", + database + ".T", + "--expiration_time", + timestamp.toString()) + .run(); + // tag-2 expires + assertThat(table.tagManager().tags().size()).isEqualTo(2); + assertThat(table.tagManager().tagExists("tag-2")).isFalse(); + } +}