[spark] Introduce spark compact procedure (apache#2296)
leaves12138 authored Nov 16, 2023
1 parent 41c195d commit 48c79c0
Showing 25 changed files with 1,157 additions and 57 deletions.
1 change: 1 addition & 0 deletions LICENSE
@@ -244,6 +244,7 @@ paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/an
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveProcedures.scala
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
from http://iceberg.apache.org/ version 1.3.0

paimon-hive/paimon-hive-common/src/test/resources/hive-schema-3.1.0.derby.sql
23 changes: 23 additions & 0 deletions docs/content/engines/spark3.md
@@ -425,6 +425,29 @@ val query = spark.readStream
*/
```

## Spark Procedure

This section introduces all available Spark procedures for Paimon.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 4%">Procedure Name</th>
<th class="text-left" style="width: 4%">Usage</th>
<th class="text-left" style="width: 20%">Explaination</th>
<th class="text-left" style="width: 4%">Example</th>
</tr>
</thead>
<tbody style="font-size: 12px; ">
<tr>
<td>compact</td>
<td><nobr>CALL [paimon.]sys.compact('&lt;identifier&gt;','&lt;partitions&gt;','&lt;sort_type&gt;','&lt;columns&gt;')</nobr><br>CALL [paimon.]sys.compact(table => '&lt;identifier&gt;' [, partitions => '&lt;partitions&gt;'] [, order_strategy => '&lt;sort_type&gt;'] [, order_by => '&lt;columns&gt;'])</td>
<td>identifier: the target table identifier. Cannot be empty.<br><br><nobr>partitions: partition filter. Leave empty to compact all partitions.<br>"," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy: 'order', 'zorder' or 'none'. Leave empty for 'none'.<br><br><nobr>order_by: the columns to sort by. Leave empty if 'order_strategy' is 'none'.</nobr><br><br>To sort-compact the two partitions date=01 and date=02, write 'date=01;date=02'.<br><br>To sort-compact a single partition with date=01 and day=01, write 'date=01,day=01'.</td>
<td><nobr>SET spark.sql.shuffle.partitions=10; --set the sort parallelism</nobr> <nobr>CALL paimon.sys.compact('my_db.Orders1','f0=0,f1=1;f0=1,f1=1', 'zorder', 'f1,f2');</nobr><br><nobr>CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b')</nobr></td>
</tr>
</tbody>
</table>
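
For a quick start, here is a minimal sketch of calling the procedure through the Java API (illustrative only: it assumes a `SparkSession` whose `paimon` catalog is already configured as `org.apache.paimon.spark.SparkCatalog`; the table, partition, and column names are placeholders):

```java
import org.apache.spark.sql.SparkSession;

public class CompactExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // The sort parallelism is controlled by Spark's shuffle partitions.
        spark.sql("SET spark.sql.shuffle.partitions=10");

        // Positional style: z-order two partitions (';' = OR, ',' = AND)
        // by columns f1 and f2.
        spark.sql(
            "CALL paimon.sys.compact('my_db.Orders1', 'f0=0,f1=1;f0=1,f1=1', 'zorder', 'f1,f2')");

        // Named-argument style: plain sort of a single partition.
        spark.sql(
            "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', "
                + "order_strategy => 'order', order_by => 'a,b')");
    }
}
```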

## Spark Type Conversion

This section lists all supported type conversions between Spark and Paimon.
org/apache/paimon/utils/ParameterUtils.java (new file)
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Utility methods for converting string parameters into other formats. */
public class ParameterUtils {

public static List<Map<String, String>> getPartitions(String... partitionStrings) {
List<Map<String, String>> partitions = new ArrayList<>();
for (String partition : partitionStrings) {
partitions.add(parseCommaSeparatedKeyValues(partition));
}
return partitions;
}

public static Map<String, String> parseCommaSeparatedKeyValues(String keyValues) {
Map<String, String> kvs = new HashMap<>();
for (String kvString : keyValues.split(",")) {
parseKeyValueString(kvs, kvString);
}
return kvs;
}

public static void parseKeyValueString(Map<String, String> map, String kvString) {
String[] kv = kvString.split("=", 2);
if (kv.length != 2) {
throw new IllegalArgumentException(
String.format(
"Invalid key-value string '%s'. Please use format 'key=value'",
kvString));
}
map.put(kv[0].trim(), kv[1].trim());
}
}
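
A quick illustration of the parsing rules implemented above (a hypothetical snippet, not part of this commit; it uses only the methods defined in the class):

```java
import java.util.List;
import java.util.Map;

import org.apache.paimon.utils.ParameterUtils;

public class ParameterUtilsExample {
    public static void main(String[] args) {
        // ';' separates alternative partitions (OR); ',' separates the
        // key-value pairs within a single partition (AND).
        String partitions = "date=01,day=01;date=02,day=01";

        List<Map<String, String>> parsed =
                ParameterUtils.getPartitions(partitions.split(";"));
        // Prints [{date=01, day=01}, {date=02, day=01}] (map entry order may vary).
        System.out.println(parsed);

        // A pair without '=' fails fast:
        // ParameterUtils.parseCommaSeparatedKeyValues("date"); // IllegalArgumentException
    }
}
```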
@@ -60,7 +60,7 @@
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges;
import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange;

org/apache/paimon/flink/action/ActionFactory.java
@@ -37,6 +37,9 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.ParameterUtils.parseKeyValueString;

/** Factory to create {@link Action}. */
public interface ActionFactory extends Factory {

@@ -146,23 +149,4 @@ default String getRequiredValue(MultipleParameterTool params, String key) {
checkRequiredArgument(params, key);
return params.get(key);
}

static Map<String, String> parseCommaSeparatedKeyValues(String keyValues) {
Map<String, String> kvs = new HashMap<>();
for (String kvString : keyValues.split(",")) {
parseKeyValueString(kvs, kvString);
}
return kvs;
}

static void parseKeyValueString(Map<String, String> map, String kvString) {
String[] kv = kvString.split("=", 2);
if (kv.length != 2) {
throw new IllegalArgumentException(
String.format(
"Invalid key-value string '%s'. Please use format 'key=value'",
kvString));
}
map.put(kv[0].trim(), kv[1].trim());
}
}
@@ -44,7 +44,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;

/**
* Flink action for 'MERGE INTO', which references the syntax as follows (we use 'upsert' semantics
@@ -26,7 +26,7 @@

import java.util.Map;

import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;

/**
* Compact database procedure. Usage:
@@ -20,9 +20,9 @@

import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.SortCompactAction;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.procedure.ProcedureContext;
@@ -83,7 +83,7 @@ public String[] call(
Map<String, String> tableConf =
StringUtils.isBlank(tableOptions)
? Collections.emptyMap()
: ActionFactory.parseCommaSeparatedKeyValues(tableOptions);
: ParameterUtils.parseCommaSeparatedKeyValues(tableOptions);
Identifier identifier = Identifier.fromString(tableId);
CompactAction action;
String jobName;
@@ -113,7 +113,7 @@ public String[] call(
}

if (!(StringUtils.isBlank(partitions) || "ALL".equals(partitions))) {
action.withPartitions(getPartitions(partitions.split(";")));
action.withPartitions(ParameterUtils.getPartitions(partitions.split(";")));
}

return execute(procedureContext, action, jobName);
@@ -23,6 +23,7 @@
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.procedure.ProcedureContext;

@@ -50,7 +51,9 @@ public String[] call(
FileStoreTable fileStoreTable =
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
commit.dropPartitions(getPartitions(partitionStrings), BatchWriteBuilder.COMMIT_IDENTIFIER);
commit.dropPartitions(
ParameterUtils.getPartitions(partitionStrings),
BatchWriteBuilder.COMMIT_IDENTIFIER);

return new String[] {"Success"};
}
ProcedureBase.java
@@ -34,12 +34,7 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;

/** Base implementation for flink {@link Procedure}. */
public abstract class ProcedureBase implements Procedure, Factory {
@@ -51,14 +46,6 @@ ProcedureBase withCatalog(Catalog catalog) {
return this;
}

protected List<Map<String, String>> getPartitions(String... partitionStrings) {
List<Map<String, String>> partitions = new ArrayList<>();
for (String partition : partitionStrings) {
partitions.add(parseCommaSeparatedKeyValues(partition));
}
return partitions;
}

@Nullable
protected String nullable(String arg) {
return StringUtils.isBlank(arg) ? null : arg;
@@ -102,11 +102,11 @@ public String toString() {
}

public static OrderType of(String orderType) {
if (ORDER.orderType.equals(orderType)) {
if (ORDER.orderType.equalsIgnoreCase(orderType)) {
return ORDER;
} else if (ZORDER.orderType.equals(orderType)) {
} else if (ZORDER.orderType.equalsIgnoreCase(orderType)) {
return ZORDER;
} else if (HILBERT.orderType.equals(orderType)) {
} else if (HILBERT.orderType.equalsIgnoreCase(orderType)) {
return HILBERT;
}

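
Since `OrderType.of` now uses `equalsIgnoreCase`, strategy names are matched case-insensitively. A hypothetical check (the enum's enclosing class and package are not shown in this hunk, so the import path below is an assumption):

```java
// Assumption: the enum patched above is TableSorter.OrderType in org.apache.paimon.spark.sort.
import org.apache.paimon.spark.sort.TableSorter.OrderType;

class OrderTypeCheck {
    public static void main(String[] args) {
        // Both spellings now resolve to the same strategy.
        System.out.println(OrderType.of("zorder") == OrderType.of("ZORDER")); // true
    }
}
```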
CreateAndDeleteTagProcedureTest.scala
@@ -68,13 +68,14 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
checkAnswer(
spark.sql("CALL create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"),
spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"),
Row(true) :: Nil)
checkAnswer(
spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
Row("test_tag") :: Nil)
checkAnswer(
spark.sql("CALL delete_tag(table => 'test.T', tag => 'test_tag')"),
spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
} finally {
RollbackProcedureTest.scala
@@ -59,7 +59,8 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
checkAnswer(query(), Row(1, "a") :: Nil)

checkAnswer(
spark.sql("CALL create_tag(table => 'test.T', tag => 'test_tag', snapshot => 1)"),
spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 1)"),
Row(true) :: Nil)

// snapshot-2
@@ -72,17 +73,17 @@
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
assertThrows[RuntimeException] {
spark.sql("CALL rollback(table => 'test.T_exception', version => '2')")
spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')")
}
// rollback to snapshot
checkAnswer(
spark.sql("CALL rollback(table => 'test.T', version => '2')"),
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"),
Row(true) :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)

// rollback to tag
checkAnswer(
spark.sql("CALL rollback(table => 'test.T', version => 'test_tag')"),
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 'test_tag')"),
Row(true) :: Nil)
checkAnswer(query(), Row(1, "a") :: Nil)
} finally {
@@ -313,7 +313,8 @@ public boolean dropTable(Identifier ident) {

@Override
public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException {
if (isValidateNamespace(identifier.namespace())) {
if (isValidateNamespace(identifier.namespace())
&& Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) {
ProcedureBuilder builder = SparkProcedures.newBuilder(identifier.name());
if (builder != null) {
return builder.withTableCatalog(this).build();
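
The stricter namespace check above means procedures now resolve only under the catalog's `sys` database, which is why the tests above switch to `paimon.sys.*` calls. A hedged sketch of the resulting calling convention (table name and version are illustrative):

```java
import org.apache.spark.sql.SparkSession;

public class SysNamespaceExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Resolves: the procedure is looked up under the `sys` database.
        spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')");

        // With the check above, a lookup in any other namespace fails
        // with NoSuchProcedureException:
        // spark.sql("CALL paimon.test.rollback(table => 'test.T', version => '2')");
    }
}
```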
@@ -18,6 +18,7 @@

package org.apache.paimon.spark;

import org.apache.paimon.spark.procedure.CompactProcedure;
import org.apache.paimon.spark.procedure.CreateTagProcedure;
import org.apache.paimon.spark.procedure.DeleteTagProcedure;
import org.apache.paimon.spark.procedure.Procedure;
@@ -48,6 +49,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("rollback", RollbackProcedure::builder);
procedureBuilders.put("create_tag", CreateTagProcedure::builder);
procedureBuilders.put("delete_tag", DeleteTagProcedure::builder);
procedureBuilders.put("compact", CompactProcedure::builder);
return procedureBuilders.build();
}
}
@@ -35,6 +35,7 @@
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimeType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
@@ -137,6 +138,11 @@ public DataType visit(DateType dateType) {
return DataTypes.DateType;
}

@Override
public DataType visit(TimeType timeType) {
return DataTypes.IntegerType;
}

@Override
public DataType visit(TimestampType timestampType) {
return DataTypes.TimestampType;
BaseProcedure.java
@@ -119,6 +119,10 @@ protected InternalRow newInternalRow(Object... values) {
return new GenericInternalRow(values);
}

protected SparkSession spark() {
return spark;
}

protected abstract static class Builder<T extends BaseProcedure> implements ProcedureBuilder {
private TableCatalog tableCatalog;
