Skip to content

Commit

Permalink
add flink action
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Sep 10, 2024
1 parent be3cbe0 commit 43e512a
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.procedure.ExpireTagsProcedure;

import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Map;

/** Expire tags action for Flink. */
public class ExpireTagsAction extends ActionBase {

private final String table;
private final String expirationTime;

public ExpireTagsAction(
String warehouse,
String table,
String expirationTime,
Map<String, String> catalogConfig) {
super(warehouse, catalogConfig);
this.table = table;
this.expirationTime = expirationTime;
}

@Override
public void run() throws Exception {
ExpireTagsProcedure expireTagsProcedure = new ExpireTagsProcedure();
expireTagsProcedure.withCatalog(catalog);
expireTagsProcedure.call(new DefaultProcedureContext(env), table, expirationTime);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link ExpireTagsAction}. */
public class ExpireTagsActionFactory implements ActionFactory {

public static final String IDENTIFIER = "expire_tags";

private static final String EXPIRATION_TIME = "expiration_time";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
String warehouse = params.get(WAREHOUSE);
String table = params.get(TABLE);
String expirationTime = params.get(EXPIRATION_TIME);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

ExpireTagsAction expireTagsAction =
new ExpireTagsAction(warehouse, table, expirationTime, catalogConfig);
return Optional.of(expireTagsAction);
}

@Override
public void printHelp() {
System.out.println("Action \"expire_tags\" expire tags by time.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" expire_tags --warehouse <warehouse_path> "
+ "--table <database.table_name> "
+ "[--expiration_time <expiration_time>]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.CreateTagFromTimestampActionFactory
org.apache.paimon.flink.action.CreateTagFromWatermarkActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ExpireTagsActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.MigrateTableActionFactory
org.apache.paimon.flink.action.MigrateFileActionFactory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.data.Timestamp;
import org.apache.paimon.table.FileStoreTable;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;

import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link ExpireTagsAction}. */
public class ExpireTagsActionITTest extends ActionITCaseBase {

@BeforeEach
public void setUp() {
init(warehouse);
}

@Test
public void testExpireTags() throws Exception {
bEnv.executeSql(
"CREATE TABLE T (id STRING, name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED)"
+ " WITH ('bucket'='1', 'write-only'='true')");

FileStoreTable table = getFileStoreTable("T");

// generate 5 snapshots
for (int i = 1; i <= 5; i++) {
bEnv.executeSql("INSERT INTO T VALUES ('" + i + "', '" + i + "')").await();
}
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(5);

bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-1', 1)").await();
bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-2', 2, '1h')").await();
bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-3', 3, '1h')").await();
assertThat(table.tagManager().tags().size()).isEqualTo(3);

createAction(
ExpireTagsAction.class,
"expire_tags",
"--warehouse",
warehouse,
"--table",
database + ".T")
.run();
// no tags expired
assertThat(table.tagManager().tags().size()).isEqualTo(3);

bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-4', 4, '1s')").await();
bEnv.executeSql("CALL sys.create_tag('default.T', 'tag-5', 5, '1s')").await();
assertThat(table.tagManager().tags().size()).isEqualTo(5);

Thread.sleep(2000);
createAction(
ExpireTagsAction.class,
"expire_tags",
"--warehouse",
warehouse,
"--table",
database + ".T")
.run();
// tag-4,tag-5 expires
assertThat(table.tagManager().tags().size()).isEqualTo(3);
assertThat(table.tagManager().tagExists("tag-4")).isFalse();
assertThat(table.tagManager().tagExists("tag-5")).isFalse();

// tag-3 as the base expiration_time
LocalDateTime expirationTime = table.tagManager().tag("tag-3").getTagCreateTime();
java.sql.Timestamp timestamp =
new java.sql.Timestamp(
Timestamp.fromLocalDateTime(expirationTime).getMillisecond());
createAction(
ExpireTagsAction.class,
"expire_tags",
"--warehouse",
warehouse,
"--table",
database + ".T",
"--expiration_time",
timestamp.toString())
.run();
// tag-2 expires
assertThat(table.tagManager().tags().size()).isEqualTo(2);
assertThat(table.tagManager().tagExists("tag-2")).isFalse();
}
}

0 comments on commit 43e512a

Please sign in to comment.