Skip to content

Commit

Permalink
[core] Introduce TTL for tag
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 committed Apr 7, 2024
1 parent 7ccc7dd commit 486f993
Show file tree
Hide file tree
Showing 30 changed files with 794 additions and 117 deletions.
7 changes: 7 additions & 0 deletions docs/content/maintenance/manage-tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ You can create a tag with given name and snapshot ID.
--database <database-name> \
--table <table-name> \
--tag_name <tag-name> \
--time_retained <time-retained> \
[--snapshot <snapshot_id>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
Expand All @@ -126,6 +127,7 @@ public class CreateTag {
public static void main(String[] args) {
Table table = ...;
table.createTag("my-tag", 1);
table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1);
}
}
```
Expand All @@ -138,6 +140,11 @@ Run the following sql:
CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2);
```
To create a tag with retained 1 day, run the following sql:
```sql
CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', snapshot => 2);
```
To create a tag based on the latest snapshot id, run the following sql:
```sql
CALL create_tag(table => 'test.t', tag => 'test_tag');
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,12 @@
<td>Integer</td>
<td>The maximum number of tags to retain.</td>
</tr>
<tr>
<td><h5>tag.time-retained</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum time retained for all tags.</td>
</tr>
<tr>
<td><h5>tag.period-formatter</h5></td>
<td style="word-wrap: break-word;">with_dashes</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class CoreOptions implements Serializable {

public static final String DISTINCT = "distinct";

public static final Duration DEFAULT_TAG_TIME_RETAINED = Duration.ofDays(3650000);

public static final ConfigOption<Integer> BUCKET =
key("bucket")
.intType()
Expand Down Expand Up @@ -989,6 +991,12 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The maximum number of tags to retain.");

public static final ConfigOption<Duration> TAG_TIME_RETAINED =
key("tag.time-retained")
.durationType()
.defaultValue(DEFAULT_TAG_TIME_RETAINED)
.withDescription("The maximum time retained for all tags.");

public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
.durationType()
Expand Down Expand Up @@ -1617,6 +1625,10 @@ public Integer tagNumRetainedMax() {
return options.get(TAG_NUM_RETAINED_MAX);
}

public Duration tagTimeRetained() {
return options.get(TAG_TIME_RETAINED);
}

public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
Expand Down Expand Up @@ -249,8 +249,8 @@ public PartitionExpire newPartitionExpire(String commitUser) {

@Override
@Nullable
public TagAutoCreation newTagCreationManager() {
return TagAutoCreation.create(
public TagAutoManager newTagCreationManager() {
return TagAutoManager.create(
options,
snapshotManager(),
newTagManager(),
Expand Down
4 changes: 2 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -93,7 +93,7 @@ public interface FileStore<T> extends Serializable {
PartitionExpire newPartitionExpire(String commitUser);

@Nullable
TagAutoCreation newTagCreationManager();
TagAutoManager newTagCreationManager();

ServiceManager newServiceManager();

Expand Down
34 changes: 17 additions & 17 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,37 +93,37 @@ public class Snapshot {
// null for paimon <= 0.2
@JsonProperty(FIELD_VERSION)
@Nullable
private final Integer version;
protected final Integer version;

@JsonProperty(FIELD_ID)
private final long id;
protected final long id;

@JsonProperty(FIELD_SCHEMA_ID)
private final long schemaId;
protected final long schemaId;

// a manifest list recording all changes from the previous snapshots
@JsonProperty(FIELD_BASE_MANIFEST_LIST)
private final String baseManifestList;
protected final String baseManifestList;

// a manifest list recording all new changes occurred in this snapshot
// for faster expire and streaming reads
@JsonProperty(FIELD_DELTA_MANIFEST_LIST)
private final String deltaManifestList;
protected final String deltaManifestList;

// a manifest list recording all changelog produced in this snapshot
// null if no changelog is produced, or for paimon <= 0.2
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
@Nullable
private final String changelogManifestList;
protected final String changelogManifestList;

// a manifest recording all index files of this table
// null if no index file
@JsonProperty(FIELD_INDEX_MANIFEST)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final String indexManifest;
protected final String indexManifest;

@JsonProperty(FIELD_COMMIT_USER)
private final String commitUser;
protected final String commitUser;

// Mainly for snapshot deduplication.
//
Expand All @@ -133,49 +133,49 @@ public class Snapshot {
// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
// committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
@JsonProperty(FIELD_COMMIT_IDENTIFIER)
private final long commitIdentifier;
protected final long commitIdentifier;

@JsonProperty(FIELD_COMMIT_KIND)
private final CommitKind commitKind;
protected final CommitKind commitKind;

@JsonProperty(FIELD_TIME_MILLIS)
private final long timeMillis;
protected final long timeMillis;

@JsonProperty(FIELD_LOG_OFFSETS)
private final Map<Integer, Long> logOffsets;
protected final Map<Integer, Long> logOffsets;

// record count of all changes occurred in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_TOTAL_RECORD_COUNT)
@Nullable
private final Long totalRecordCount;
protected final Long totalRecordCount;

// record count of all new changes occurred in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_DELTA_RECORD_COUNT)
@Nullable
private final Long deltaRecordCount;
protected final Long deltaRecordCount;

// record count of all changelog produced in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT)
@Nullable
private final Long changelogRecordCount;
protected final Long changelogRecordCount;

// watermark for input records
// null for paimon <= 0.3
// null if there is no watermark in new committing, and the previous snapshot does not have a
// watermark
@JsonProperty(FIELD_WATERMARK)
@Nullable
private final Long watermark;
protected final Long watermark;

// stats file name for statistics of this table
// null if no stats file
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_STATISTICS)
@Nullable
private final String statistics;
protected final String statistics;

public Snapshot(
long id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -443,8 +444,7 @@ public void rollbackTo(long snapshotId) {
rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
}

@Override
public void createTag(String tagName, long fromSnapshotId) {
public Snapshot createTagInternal(long fromSnapshotId) {
SnapshotManager snapshotManager = snapshotManager();
Snapshot snapshot = null;
if (snapshotManager.snapshotExists(fromSnapshotId)) {
Expand All @@ -464,18 +464,35 @@ public void createTag(String tagName, long fromSnapshotId) {
snapshot != null,
"Cannot create tag because given snapshot #%s doesn't exist.",
fromSnapshotId);
createTag(tagName, snapshot);
return snapshot;
}

@Override
public void createTag(String tagName, long fromSnapshotId) {
createTag(tagName, coreOptions().tagTimeRetained(), createTagInternal(fromSnapshotId));
}

@Override
public void createTag(String tagName, Duration timeRetained, long fromSnapshotId) {
createTag(tagName, timeRetained, createTagInternal(fromSnapshotId));
}

@Override
public void createTag(String tagName) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
createTag(tagName, latestSnapshot);
createTag(tagName, coreOptions().tagTimeRetained(), latestSnapshot);
}

@Override
public void createTag(String tagName, Duration timeRetained) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
createTag(tagName, timeRetained, latestSnapshot);
}

private void createTag(String tagName, Snapshot fromSnapshot) {
tagManager().createTag(fromSnapshot, tagName, store().createTagCallbacks());
private void createTag(String tagName, Duration timeRetained, Snapshot fromSnapshot) {
tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.paimon.table;

import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.InnerStreamTableScan;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -109,6 +111,14 @@ default void createTag(String tagName, long fromSnapshotId) {
this.getClass().getSimpleName()));
}

@Experimental
default void createTag(String tagName, Duration timeRetained, long fromSnapshotId) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createTag.",
this.getClass().getSimpleName()));
}

@Override
default void createTag(String tagName) {
throw new UnsupportedOperationException(
Expand All @@ -117,6 +127,14 @@ default void createTag(String tagName) {
this.getClass().getSimpleName()));
}

@Experimental
default void createTag(String tagName, Duration timeRetained) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createTag.",
this.getClass().getSimpleName()));
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException(
Expand Down
7 changes: 7 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.types.RowType;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -76,10 +77,16 @@ public interface Table extends Serializable {
@Experimental
void createTag(String tagName, long fromSnapshotId);

@Experimental
void createTag(String tagName, Duration timeRetained, long fromSnapshotId);

/** Create a tag from latest snapshot. */
@Experimental
void createTag(String tagName);

@Experimental
void createTag(String tagName, Duration timeRetained);

/** Delete a tag by name. */
@Experimental
void deleteTag(String tagName);
Expand Down
Loading

0 comments on commit 486f993

Please sign in to comment.