Skip to content

Commit

Permalink
for exipre_snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Dec 17, 2024
1 parent 6bc9413 commit f0f5b47
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.apache.paimon.utils;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.ExpireConfig;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;

/** Utils for procedure. */
public class ProcedureUtils {
Expand All @@ -34,6 +38,9 @@ public static Map<String, String> fillInPartitionOptions(
Integer maxExpires,
String options) {
Map<String, String> dynamicOptions = new HashMap<>();
if (!StringUtils.isNullOrWhitespaceOnly(options)) {
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
}
if (!StringUtils.isNullOrWhitespaceOnly(expireStrategy)) {
dynamicOptions.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
}
Expand All @@ -50,9 +57,31 @@ public static Map<String, String> fillInPartitionOptions(
dynamicOptions.put(
CoreOptions.PARTITION_EXPIRATION_MAX_NUM.key(), String.valueOf(maxExpires));
}
if (!StringUtils.isNullOrWhitespaceOnly(options)) {
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
}
return dynamicOptions;
}

public static ExpireConfig.Builder fillInSnapshotOptions(
CoreOptions tableOptions,
Integer retainMax,
Integer retainMin,
String olderThanStr,
Integer maxDeletes) {

ExpireConfig.Builder builder = ExpireConfig.builder();
builder.snapshotRetainMax(
Optional.ofNullable(retainMax).orElse(tableOptions.snapshotNumRetainMax()));
builder.snapshotRetainMin(
Optional.ofNullable(retainMin).orElse(tableOptions.snapshotNumRetainMin()));
builder.snapshotTimeRetain(tableOptions.snapshotTimeRetain());
if (!StringUtils.isNullOrWhitespaceOnly(olderThanStr)) {
long olderThanMills =
DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
.getMillisecond();
builder.snapshotTimeRetain(
Duration.ofMillis(System.currentTimeMillis() - olderThanMills));
}
builder.snapshotMaxDeletes(
Optional.ofNullable(maxDeletes).orElse(tableOptions.snapshotExpireLimit()));
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ExpireSnapshotsAction extends ActionBase {
private final Integer retainMin;
private final String olderThan;
private final Integer maxDeletes;
private final String options;

public ExpireSnapshotsAction(
String warehouse,
Expand All @@ -40,13 +41,15 @@ public ExpireSnapshotsAction(
Integer retainMax,
Integer retainMin,
String olderThan,
Integer maxDeletes) {
Integer maxDeletes,
String options) {
super(warehouse, catalogConfig);
this.identifier = identifier;
this.retainMax = retainMax;
this.retainMin = retainMin;
this.olderThan = olderThan;
this.maxDeletes = maxDeletes;
this.options = options;
}

public void run() throws Exception {
Expand All @@ -58,6 +61,7 @@ public void run() throws Exception {
retainMax,
retainMin,
olderThan,
maxDeletes);
maxDeletes,
options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ExpireSnapshotsActionFactory implements ActionFactory {
private static final String RETAIN_MIN = "retain_min";
private static final String OLDER_THAN = "older_than";
private static final String MAX_DELETES = "max_deletes";
private static final String OPTIONS = "options";

@Override
public String identifier() {
Expand All @@ -51,6 +52,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
String olderThan = params.has(OLDER_THAN) ? params.get(OLDER_THAN) : null;
Integer maxDeletes =
params.has(MAX_DELETES) ? Integer.parseInt(params.get(MAX_DELETES)) : null;
String options = params.has(OPTIONS) ? params.get(OPTIONS) : null;

ExpireSnapshotsAction action =
new ExpireSnapshotsAction(
Expand All @@ -60,7 +62,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
retainMax,
retainMin,
olderThan,
maxDeletes);
maxDeletes,
options);

return Optional.of(action);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Compact manifest file to reduce deleted manifest entries. */
public class CompactManifestProcedure extends ProcedureBase {
Expand All @@ -39,15 +42,21 @@ public String identifier() {
return "compact_manifest";
}

@ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
})
public String[] call(ProcedureContext procedureContext, String tableId, String options)
throws Exception {

FileStoreTable table =
(FileStoreTable)
table(tableId)
.copy(
Collections.singletonMap(
CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER));
FileStoreTable table = (FileStoreTable) table(tableId);
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER);
if (!StringUtils.isNullOrWhitespaceOnly(options)) {
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
}
table = table.copy(dynamicOptions);

try (FileStoreCommit commit =
table.store()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.ProcedureUtils;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.time.Duration;
import java.util.TimeZone;
import java.util.HashMap;
import java.util.Map;

/** A procedure to expire snapshots. */
public class ExpireSnapshotsProcedure extends ProcedureBase {
Expand Down Expand Up @@ -57,35 +62,30 @@ public String identifier() {
@ArgumentHint(
name = "max_deletes",
type = @DataTypeHint("INTEGER"),
isOptional = true)
isOptional = true),
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String tableId,
Integer retainMax,
Integer retainMin,
String olderThanStr,
Integer maxDeletes)
Integer maxDeletes,
String options)
throws Catalog.TableNotExistException {
ExpireSnapshots expireSnapshots = table(tableId).newExpireSnapshots();
ExpireConfig.Builder builder = ExpireConfig.builder();
if (retainMax != null) {
builder.snapshotRetainMax(retainMax);
}
if (retainMin != null) {
builder.snapshotRetainMin(retainMin);
}
if (olderThanStr != null) {
builder.snapshotTimeRetain(
Duration.ofMillis(
System.currentTimeMillis()
- DateTimeUtils.parseTimestampData(
olderThanStr, 3, TimeZone.getDefault())
.getMillisecond()));
}
if (maxDeletes != null) {
builder.snapshotMaxDeletes(maxDeletes);
Table table = table(tableId);
Map<String, String> dynamicOptions = new HashMap<>();
if (!StringUtils.isNullOrWhitespaceOnly(options)) {
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
}
table = table.copy(dynamicOptions);
ExpireSnapshots expireSnapshots = table.newExpireSnapshots();

CoreOptions tableOptions = ((FileStoreTable) table).store().options();
ExpireConfig.Builder builder =
ProcedureUtils.fillInSnapshotOptions(
tableOptions, retainMax, retainMin, olderThanStr, maxDeletes);
return new String[] {expireSnapshots.config(builder.build()).expire() + ""};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
public void testExpireSnapshotsProcedure() throws Exception {
sql(
"CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)"
+ " WITH ( 'num-sorted-run.compaction-trigger' = '9999' )");
+ " WITH ( 'num-sorted-run.compaction-trigger' = '9999',"
+ "'write-only' = 'true', 'snapshot.num-retained.min' = '1')");
FileStoreTable table = paimonTable("word_count");
SnapshotManager snapshotManager = table.snapshotManager();

Expand Down Expand Up @@ -81,7 +82,9 @@ public void testExpireSnapshotsProcedure() throws Exception {
public void testExpireSnapshotsAction() throws Exception {
sql(
"CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)"
+ " WITH ( 'num-sorted-run.compaction-trigger' = '9999' )");
+ " WITH ( 'num-sorted-run.compaction-trigger' = '9999',"
+ "'write-only' = 'true', 'snapshot.num-retained.min' = '1')");

FileStoreTable table = paimonTable("word_count");
StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder().streamingMode().build();
Expand Down Expand Up @@ -154,6 +157,35 @@ public void testExpireSnapshotsAction() throws Exception {
checkSnapshots(snapshotManager, 6, 6);
}

@Test
public void testLoadTablePropsFirstAndOptions() throws Exception {
sql(
"CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)"
+ " WITH ( 'num-sorted-run.compaction-trigger' = '9999',"
+ "'write-only' = 'true', 'snapshot.num-retained.min' = '1', 'snapshot.num-retained.max' = '5')");
FileStoreTable table = paimonTable("word_count");
SnapshotManager snapshotManager = table.snapshotManager();

// initially prepare 6 snapshots, expected snapshots (1, 2, 3, 4, 5, 6)
for (int i = 0; i < 6; ++i) {
sql("INSERT INTO word_count VALUES ('" + String.valueOf(i) + "', " + i + ")");
}
checkSnapshots(snapshotManager, 1, 6);

// snapshot.num-retained.max is 5, expected snapshots (2, 3, 4, 5, 6)
sql("CALL sys.expire_snapshots(`table` => 'default.word_count')");
checkSnapshots(snapshotManager, 2, 6);

// older_than => timestamp of snapshot 6, snapshot.expire.limit => 1, expected snapshots (3,
// 4, 5, 6)
Timestamp ts6 = new Timestamp(snapshotManager.latestSnapshot().timeMillis());
sql(
"CALL sys.expire_snapshots(`table` => 'default.word_count', older_than => '"
+ ts6.toString()
+ "', options => 'snapshot.expire.limit=1')");
checkSnapshots(snapshotManager, 3, 6);
}

private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1);
assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.StringUtils;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
Expand All @@ -29,6 +31,9 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.HashMap;
import java.util.Map;

import static org.apache.spark.sql.types.DataTypes.StringType;

/**
Expand All @@ -41,7 +46,10 @@
public class CompactManifestProcedure extends BaseProcedure {

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.optional("options", StringType)
};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand All @@ -67,7 +75,14 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {

Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String options = args.isNullAt(1) ? null : args.getString(1);

FileStoreTable table = (FileStoreTable) loadSparkTable(tableIdent).getTable();
Map<String, String> dynamicOptions = new HashMap<>();
if (!StringUtils.isNullOrWhitespaceOnly(options)) {
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
}
table = table.copy(dynamicOptions);

try (FileStoreCommit commit =
table.store()
Expand Down
Loading

0 comments on commit f0f5b47

Please sign in to comment.