Skip to content

Commit

Permalink
[flink] Add expire snapshots action. (apache#4091)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Aug 29, 2024
1 parent 98071a1 commit abb72db
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure;

import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Map;

/** Expire snapshots action for Flink. */
public class ExpireSnapshotsAction extends ActionBase {

private final String identifier;
private final Integer retainMax;
private final Integer retainMin;
private final String olderThan;
private final Integer maxDeletes;

public ExpireSnapshotsAction(
String warehouse,
String identifier,
Map<String, String> catalogConfig,
Integer retainMax,
Integer retainMin,
String olderThan,
Integer maxDeletes) {
super(warehouse, catalogConfig);
this.identifier = identifier;
this.retainMax = retainMax;
this.retainMin = retainMin;
this.olderThan = olderThan;
this.maxDeletes = maxDeletes;
}

public void run() throws Exception {
ExpireSnapshotsProcedure expireSnapshotsProcedure = new ExpireSnapshotsProcedure();
expireSnapshotsProcedure.withCatalog(catalog);
expireSnapshotsProcedure.call(
new DefaultProcedureContext(env),
identifier,
retainMax,
retainMin,
olderThan,
maxDeletes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link ExpireSnapshotsAction}. */
public class ExpireSnapshotsActionFactory implements ActionFactory {

public static final String IDENTIFIER = "expire_snapshots";

private static final String IDENTIFIER_KEY = "identifier";

private static final String RETAIN_MAX = "retain_max";
private static final String RETAIN_MIN = "retain_min";
private static final String OLDER_THAN = "older_than";
private static final String MAX_DELETES = "max_deletes";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
String warehouse = params.get(WAREHOUSE);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String identifier = params.get(IDENTIFIER_KEY);

Integer retainMax =
params.has(RETAIN_MAX) ? Integer.parseInt(params.get(RETAIN_MAX)) : null;
Integer retainMin =
params.has(RETAIN_MIN) ? Integer.parseInt(params.get(RETAIN_MIN)) : null;
String olderThan = params.has(OLDER_THAN) ? params.get(OLDER_THAN) : null;
Integer maxDeletes =
params.has(MAX_DELETES) ? Integer.parseInt(params.get(MAX_DELETES)) : null;

ExpireSnapshotsAction action =
new ExpireSnapshotsAction(
warehouse,
identifier,
catalogConfig,
retainMax,
retainMin,
olderThan,
maxDeletes);

return Optional.of(action);
}

@Override
public void printHelp() {
System.out.println("Action \"expire_snapshots\" expire the target snapshots.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" expire_snapshots --warehouse <warehouse_path> --identifier <database.table> --retain_max <max> --retain_min <min> --older_than <older_than> --max_delete <max_delete>");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ org.apache.paimon.flink.action.DeleteBranchActionFactory
org.apache.paimon.flink.action.FastForwardActionFactory
org.apache.paimon.flink.action.RepairActionFactory
org.apache.paimon.flink.action.RewriteFileIndexActionFactory
org.apache.paimon.flink.action.ExpireSnapshotsActionFactory

### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.apache.paimon.flink.procedure;

import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.ExpireSnapshotsAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand Down Expand Up @@ -73,9 +77,93 @@ public void testExpireSnapshotsProcedure() throws Exception {
checkSnapshots(snapshotManager, 6, 6);
}

@Test
public void testExpireSnapshotsAction() throws Exception {
sql(
"CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)"
+ " WITH ( 'num-sorted-run.compaction-trigger' = '9999' )");
FileStoreTable table = paimonTable("word_count");
StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder().streamingMode().build();
SnapshotManager snapshotManager = table.snapshotManager();

// initially prepare 6 snapshots, expected snapshots (1, 2, 3, 4, 5, 6)
for (int i = 0; i < 6; ++i) {
sql("INSERT INTO word_count VALUES ('" + String.valueOf(i) + "', " + i + ")");
}
checkSnapshots(snapshotManager, 1, 6);

// retain_max => 5, expected snapshots (2, 3, 4, 5, 6)
createAction(
ExpireSnapshotsAction.class,
"expire_snapshots",
"--warehouse",
path,
"--identifier",
"default.word_count",
"--retain_max",
"5")
.withStreamExecutionEnvironment(env)
.run();
checkSnapshots(snapshotManager, 2, 6);

// older_than => timestamp of snapshot 6, max_deletes => 1, expected snapshots (3, 4, 5, 6)
Timestamp ts6 = new Timestamp(snapshotManager.latestSnapshot().timeMillis());
createAction(
ExpireSnapshotsAction.class,
"expire_snapshots",
"--warehouse",
path,
"--identifier",
"default.word_count",
"--older_than",
ts6.toString(),
"--max_deletes",
"1")
.withStreamExecutionEnvironment(env)
.run();
checkSnapshots(snapshotManager, 3, 6);

createAction(
ExpireSnapshotsAction.class,
"expire_snapshots",
"--warehouse",
path,
"--identifier",
"default.word_count",
"--older_than",
ts6.toString(),
"--retain_min",
"3")
.withStreamExecutionEnvironment(env)
.run();
checkSnapshots(snapshotManager, 4, 6);

// older_than => timestamp of snapshot 6, expected snapshots (6)
createAction(
ExpireSnapshotsAction.class,
"expire_snapshots",
"--warehouse",
path,
"--identifier",
"default.word_count",
"--older_than",
ts6.toString())
.withStreamExecutionEnvironment(env)
.run();
checkSnapshots(snapshotManager, 6, 6);
}

private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1);
assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
assertThat(sm.latestSnapshotId()).isEqualTo(latest);
}

private <T extends ActionBase> T createAction(Class<T> clazz, String... args) {
return ActionFactory.createAction(args)
.filter(clazz::isInstance)
.map(clazz::cast)
.orElseThrow(() -> new RuntimeException("Failed to create action"));
}
}

0 comments on commit abb72db

Please sign in to comment.