Skip to content

Commit

Permalink
[core] Introduce TTL for tag (#3127)
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 committed Apr 5, 2024
1 parent b0ff65f commit 261d762
Show file tree
Hide file tree
Showing 26 changed files with 780 additions and 88 deletions.
7 changes: 7 additions & 0 deletions docs/content/maintenance/manage-tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ You can create a tag with given name and snapshot ID.
--database <database-name> \
--table <table-name> \
--tag_name <tag-name> \
--time_retained <time-retained> \
[--snapshot <snapshot_id>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
Expand All @@ -126,6 +127,7 @@ public class CreateTag {
public static void main(String[] args) {
Table table = ...;
table.createTag("my-tag", 1);
table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1);
}
}
```
Expand All @@ -138,6 +140,11 @@ Run the following sql:
CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2);
```
To create a tag that is retained for 1 day, run the following sql:
```sql
CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', snapshot => 2);
```
To create a tag based on the latest snapshot id, run the following sql:
```sql
CALL create_tag(table => 'test.t', tag => 'test_tag');
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,12 @@
<td>Integer</td>
<td>The maximum number of tags to retain.</td>
</tr>
<tr>
<td><h5>tag.time-retained</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum retained time for all tags.</td>
</tr>
<tr>
<td><h5>tag.period-formatter</h5></td>
<td style="word-wrap: break-word;">with_dashes</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class CoreOptions implements Serializable {

public static final String DISTINCT = "distinct";

/**
 * Default retention time for tags: 3,650,000 days (roughly 10,000 years). Used as the default
 * value of the {@code tag.time-retained} option.
 */
public static final Duration DEFAULT_TAG_TIME_RETAINED = Duration.ofDays(3650000);

public static final ConfigOption<Integer> BUCKET =
key("bucket")
.intType()
Expand Down Expand Up @@ -989,6 +991,12 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The maximum number of tags to retain.");

// Option "tag.time-retained": falls back to DEFAULT_TAG_TIME_RETAINED when not set.
// NOTE(review): the generated core_configuration.html entry lists the default as "(none)",
// which does not match the default value declared here -- confirm the docs were regenerated.
public static final ConfigOption<Duration> TAG_TIME_RETAINED =
key("tag.time-retained")
.durationType()
.defaultValue(DEFAULT_TAG_TIME_RETAINED)
.withDescription("The maximum retained time for all tags.");

public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
.durationType()
Expand Down Expand Up @@ -1617,6 +1625,10 @@ public Integer tagNumRetainedMax() {
return options.get(TAG_NUM_RETAINED_MAX);
}

/** Returns the value of the {@code tag.time-retained} option. */
public Duration tagTimeRetained() {
return options.get(TAG_TIME_RETAINED);
}

/** Returns the value of the {@code snapshot.watermark-idle-timeout} option. */
public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
Expand Down
34 changes: 17 additions & 17 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,37 +93,37 @@ public class Snapshot {
// null for paimon <= 0.2
@JsonProperty(FIELD_VERSION)
@Nullable
private final Integer version;
protected final Integer version;

@JsonProperty(FIELD_ID)
private final long id;
protected final long id;

@JsonProperty(FIELD_SCHEMA_ID)
private final long schemaId;
protected final long schemaId;

// a manifest list recording all changes from the previous snapshots
@JsonProperty(FIELD_BASE_MANIFEST_LIST)
private final String baseManifestList;
protected final String baseManifestList;

// a manifest list recording all new changes occurred in this snapshot
// for faster expire and streaming reads
@JsonProperty(FIELD_DELTA_MANIFEST_LIST)
private final String deltaManifestList;
protected final String deltaManifestList;

// a manifest list recording all changelog produced in this snapshot
// null if no changelog is produced, or for paimon <= 0.2
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
@Nullable
private final String changelogManifestList;
protected final String changelogManifestList;

// a manifest recording all index files of this table
// null if no index file
@JsonProperty(FIELD_INDEX_MANIFEST)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final String indexManifest;
protected final String indexManifest;

@JsonProperty(FIELD_COMMIT_USER)
private final String commitUser;
protected final String commitUser;

// Mainly for snapshot deduplication.
//
Expand All @@ -133,49 +133,49 @@ public class Snapshot {
// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
// committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
@JsonProperty(FIELD_COMMIT_IDENTIFIER)
private final long commitIdentifier;
protected final long commitIdentifier;

@JsonProperty(FIELD_COMMIT_KIND)
private final CommitKind commitKind;
protected final CommitKind commitKind;

@JsonProperty(FIELD_TIME_MILLIS)
private final long timeMillis;
protected final long timeMillis;

@JsonProperty(FIELD_LOG_OFFSETS)
private final Map<Integer, Long> logOffsets;
protected final Map<Integer, Long> logOffsets;

// record count of all changes occurred in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_TOTAL_RECORD_COUNT)
@Nullable
private final Long totalRecordCount;
protected final Long totalRecordCount;

// record count of all new changes occurred in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_DELTA_RECORD_COUNT)
@Nullable
private final Long deltaRecordCount;
protected final Long deltaRecordCount;

// record count of all changelog produced in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT)
@Nullable
private final Long changelogRecordCount;
protected final Long changelogRecordCount;

// watermark for input records
// null for paimon <= 0.3
// null if there is no watermark in new committing, and the previous snapshot does not have a
// watermark
@JsonProperty(FIELD_WATERMARK)
@Nullable
private final Long watermark;
protected final Long watermark;

// stats file name for statistics of this table
// null if no stats file
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_STATISTICS)
@Nullable
private final String statistics;
protected final String statistics;

public Snapshot(
long id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -443,8 +444,7 @@ public void rollbackTo(long snapshotId) {
rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
}

@Override
public void createTag(String tagName, long fromSnapshotId) {
public Snapshot createTagInternal(long fromSnapshotId) {
SnapshotManager snapshotManager = snapshotManager();
Snapshot snapshot = null;
if (snapshotManager.snapshotExists(fromSnapshotId)) {
Expand All @@ -464,18 +464,35 @@ public void createTag(String tagName, long fromSnapshotId) {
snapshot != null,
"Cannot create tag because given snapshot #%s doesn't exist.",
fromSnapshotId);
createTag(tagName, snapshot);
return snapshot;
}

/**
 * Creates a tag named {@code tagName} on the snapshot returned by
 * {@code createTagInternal(fromSnapshotId)}, using the retention time read from
 * {@code coreOptions().tagTimeRetained()}.
 */
@Override
public void createTag(String tagName, long fromSnapshotId) {
    // Read the option first, then resolve the snapshot: same evaluation order as before.
    Duration configuredRetention = coreOptions().tagTimeRetained();
    Snapshot sourceSnapshot = createTagInternal(fromSnapshotId);
    createTag(tagName, configuredRetention, sourceSnapshot);
}

/**
 * Creates a tag named {@code tagName} on the snapshot returned by
 * {@code createTagInternal(fromSnapshotId)}, passing the caller-supplied
 * {@code timeRetained} through unchanged.
 */
@Override
public void createTag(String tagName, Duration timeRetained, long fromSnapshotId) {
    Snapshot sourceSnapshot = createTagInternal(fromSnapshotId);
    createTag(tagName, timeRetained, sourceSnapshot);
}

@Override
public void createTag(String tagName) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
createTag(tagName, latestSnapshot);
createTag(tagName, coreOptions().tagTimeRetained(), latestSnapshot);
}

/**
 * Creates a tag named {@code tagName} on the latest snapshot, passing the caller-supplied
 * {@code timeRetained} through unchanged. The latest snapshot is null-checked before the
 * tag is created.
 */
@Override
public void createTag(String tagName, Duration timeRetained) {
    Snapshot newest = snapshotManager().latestSnapshot();
    checkNotNull(newest, "Cannot create tag because latest snapshot doesn't exist.");
    createTag(tagName, timeRetained, newest);
}

private void createTag(String tagName, Snapshot fromSnapshot) {
tagManager().createTag(fromSnapshot, tagName, store().createTagCallbacks());
private void createTag(String tagName, Duration timeRetained, Snapshot fromSnapshot) {
tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.paimon.table;

import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.InnerStreamTableScan;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -109,6 +111,14 @@ default void createTag(String tagName, long fromSnapshotId) {
this.getClass().getSimpleName()));
}

/**
 * Always rejects the call: the default implementation does not support creating a tag.
 *
 * @throws UnsupportedOperationException always; the message names the concrete table class
 */
@Experimental
@Override
default void createTag(String tagName, Duration timeRetained, long fromSnapshotId) {
    throw new UnsupportedOperationException(
            String.format(
                    "Readonly Table %s does not support createTag.",
                    this.getClass().getSimpleName()));
}

@Override
default void createTag(String tagName) {
throw new UnsupportedOperationException(
Expand All @@ -117,6 +127,14 @@ default void createTag(String tagName) {
this.getClass().getSimpleName()));
}

/**
 * Always rejects the call: the default implementation does not support creating a tag.
 *
 * @throws UnsupportedOperationException always; the message names the concrete table class
 */
@Experimental
@Override
default void createTag(String tagName, Duration timeRetained) {
    throw new UnsupportedOperationException(
            String.format(
                    "Readonly Table %s does not support createTag.",
                    this.getClass().getSimpleName()));
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException(
Expand Down
7 changes: 7 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.types.RowType;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -76,10 +77,16 @@ public interface Table extends Serializable {
@Experimental
void createTag(String tagName, long fromSnapshotId);

/** Create a tag from given snapshot with the given retention time. */
@Experimental
void createTag(String tagName, Duration timeRetained, long fromSnapshotId);

/** Create a tag from latest snapshot. */
@Experimental
void createTag(String tagName);

/** Create a tag from latest snapshot with the given retention time. */
@Experimental
void createTag(String tagName, Duration timeRetained);

/** Delete a tag by name. */
@Experimental
void deleteTag(String tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
Expand All @@ -63,6 +64,7 @@
import java.util.SortedMap;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A {@link Table} for showing tags of table. */
public class TagsTable implements ReadonlyTable {
Expand All @@ -79,7 +81,10 @@ public class TagsTable implements ReadonlyTable {
new DataField(2, "schema_id", new BigIntType(false)),
new DataField(3, "commit_time", new TimestampType(false, 3)),
new DataField(4, "record_count", new BigIntType(true)),
new DataField(5, "branches", SerializationUtils.newStringType(true))));
new DataField(5, "branches", SerializationUtils.newStringType(true)),
new DataField(6, "create_time", SerializationUtils.newStringType(true)),
new DataField(
7, "time_retained", SerializationUtils.newStringType(true))));

private final FileIO fileIO;
private final Path location;
Expand Down Expand Up @@ -234,17 +239,21 @@ public RecordReader<InternalRow> createReader(Split split) {
}

private InternalRow toRow(
Map.Entry<String, Snapshot> tag, Map<String, List<String>> tagBranches) {
Snapshot snapshot = tag.getValue();
List<String> branches = tagBranches.get(tag.getKey());
Map.Entry<String, Snapshot> snapshot, Map<String, List<String>> tagBranches) {
checkArgument(
snapshot.getValue() instanceof Tag,
"There is a bug, Snapshot in tagManager.tags() must be Tag.");
Tag tag = (Tag) snapshot.getValue();
List<String> branches = tagBranches.get(snapshot.getKey());
return GenericRow.of(
BinaryString.fromString(tag.getKey()),
snapshot.id(),
snapshot.schemaId(),
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
snapshot.totalRecordCount(),
BinaryString.fromString(branches == null ? "[]" : branches.toString()));
BinaryString.fromString(snapshot.getKey()),
tag.id(),
tag.schemaId(),
Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(tag.timeMillis())),
tag.totalRecordCount(),
BinaryString.fromString(branches == null ? "[]" : branches.toString()),
tag.createTime(),
tag.timeRetained());
}
}
}
Loading

0 comments on commit 261d762

Please sign in to comment.