Skip to content

Commit

Permalink
[core] Adjust codes TTL for tag
Browse files Browse the repository at this point in the history
This closes #3159
  • Loading branch information
JingsongLi committed Apr 16, 2024
1 parent 135fa70 commit c0c4e08
Show file tree
Hide file tree
Showing 27 changed files with 283 additions and 515 deletions.
6 changes: 3 additions & 3 deletions docs/content/maintenance/manage-tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ You can create a tag with given name and snapshot ID.
--database <database-name> \
--table <table-name> \
--tag_name <tag-name> \
--time_retained <time-retained> \
[--snapshot <snapshot_id>] \
[--time_retained <time-retained>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
Expand All @@ -127,7 +127,7 @@ public class CreateTag {
public static void main(String[] args) {
Table table = ...;
table.createTag("my-tag", 1);
table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1);
table.createTag("my-tag-retained-12-hours", 1, Duration.ofHours(12));
}
}
```
Expand All @@ -142,7 +142,7 @@ CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2);
To create a tag with retained 1 day, run the following sql:
```sql
CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', snapshot => 2);
CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2, time_retained => '1 d');
```
To create a tag based on the latest snapshot id, run the following sql:
Expand Down
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@
<td><h5>tag.default-time-retained</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum default time retained for all tags.</td>
<td>The default maximum time retained for newly created tags.</td>
</tr>
<tr>
<td><h5>tag.num-retained-max</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ public class CoreOptions implements Serializable {
key("tag.default-time-retained")
.durationType()
.noDefaultValue()
.withDescription("The maximum default time retained for all tags.");
.withDescription("The default maximum time retained for newly created tags.");

public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
Expand Down Expand Up @@ -1649,6 +1649,7 @@ public TagPeriodFormatter tagPeriodFormatter() {
return options.get(TAG_PERIOD_FORMATTER);
}

@Nullable
public Integer tagNumRetainedMax() {
return options.get(TAG_NUM_RETAINED_MAX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ public PartitionExpire newPartitionExpire(String commitUser) {
}

@Override
@Nullable
public TagAutoManager newTagCreationManager() {
return TagAutoManager.create(
options,
Expand Down
1 change: 0 additions & 1 deletion paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public interface FileStore<T> extends Serializable {
@Nullable
PartitionExpire newPartitionExpire(String commitUser);

@Nullable
TagAutoManager newTagCreationManager();

ServiceManager newServiceManager();
Expand Down
8 changes: 4 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* This file is the entrance to all data committed at some specific time point.
Expand Down Expand Up @@ -447,12 +446,13 @@ public static Snapshot fromPath(FileIO fileIO, Path path) {
}
}

public static Optional<Snapshot> safelyFromPath(FileIO fileIO, Path path) throws IOException {
@Nullable
public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException {
try {
String json = fileIO.readFileUtf8(path);
return Optional.of(Snapshot.fromJson(json));
return Snapshot.fromJson(json);
} catch (FileNotFoundException e) {
return Optional.empty();
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,29 +488,29 @@ public Snapshot createTagInternal(long fromSnapshotId) {
@Override
public void createTag(String tagName, long fromSnapshotId) {
createTag(
tagName, coreOptions().tagDefaultTimeRetained(), createTagInternal(fromSnapshotId));
tagName, createTagInternal(fromSnapshotId), coreOptions().tagDefaultTimeRetained());
}

@Override
public void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId) {
createTag(tagName, timeRetained, createTagInternal(fromSnapshotId));
public void createTag(String tagName, long fromSnapshotId, Duration timeRetained) {
createTag(tagName, createTagInternal(fromSnapshotId), timeRetained);
}

@Override
public void createTag(String tagName) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
createTag(tagName, coreOptions().tagDefaultTimeRetained(), latestSnapshot);
createTag(tagName, latestSnapshot, coreOptions().tagDefaultTimeRetained());
}

@Override
public void createTag(String tagName, @Nullable Duration timeRetained) {
public void createTag(String tagName, Duration timeRetained) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
createTag(tagName, timeRetained, latestSnapshot);
createTag(tagName, latestSnapshot, timeRetained);
}

private void createTag(String tagName, @Nullable Duration timeRetained, Snapshot fromSnapshot) {
private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration timeRetained) {
tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@

package org.apache.paimon.table;

import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.InnerStreamTableScan;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -113,8 +110,8 @@ default void createTag(String tagName, long fromSnapshotId) {
this.getClass().getSimpleName()));
}

@Experimental
default void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId) {
@Override
default void createTag(String tagName, long fromSnapshotId, Duration timeRetained) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createTag.",
Expand All @@ -129,8 +126,8 @@ default void createTag(String tagName) {
this.getClass().getSimpleName()));
}

@Experimental
default void createTag(String tagName, @Nullable Duration timeRetained) {
@Override
default void createTag(String tagName, Duration timeRetained) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createTag.",
Expand Down
6 changes: 2 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -80,14 +78,14 @@ public interface Table extends Serializable {
void createTag(String tagName, long fromSnapshotId);

@Experimental
void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId);
void createTag(String tagName, long fromSnapshotId, Duration timeRetained);

/** Create a tag from latest snapshot. */
@Experimental
void createTag(String tagName);

@Experimental
void createTag(String tagName, @Nullable Duration timeRetained);
void createTag(String tagName, Duration timeRetained);

/** Delete a tag by name. */
@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,9 @@ public boolean forceCreatingSnapshot() {
if (this.forceCreatingSnapshot) {
return true;
}
if (tagAutoManager != null) {
return tagAutoManager.getTagAutoCreation() != null
&& tagAutoManager.getTagAutoCreation().forceCreatingSnapshot();
}
return false;
return tagAutoManager != null
&& tagAutoManager.getTagAutoCreation() != null
&& tagAutoManager.getTagAutoCreation().forceCreatingSnapshot();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.TagManager;
Expand All @@ -61,7 +62,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

Expand Down Expand Up @@ -208,12 +208,10 @@ public RecordReader<InternalRow> createReader(Split split) {
Options options = new Options();
options.set(CoreOptions.PATH, location.toUri().toString());
FileStoreTable table = FileStoreTableFactory.create(fileIO, options);
SortedMap<Tag, List<String>> tags = table.tagManager().tagsWithTimeRetained();
List<Pair<Tag, String>> tags = table.tagManager().tagObjects();
Map<String, Tag> nameToSnapshot = new LinkedHashMap<>();
for (Map.Entry<Tag, List<String>> tag : tags.entrySet()) {
for (String tagName : tag.getValue()) {
nameToSnapshot.put(tagName, tag.getKey());
}
for (Pair<Tag, String> tag : tags) {
nameToSnapshot.put(tag.getValue(), tag.getKey());
}
Map<String, List<String>> tagBranches = new HashMap<>();
table.branchManager()
Expand Down
Loading

0 comments on commit c0c4e08

Please sign in to comment.