Skip to content

Commit

Permalink
[spark] Replace create existing tag semantic with replace_tag (#4346)
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang authored Oct 31, 2024
1 parent 4c1777f commit e2acdc2
Show file tree
Hide file tree
Showing 30 changed files with 961 additions and 218 deletions.
26 changes: 26 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,32 @@ All available procedures are listed below.
CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
</td>
</tr>
<tr>
<td>replace_tag</td>
<td>
-- Use named argument<br/>
-- replace tag with new time retained <br/>
CALL [catalog.]sys.replace_tag(`table` => 'identifier', tag => 'tagName', time_retained => 'timeRetained') <br/>
-- replace tag with new snapshot id and time retained <br/>
CALL [catalog.]sys.replace_tag(`table` => 'identifier', snapshot_id => 'snapshotId') <br/><br/>
-- Use indexed argument<br/>
-- replace tag with new snapshot id and time retained <br/>
CALL [catalog.]sys.replace_tag('identifier', 'tagName', 'snapshotId', 'timeRetained') <br/>
</td>
<td>
To replace an existing tag with new tag info. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>tag: name of the existed tag. Cannot be empty.</li>
<li>snapshot(Long): id of the snapshot which the tag is based on, it is optional.</li>
<li>time_retained: The maximum time retained for the existing tag, it is optional.</li>
</td>
<td>
-- for Flink 1.18<br/>
CALL sys.replace_tag('default.T', 'my_tag', 5, '1 d')<br/><br/>
-- for Flink 1.19 and later<br/>
CALL sys.replace_tag(`table` => 'default.T', tag => 'my_tag', snapshot_id => 5, time_retained => '1 d')<br/><br/>
</td>
</tr>
<tr>
<td>expire_tags</td>
<td>
Expand Down
13 changes: 13 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,19 @@ This section introduce all available spark procedures about paimon.
CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1', target_tag_name => 'tag2')
</td>
</tr>
<tr>
<td>replace_tag</td>
<td>
Replace an existing tag with new tag info. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>tag: name of the existed tag. Cannot be empty.</li>
<li>snapshot(Long): id of the snapshot which the tag is based on, it is optional.</li>
<li>time_retained: The maximum time retained for the existing tag, it is optional.</li>
</td>
<td>
CALL sys.replace_tag(table => 'default.T', tag_name => 'tag1', snapshot => 10, time_retained => '1 d')
</td>
</tr>
<tr>
<td>delete_tag</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,19 @@ public void renameTag(String tagName, String targetTagName) {
tagManager().renameTag(tagName, targetTagName);
}

@Override
public void replaceTag(
String tagName, @Nullable Long fromSnapshotId, @Nullable Duration timeRetained) {
if (fromSnapshotId == null) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
SnapshotNotExistException.checkNotNull(
latestSnapshot, "Cannot replace tag because latest snapshot doesn't exist.");
tagManager().replaceTag(latestSnapshot, tagName, timeRetained);
} else {
tagManager().replaceTag(findSnapshot(fromSnapshotId), tagName, timeRetained);
}
}

