Skip to content

Commit

Permalink
[core] Introduce TTL for tag
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 authored and hongli committed Apr 15, 2024
1 parent 0d40e68 commit 91a5020
Show file tree
Hide file tree
Showing 36 changed files with 1,353 additions and 152 deletions.
7 changes: 7 additions & 0 deletions docs/content/maintenance/manage-tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ You can create a tag with given name and snapshot ID.
--database <database-name> \
--table <table-name> \
--tag_name <tag-name> \
--time_retained <time-retained> \
[--snapshot <snapshot_id>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
Expand All @@ -126,6 +127,7 @@ public class CreateTag {
public static void main(String[] args) {
Table table = ...;
table.createTag("my-tag", 1);
table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1);
}
}
```
Expand All @@ -138,6 +140,11 @@ Run the following sql:
CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2);
```
To create a tag with retained 1 day, run the following sql:
```sql
CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', snapshot => 2);
```
To create a tag based on the latest snapshot id, run the following sql:
```sql
CALL create_tag(table => 'test.t', tag => 'test_tag');
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,12 @@
<td><p>Enum</p></td>
<td>What frequency is used to generate tags.<br /><br />Possible values:<ul><li>"daily": Generate a tag every day.</li><li>"hourly": Generate a tag every hour.</li><li>"two-hours": Generate a tag every two hours.</li></ul></td>
</tr>
<tr>
<td><h5>tag.default-time-retained</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum default time retained for all tags.</td>
</tr>
<tr>
<td><h5>tag.num-retained-max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,12 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The maximum number of tags to retain.");

public static final ConfigOption<Duration> TAG_DEFAULT_TIME_RETAINED =
key("tag.default-time-retained")
.durationType()
.noDefaultValue()
.withDescription("The maximum default time retained for all tags.");

public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
.durationType()
Expand Down Expand Up @@ -1630,6 +1636,10 @@ public Integer tagNumRetainedMax() {
return options.get(TAG_NUM_RETAINED_MAX);
}

public Duration tagDefaultTimeRetained() {
return options.get(TAG_DEFAULT_TIME_RETAINED);
}

public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
Expand Down Expand Up @@ -249,8 +249,8 @@ public PartitionExpire newPartitionExpire(String commitUser) {

@Override
@Nullable
public TagAutoCreation newTagCreationManager() {
return TagAutoCreation.create(
public TagAutoManager newTagCreationManager() {
return TagAutoManager.create(
options,
snapshotManager(),
newTagManager(),
Expand Down
4 changes: 2 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -93,7 +93,7 @@ public interface FileStore<T> extends Serializable {
PartitionExpire newPartitionExpire(String commitUser);

@Nullable
TagAutoCreation newTagCreationManager();
TagAutoManager newTagCreationManager();

ServiceManager newServiceManager();

Expand Down
34 changes: 17 additions & 17 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,37 +95,37 @@ public class Snapshot {
// null for paimon <= 0.2
@JsonProperty(FIELD_VERSION)
@Nullable
private final Integer version;
protected final Integer version;

@JsonProperty(FIELD_ID)
private final long id;
protected final long id;

@JsonProperty(FIELD_SCHEMA_ID)
private final long schemaId;
protected final long schemaId;

// a manifest list recording all changes from the previous snapshots
@JsonProperty(FIELD_BASE_MANIFEST_LIST)
private final String baseManifestList;
protected final String baseManifestList;

// a manifest list recording all new changes occurred in this snapshot
// for faster expire and streaming reads
@JsonProperty(FIELD_DELTA_MANIFEST_LIST)
private final String deltaManifestList;
protected final String deltaManifestList;

// a manifest list recording all changelog produced in this snapshot
// null if no changelog is produced, or for paimon <= 0.2
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
@Nullable
private final String changelogManifestList;
protected final String changelogManifestList;

// a manifest recording all index files of this table
// null if no index file
@JsonProperty(FIELD_INDEX_MANIFEST)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final String indexManifest;
protected final String indexManifest;

@JsonProperty(FIELD_COMMIT_USER)
private final String commitUser;
protected final String commitUser;

// Mainly for snapshot deduplication.
//
Expand All @@ -135,37 +135,37 @@ public class Snapshot {
// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
// committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
@JsonProperty(FIELD_COMMIT_IDENTIFIER)
private final long commitIdentifier;
protected final long commitIdentifier;

@JsonProperty(FIELD_COMMIT_KIND)
private final CommitKind commitKind;
protected final CommitKind commitKind;

@JsonProperty(FIELD_TIME_MILLIS)
private final long timeMillis;
protected final long timeMillis;

@JsonProperty(FIELD_LOG_OFFSETS)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final Map<Integer, Long> logOffsets;
protected final Map<Integer, Long> logOffsets;

// record count of all changes occurred in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_TOTAL_RECORD_COUNT)
@Nullable
private final Long totalRecordCount;
protected final Long totalRecordCount;

// record count of all new changes occurred in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_DELTA_RECORD_COUNT)
@Nullable
private final Long deltaRecordCount;
protected final Long deltaRecordCount;

// record count of all changelog produced in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final Long changelogRecordCount;
protected final Long changelogRecordCount;

// watermark for input records
// null for paimon <= 0.3
Expand All @@ -174,14 +174,14 @@ public class Snapshot {
@JsonProperty(FIELD_WATERMARK)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final Long watermark;
protected final Long watermark;

// stats file name for statistics of this table
// null if no stats file
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_STATISTICS)
@Nullable
private final String statistics;
protected final String statistics;

public Snapshot(
long id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,7 @@ static ConflictCheck noConflictCheck() {
return latestSnapshot -> false;
}

static ConflictCheck mustConflictCheck() {
public static ConflictCheck mustConflictCheck() {
return latestSnapshot -> true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -461,8 +462,7 @@ public void rollbackTo(long snapshotId) {
rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
}

@Override
public void createTag(String tagName, long fromSnapshotId) {
public Snapshot createTagInternal(long fromSnapshotId) {
SnapshotManager snapshotManager = snapshotManager();
Snapshot snapshot = null;
if (snapshotManager.snapshotExists(fromSnapshotId)) {
Expand All @@ -482,18 +482,36 @@ public void createTag(String tagName, long fromSnapshotId) {
snapshot != null,
"Cannot create tag because given snapshot #%s doesn't exist.",
fromSnapshotId);
createTag(tagName, snapshot);
return snapshot;
}

@Override
public void createTag(String tagName, long fromSnapshotId) {
createTag(
tagName, coreOptions().tagDefaultTimeRetained(), createTagInternal(fromSnapshotId));
}

@Override
public void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId) {
createTag(tagName, timeRetained, createTagInternal(fromSnapshotId));
}

@Override
public void createTag(String tagName) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
createTag(tagName, latestSnapshot);
createTag(tagName, coreOptions().tagDefaultTimeRetained(), latestSnapshot);
}

@Override
public void createTag(String tagName, @Nullable Duration timeRetained) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
createTag(tagName, timeRetained, latestSnapshot);
}

private void createTag(String tagName, Snapshot fromSnapshot) {
tagManager().createTag(fromSnapshot, tagName, store().createTagCallbacks());
private void createTag(String tagName, @Nullable Duration timeRetained, Snapshot fromSnapshot) {
tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

package org.apache.paimon.table;

import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.InnerStreamTableScan;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -109,6 +113,14 @@ default void createTag(String tagName, long fromSnapshotId) {
this.getClass().getSimpleName()));
}

@Experimental
default void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createTag.",
this.getClass().getSimpleName()));
}

@Override
default void createTag(String tagName) {
throw new UnsupportedOperationException(
Expand All @@ -117,6 +129,14 @@ default void createTag(String tagName) {
this.getClass().getSimpleName()));
}

@Experimental
default void createTag(String tagName, @Nullable Duration timeRetained) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createTag.",
this.getClass().getSimpleName()));
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException(
Expand Down
9 changes: 9 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -76,10 +79,16 @@ public interface Table extends Serializable {
@Experimental
void createTag(String tagName, long fromSnapshotId);

@Experimental
void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId);

/** Create a tag from latest snapshot. */
@Experimental
void createTag(String tagName);

@Experimental
void createTag(String tagName, @Nullable Duration timeRetained);

/** Delete a tag by name. */
@Experimental
void deleteTag(String tagName);
Expand Down
Loading

0 comments on commit 91a5020

Please sign in to comment.