Skip to content

Commit

Permalink
[spark] Introduce reset_consumer procedure (#3614)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Jun 27, 2024
1 parent a4085ca commit cb6f06e
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 0 deletions.
16 changes: 16 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,21 @@ This section introduce all available spark procedures about paimon.
CALL sys.replace_branch(table => 'test_db.T', branch => 'test_branch')
</td>
</tr>
<tr>
<td>reset_consumer</td>
<td>
-- reset the new next snapshot id in the consumer<br/>
CALL sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)<br/><br/>
-- delete consumer<br/>
CALL sys.reset_consumer(table => 'identifier', consumerId => 'consumerId')
</td>
<td>
To reset or delete consumer. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>consumerId: consumer to be reset or deleted.</li>
<li>nextSnapshotId (Long): the new next snapshot id of the consumer.</li>
</td>
<td>CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId=> 10)</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
import org.apache.paimon.spark.procedure.RepairProcedure;
import org.apache.paimon.spark.procedure.ReplaceBranchProcedure;
import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -70,6 +71,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("repair", RepairProcedure::builder);
procedureBuilders.put("merge_branch", MergeBranchProcedure::builder);
procedureBuilders.put("replace_branch", ReplaceBranchProcedure::builder);
procedureBuilders.put("reset_consumer", ResetConsumerProcedure::builder);
return procedureBuilders.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.procedure;

import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.table.FileStoreTable;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/**
* Reset consumer procedure. Usage:
*
* <pre><code>
* -- reset the new next snapshot id in the consumer
* CALL sys.reset_consumer('tableId', 'consumerId', nextSnapshotId)
*
* -- delete consumer
* CALL sys.reset_consumer('tableId', 'consumerId')
* </code></pre>
*/
public class ResetConsumerProcedure extends BaseProcedure {

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("consumerId", StringType),
ProcedureParameter.optional("nextSnapshotId", LongType)
};

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
});

protected ResetConsumerProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
}

@Override
public ProcedureParameter[] parameters() {
return PARAMETERS;
}

@Override
public StructType outputType() {
return OUTPUT_TYPE;
}

@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String consumerId = args.getString(1);
Long nextSnapshotId = args.isNullAt(2) ? null : args.getLong(2);
return modifyPaimonTable(
tableIdent,
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
ConsumerManager consumerManager =
new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
if (nextSnapshotId == null) {
consumerManager.deleteConsumer(consumerId);
} else {
consumerManager.resetConsumer(consumerId, new Consumer(nextSnapshotId));
}

InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
}

public static ProcedureBuilder builder() {
return new Builder<ResetConsumerProcedure>() {
@Override
public ResetConsumerProcedure doBuild() {
return new ResetConsumerProcedure(tableCatalog());
}
};
}

@Override
public String description() {
return "ResetConsumerProcedure";
}
}

0 comments on commit cb6f06e

Please sign in to comment.