@Override
public void deleteTag(String tagName) {
tagManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ public void renameTag(String tagName, String targetTagName) {
wrapped.renameTag(tagName, targetTagName);
}

@Override
public void replaceTag(String tagName, Long fromSnapshotId, Duration timeRetained) {
wrapped.replaceTag(tagName, fromSnapshotId, timeRetained);
}

@Override
public void deleteTag(String tagName) {
wrapped.deleteTag(tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ default void renameTag(String tagName, String targetTagName) {
throw new UnsupportedOperationException();
}

@Override
default void replaceTag(String tagName, Long fromSnapshotId, Duration timeRetained) {
throw new UnsupportedOperationException();
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ default void renameTag(String tagName, String targetTagName) {
this.getClass().getSimpleName()));
}

@Override
default void replaceTag(String tagName, Long fromSnapshotId, Duration timeRetained) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support replaceTag.",
this.getClass().getSimpleName()));
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException(
Expand Down
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ default String fullName() {
@Experimental
void renameTag(String tagName, String targetTagName);

/** Replace a tag with new snapshot id and new time retained. */
@Experimental
void replaceTag(String tagName, Long fromSnapshotId, Duration timeRetained);

/** Delete a tag by name. */
@Experimental
void deleteTag(String tagName);
Expand Down
43 changes: 25 additions & 18 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,26 @@ public List<Path> tagPaths(Predicate<Path> predicate) throws IOException {

/** Create a tag from given snapshot and save it in the storage. */
public void createTag(
Snapshot snapshot,
String tagName,
@Nullable Duration timeRetained,
List<TagCallback> callbacks) {
Snapshot snapshot, String tagName, Duration timeRetained, List<TagCallback> callbacks) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
}

/** Replace a tag from given snapshot and save it in the storage. */
public void replaceTag(Snapshot snapshot, String tagName, Duration timeRetained) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(tagExists(tagName), "Tag name '%s' does not exist.", tagName);
createOrReplaceTag(snapshot, tagName, timeRetained, null);
}

public void createOrReplaceTag(
Snapshot snapshot,
String tagName,
@Nullable Duration timeRetained,
@Nullable List<TagCallback> callbacks) {
// When timeRetained is not defined, please do not write the tagCreatorTime field,
// as this will cause older versions (<= 0.7) of readers to be unable to read this
// tag.
Expand All @@ -117,15 +130,7 @@ public void createTag(
Path tagPath = tagPath(tagName);

try {
if (tagExists(tagName)) {
Snapshot tagged = taggedSnapshot(tagName);
Preconditions.checkArgument(
tagged.id() == snapshot.id(), "Tag name '%s' already exists.", tagName);
// update tag metadata into for the same snapshot of the same tag name.
fileIO.overwriteFileUtf8(tagPath, content);
} else {
fileIO.writeFile(tagPath, content, false);
}
fileIO.overwriteFileUtf8(tagPath, content);
} catch (IOException e) {
throw new RuntimeException(
String.format(
Expand All @@ -135,11 +140,13 @@ public void createTag(
e);
}

try {
callbacks.forEach(callback -> callback.notifyCreation(tagName));
} finally {
for (TagCallback tagCallback : callbacks) {
IOUtils.closeQuietly(tagCallback);
if (callbacks != null) {
try {
callbacks.forEach(callback -> callback.notifyCreation(tagName));
} finally {
for (TagCallback tagCallback : callbacks) {
IOUtils.closeQuietly(tagCallback);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1136,8 +1136,9 @@ public void testCreateSameTagName() throws Exception {
table.createTag("test-tag", 1);
// verify that tag file exist
assertThat(tagManager.tagExists("test-tag")).isTrue();
// Create again
table.createTag("test-tag", 1);
// Create again failed if tag existed
Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 1))
.hasMessageContaining("Tag name 'test-tag' already exists.");
Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 2))
.hasMessageContaining("Tag name 'test-tag' already exists.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.TimeUtils;

import org.apache.flink.table.procedure.ProcedureContext;

import javax.annotation.Nullable;

import java.time.Duration;

/** A base procedure to create or replace a tag. */
public abstract class CreateOrReplaceTagBaseProcedure extends ProcedureBase {

public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, null, null);
}

public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, snapshotId, null);
}

public String[] call(
ProcedureContext procedureContext,
String tableId,
String tagName,
long snapshotId,
String timeRetained)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, snapshotId, timeRetained);
}

public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, String timeRetained)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, null, timeRetained);
}

private String[] innerCall(
String tableId,
String tagName,
@Nullable Long snapshotId,
@Nullable String timeRetained)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
createOrReplaceTag(table, tagName, snapshotId, toDuration(timeRetained));
return new String[] {"Success"};
}

abstract void createOrReplaceTag(
Table table, String tagName, Long snapshotId, Duration timeRetained);

@Nullable
private static Duration toDuration(@Nullable String s) {
if (s == null) {
return null;
}

return TimeUtils.parseDuration(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,7 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.TimeUtils;

import org.apache.flink.table.procedure.ProcedureContext;

import javax.annotation.Nullable;

import java.time.Duration;

Expand All @@ -36,59 +29,17 @@
* CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
* </code></pre>
*/
public class CreateTagProcedure extends ProcedureBase {
public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure {

public static final String IDENTIFIER = "create_tag";

public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, snapshotId, null);
}

public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, null, null);
}

public String[] call(
ProcedureContext procedureContext,
String tableId,
String tagName,
long snapshotId,
String timeRetained)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, snapshotId, timeRetained);
}

public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, String timeRetained)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, null, timeRetained);
}

private String[] innerCall(
String tableId,
String tagName,
@Nullable Long snapshotId,
@Nullable String timeRetained)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
@Override
void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) {
if (snapshotId == null) {
table.createTag(tagName, toDuration(timeRetained));
table.createTag(tagName, timeRetained);
} else {
table.createTag(tagName, snapshotId, toDuration(timeRetained));
table.createTag(tagName, snapshotId, timeRetained);
}
return new String[] {"Success"};
}

@Nullable
private static Duration toDuration(@Nullable String s) {
if (s == null) {
return null;
}

return TimeUtils.parseDuration(s);
}

@Override
Expand Down
Loading

0 comments on commit e2acdc2

Please sign in to comment.