Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support rename tag action and procedure #4277

Merged
merged 12 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ This section introduce all available spark procedures about paimon.
CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d')
</td>
</tr>
<tr>
<td>rename_tag</td>
<td>
Rename a tag with a new tag name. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>tag_name: name of the tag. Cannot be empty.</li>
<li>target_tag_name: the new tag name to rename. Cannot be empty.</li>
</td>
<td>
CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1', target_tag_name => 'tag2')
</td>
</tr>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

target_name will be better ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

<tr>
<td>delete_tag</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ public void createTag(String tagName, Duration timeRetained) {
wrapped.createTag(tagName, timeRetained);
}

@Override
public void renameTag(String tagName, String targetTagName) {
privilegeChecker.assertCanInsert(identifier);
wrapped.renameTag(tagName, targetTagName);
}

@Override
public void createTag(String tagName, long fromSnapshotId, Duration timeRetained) {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration
tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks());
}

@Override
public void renameTag(String tagName, String targetTagName) {
tagManager().renameTag(tagName, targetTagName);
}

@Override
public void deleteTag(String tagName) {
tagManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ public void createTag(String tagName, long fromSnapshotId, Duration timeRetained
wrapped.createTag(tagName, fromSnapshotId, timeRetained);
}

@Override
public void renameTag(String tagName, String targetTagName) {
wrapped.renameTag(tagName, targetTagName);
}

@Override
public void deleteTag(String tagName) {
wrapped.deleteTag(tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ default void createTag(String tagName, Duration timeRetained) {
throw new UnsupportedOperationException();
}

@Override
default void renameTag(String tagName, String targetTagName) {
throw new UnsupportedOperationException();
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ default void createTag(String tagName, Duration timeRetained) {
this.getClass().getSimpleName()));
}

@Override
default void renameTag(String tagName, String targetTagName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support renameTag.",
this.getClass().getSimpleName()));
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException(
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ default String fullName() {
@Experimental
void createTag(String tagName, Duration timeRetained);

@Experimental
void renameTag(String tagName, String targetTagName);

/** Delete a tag by name. */
@Experimental
void deleteTag(String tagName);
Expand Down
18 changes: 18 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,24 @@ public void createTag(
}
}

public void renameTag(String tagName, String targetTagName) {
try {
if (!tagExists(tagName)) {
throw new RuntimeException(
String.format("The specified tag name [%s] does not exist.", tagName));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specified tag name [%s] does not exist ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (tagExists(targetTagName)) {
throw new RuntimeException(
String.format(
"The specified target tag name [%s] existed, please set a non-existent tag name.",
targetTagName));
}
fileIO.rename(tagPath(tagName), tagPath(targetTagName));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Make sure the tagNames are ALL tags of one snapshot. */
public void deleteAllTagsOfOneSnapshot(
List<String> tagNames, TagDeletion tagDeletion, SnapshotManager snapshotManager) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.utils.StringUtils;

import java.util.Map;

/** Rename Tag action for Flink. */
public class RenameTagAction extends TableActionBase {
private final String tagName;
private final String targetTagName;

public RenameTagAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String tagName,
String targetTagName) {
super(warehouse, databaseName, tableName, catalogConfig);
this.tagName = tagName;
this.targetTagName = targetTagName;
}

@Override
public void run() throws Exception {
if (StringUtils.isEmpty(tagName) || StringUtils.isEmpty(targetTagName)) {
throw new RuntimeException(
String.format(
"The specified tag name [%s] or target tag name [%s] cannot be empty.",
tagName, targetTagName));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String.format("The specified tag name [%s] or target tag name [%s] cannot be empty.", tagName, targetName)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

table.renameTag(tagName, targetTagName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link RenameTagActionFactory}. */
public class RenameTagActionFactory implements ActionFactory {

private static final String IDENTIFIER = "rename_tag";

private static final String TAG_NAME = "tag_name";
private static final String TARGET_TAG_NAME = "target_tag_name";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, TAG_NAME);
checkRequiredArgument(params, TARGET_TAG_NAME);

Tuple3<String, String, String> tablePath = getTablePath(params);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tagName = params.get(TAG_NAME);
String targetTagName = params.get(TARGET_TAG_NAME);

RenameTagAction action =
new RenameTagAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
catalogConfig,
tagName,
targetTagName);
return Optional.of(action);
}

@Override
public void printHelp() {
System.out.println("Action \"rename_tag\" rename a tag with a new tag name.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" rename_tag --warehouse <warehouse_path> --tag_name <tag_name> "
+ "--target_tag_name <target_tag_name>");
System.out.println();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

/**
* Create tag procedure. Usage:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create tag procedure?

*
* <pre><code>
* CALL sys.rename_tag('tableId', 'tagName', 'targetTagName')
* </code></pre>
*/
public class RenameTagProcedure extends ProcedureBase {
private static final String IDENTIFIER = "rename_tag";

@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "tagName", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "targetTagName", type = @DataTypeHint("STRING"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a empty line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

})
public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, String targetTagName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.renameTag(tagName, targetTagName);
String ret = String.format("Rename [%s] to [%s] successfully.", tagName, targetTagName);
return new String[] {ret};
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ org.apache.paimon.flink.action.MarkPartitionDoneActionFactory
org.apache.paimon.flink.action.CreateBranchActionFactory
org.apache.paimon.flink.action.DeleteBranchActionFactory
org.apache.paimon.flink.action.FastForwardActionFactory
org.apache.paimon.flink.action.RenameTagActionFactory
org.apache.paimon.flink.action.RepairActionFactory
org.apache.paimon.flink.action.RewriteFileIndexActionFactory
org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
Expand Down Expand Up @@ -67,6 +68,7 @@ org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
org.apache.paimon.flink.procedure.RenameTagProcedure
org.apache.paimon.flink.procedure.FastForwardProcedure
org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
org.apache.paimon.flink.procedure.CloneProcedure
Loading
Loading