Skip to content

Commit

Permalink
replace-tag-procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Oct 17, 2024
1 parent e1419ef commit d7423fa
Show file tree
Hide file tree
Showing 13 changed files with 313 additions and 72 deletions.
13 changes: 13 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,19 @@ This section introduce all available spark procedures about paimon.
CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1', target_tag_name => 'tag2')
</td>
</tr>
<tr>
<td>replace_tag</td>
<td>
Replace an existing tag with new tag info. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>tag: name of the existed tag. Cannot be empty.</li>
<li>snapshot(Long): id of the snapshot which the tag is based on.</li>
<li>time_retained: The maximum time retained for the existing tag.</li>
</td>
<td>
CALL sys.replace_tag(table => 'default.T', tag_name => 'tag1', snapshot => 10, time_retained => '1 d')
</td>
</tr>
<tr>
<td>delete_tag</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,25 @@ public void renameTag(String tagName, String targetTagName) {
tagManager().renameTag(tagName, targetTagName);
}

@Override
public void replaceTag(String tagName, @Nullable Duration timeRetained) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
SnapshotNotExistException.checkNotNull(
latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
tagManager()
.replaceTag(latestSnapshot, tagName, timeRetained, store().createTagCallbacks());
}

@Override
public void replaceTag(String tagName, long fromSnapshotId, @Nullable Duration timeRetained) {
tagManager()
.replaceTag(
findSnapshot(fromSnapshotId),
tagName,
timeRetained,
store().createTagCallbacks());
}

@Override
public void deleteTag(String tagName) {
tagManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ public void renameTag(String tagName, String targetTagName) {
wrapped.renameTag(tagName, targetTagName);
}

@Override
public void replaceTag(String tagName, Duration timeRetained) {
wrapped.replaceTag(tagName, timeRetained);
}

@Override
public void replaceTag(String tagName, long fromSnapshotId, Duration timeRetained) {
wrapped.replaceTag(tagName, fromSnapshotId, timeRetained);
}

@Override
public void deleteTag(String tagName) {
wrapped.deleteTag(tagName);
Expand Down
10 changes: 10 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ default void renameTag(String tagName, String targetTagName) {
throw new UnsupportedOperationException();
}

@Override
default void replaceTag(String tagName, Duration timeRetained) {
throw new UnsupportedOperationException();
}

@Override
default void replaceTag(String tagName, long fromSnapshotId, Duration timeRetained) {
throw new UnsupportedOperationException();
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,22 @@ default void renameTag(String tagName, String targetTagName) {
this.getClass().getSimpleName()));
}

@Override
default void replaceTag(String tagName, Duration timeRetained) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support replaceTag.",
this.getClass().getSimpleName()));
}

@Override
default void replaceTag(String tagName, long fromSnapshotId, Duration timeRetained) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support replaceTag.",
this.getClass().getSimpleName()));
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException(
Expand Down
6 changes: 6 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ default String fullName() {
@Experimental
void renameTag(String tagName, String targetTagName);

@Experimental
void replaceTag(String tagName, Duration timeRetained);

@Experimental
void replaceTag(String tagName, long fromSnapshotId, Duration timeRetained);

/** Delete a tag by name. */
@Experimental
void deleteTag(String tagName);
Expand Down
30 changes: 21 additions & 9 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,27 @@ public void createTag(
List<TagCallback> callbacks) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(!tagExists(tagName), "Tag %s already exists.", tagName);
createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
}

/** Replace a tag from given snapshot and save it in the storage. */
public void replaceTag(
Snapshot snapshot,
String tagName,
@Nullable Duration timeRetained,
List<TagCallback> callbacks) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(tagExists(tagName), "Tag %s does not exist.", tagName);
createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
}

public void createOrReplaceTag(
Snapshot snapshot,
String tagName,
@Nullable Duration timeRetained,
List<TagCallback> callbacks) {
// When timeRetained is not defined, please do not write the tagCreatorTime field,
// as this will cause older versions (<= 0.7) of readers to be unable to read this
// tag.
Expand All @@ -117,15 +137,7 @@ public void createTag(
Path tagPath = tagPath(tagName);

try {
if (tagExists(tagName)) {
Snapshot tagged = taggedSnapshot(tagName);
Preconditions.checkArgument(
tagged.id() == snapshot.id(), "Tag name '%s' already exists.", tagName);
// update tag metadata into for the same snapshot of the same tag name.
fileIO.overwriteFileUtf8(tagPath, content);
} else {
fileIO.writeFile(tagPath, content, false);
}
fileIO.overwriteFileUtf8(tagPath, content);
} catch (IOException e) {
throw new RuntimeException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
import org.apache.paimon.spark.procedure.RenameTagProcedure;
import org.apache.paimon.spark.procedure.RepairProcedure;
import org.apache.paimon.spark.procedure.ReplaceTagTagProcedure;
import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;

Expand All @@ -63,6 +64,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
ImmutableMap.builder();
procedureBuilders.put("rollback", RollbackProcedure::builder);
procedureBuilders.put("create_tag", CreateTagProcedure::builder);
procedureBuilders.put("replace_tag", ReplaceTagTagProcedure::builder);
procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
procedureBuilders.put(
"create_tag_from_timestamp", CreateTagFromTimestampProcedure::builder);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.procedure;

import org.apache.paimon.table.Table;
import org.apache.paimon.utils.TimeUtils;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.time.Duration;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/** A base procedure to create or replace a tag. */
public abstract class CreateOrReplaceTagBaseProcedure extends BaseProcedure {

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("tag", StringType),
ProcedureParameter.optional("snapshot", LongType),
ProcedureParameter.optional("time_retained", StringType)
};

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
});

protected CreateOrReplaceTagBaseProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
}

@Override
public ProcedureParameter[] parameters() {
return PARAMETERS;
}

@Override
public StructType outputType() {
return OUTPUT_TYPE;
}

@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String tag = args.getString(1);
Long snapshot = args.isNullAt(2) ? null : args.getLong(2);
Duration timeRetained =
args.isNullAt(3) ? null : TimeUtils.parseDuration(args.getString(3));

return modifyPaimonTable(
tableIdent,
table -> {
createOrReplaceTag(table, tag, snapshot, timeRetained);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
}

abstract void createOrReplaceTag(
Table table, String tagName, Long snapshotId, Duration timeRetained);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,71 +18,26 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.utils.TimeUtils;
import org.apache.paimon.table.Table;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.time.Duration;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/** A procedure to create a tag. */
public class CreateTagProcedure extends BaseProcedure {

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("tag", StringType),
ProcedureParameter.optional("snapshot", LongType),
ProcedureParameter.optional("time_retained", StringType)
};

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
});
public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure {

protected CreateTagProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
}

@Override
public ProcedureParameter[] parameters() {
return PARAMETERS;
}

@Override
public StructType outputType() {
return OUTPUT_TYPE;
}

@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String tag = args.getString(1);
Long snapshot = args.isNullAt(2) ? null : args.getLong(2);
Duration timeRetained =
args.isNullAt(3) ? null : TimeUtils.parseDuration(args.getString(3));

return modifyPaimonTable(
tableIdent,
table -> {
if (snapshot == null) {
table.createTag(tag, timeRetained);
} else {
table.createTag(tag, snapshot, timeRetained);
}
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) {
if (snapshotId == null) {
table.createTag(tagName, timeRetained);
} else {
table.createTag(tagName, snapshotId, timeRetained);
}
}

public static ProcedureBuilder builder() {
Expand Down
Loading

0 comments on commit d7423fa

Please sign in to comment.