[spark] Compact procedure support where parameter (#3119)
Zouxxyy authored Apr 1, 2024
1 parent 2d1a106 commit 5b2ed7b
Showing 8 changed files with 177 additions and 45 deletions.
docs/content/engines/spark.md (15 changes: 13 additions & 2 deletions)
@@ -567,8 +567,19 @@
<tbody style="font-size: 12px; ">
<tr>
<td>compact</td>
<td>identifier: the target table identifier. Cannot be empty.<br><br><nobr>partitions: partition filter. Left empty for all partitions.<br> "," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'. <br><br><nobr>order_columns: the columns need to be sort. Left empty if 'order_strategy' is 'none'. </nobr><br><br>If you want sort compact two partitions date=01 and date=02, you need to write 'date=01;date=02'<br><br>If you want sort one partition with date=01 and day=01, you need to write 'date=01,day=01'</td>
<td><nobr>SET spark.sql.shuffle.partitions=10; --set the compact parallelism</nobr><br><nobr>CALL sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b')</nobr></td>
<td>
To compact files. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>partitions: partition filter. "," means "AND"<br>";" means "OR". For example, to compact one partition with date=01 and day=01, write 'date=01,day=01'. Left empty for all partitions. (Cannot be used together with "where".)</li>
<li>where: partition predicate. Left empty for all partitions. (Cannot be used together with "partitions".)</li>
<li>order_strategy: 'order', 'zorder', 'hilbert' or 'none'. Left empty for 'none'.</li>
<li>order_columns: the columns to be sorted. Left empty if 'order_strategy' is 'none'.</li>
</td>
<td>
SET spark.sql.shuffle.partitions=10; --set the compact parallelism <br/>
CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b') <br/>
CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')
</td>
</tr>
<tr>
<td>expire_snapshots</td>
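A minimal usage sketch of the new `where` option, in Scala against a Spark session with the Paimon catalog configured; the table `T`, its partition column `p`, and the data columns `a`, `b` are hypothetical, not part of the commit.

// Sketch only: `spark`, the Paimon catalog, and a table `T` partitioned by `p` are assumptions.
spark.sql("SET spark.sql.shuffle.partitions=10") // compact parallelism

// Existing partition-spec style: ';' separates alternative partitions.
spark.sql(
  "CALL sys.compact(table => 'T', partitions => 'p=0;p=1', " +
    "order_strategy => 'zorder', order_by => 'a,b')")

// New predicate style: a filter that may only reference partition columns.
spark.sql(
  "CALL sys.compact(table => 'T', where => 'p > 0 and p < 3', " +
    "order_strategy => 'zorder', order_by => 'a,b')")

// Passing both `partitions` and `where` is rejected:
// "partitions and where cannot be used together."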
@@ -26,6 +26,7 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
@@ -108,6 +109,11 @@ protected SparkTable loadSparkTable(Identifier ident) {
}
}

protected LogicalPlan createRelation(Identifier ident) {
return DataSourceV2Relation.create(
loadSparkTable(ident), Option.apply(tableCatalog), Option.apply(ident));
}

protected void refreshSparkCache(Identifier ident, Table table) {
CacheManager cacheManager = spark.sharedState().cacheManager();
DataSourceV2Relation relation =
@@ -26,9 +26,9 @@
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.spark.DynamicOverWrite$;
import org.apache.paimon.spark.SparkUtils;
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper;
import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.BucketMode;
@@ -44,7 +44,6 @@
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.StringUtils;

@@ -53,7 +52,11 @@
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Utils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.plans.logical.Filter;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
@@ -74,13 +77,14 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.spark.sql.types.DataTypes.StringType;

/**
* Compact procedure. Usage:
*
* <pre><code>
* CALL sys.compact(table => 'tableId', [partitions => 'p1;p2'], [order_strategy => 'xxx'], [order_by => 'xxx'])
* CALL sys.compact(table => 'tableId', [partitions => 'p1=0,p2=0;p1=0,p2=1'], [order_strategy => 'xxx'], [order_by => 'xxx'], [where => 'p1>0'])
* </code></pre>
*/
public class CompactProcedure extends BaseProcedure {
@@ -91,6 +95,7 @@ public class CompactProcedure extends BaseProcedure {
ProcedureParameter.optional("partitions", StringType),
ProcedureParameter.optional("order_strategy", StringType),
ProcedureParameter.optional("order_by", StringType),
ProcedureParameter.optional("where", StringType)
};

private static final StructType OUTPUT_TYPE =
@@ -115,30 +120,47 @@ public StructType outputType() {

@Override
public InternalRow[] call(InternalRow args) {
Preconditions.checkArgument(args.numFields() >= 1);
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String partitions = blank(args, 1) ? null : args.getString(1);
String sortType = blank(args, 2) ? TableSorter.OrderType.NONE.name() : args.getString(2);
List<String> sortColumns =
blank(args, 3)
? Collections.emptyList()
: Arrays.asList(args.getString(3).split(","));
String where = blank(args, 4) ? null : args.getString(4);
if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
throw new IllegalArgumentException(
"order_strategy \"none\" cannot work with order_by columns.");
}

checkArgument(
partitions == null || where == null,
"partitions and where cannot be used together.");
String finalWhere = partitions != null ? toWhere(partitions) : where;
return modifyPaimonTable(
tableIdent,
table -> {
Preconditions.checkArgument(table instanceof FileStoreTable);
checkArgument(table instanceof FileStoreTable);
LogicalPlan relation = createRelation(tableIdent);
Expression condition = null;
if (!StringUtils.isBlank(finalWhere)) {
condition = ExpressionHelper.resolveFilter(spark(), relation, finalWhere);
checkArgument(
ExpressionHelper.onlyHasPartitionPredicate(
spark(),
condition,
table.partitionKeys().toArray(new String[0])),
"Only partition predicate is supported, your predicate is %s, but partition keys are %s",
condition,
table.partitionKeys());
}
InternalRow internalRow =
newInternalRow(
execute(
(FileStoreTable) table,
sortType,
sortColumns,
partitions));
relation,
condition));
return new InternalRow[] {internalRow};
});
}
@@ -156,18 +178,18 @@ private boolean execute(
FileStoreTable table,
String sortType,
List<String> sortColumns,
@Nullable String partitions) {
LogicalPlan relation,
@Nullable Expression condition) {
table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
BucketMode bucketMode = table.bucketMode();
TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);

if (orderType.equals(TableSorter.OrderType.NONE)) {
JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
Predicate filter =
StringUtils.isBlank(partitions)
condition == null
? null
: PredicateBuilder.partitions(
ParameterUtils.getPartitions(partitions), table.rowType());
: ExpressionHelper.convertConditionToPaimonPredicate(
condition, relation.output(), table.rowType());
switch (bucketMode) {
case FIXED:
case DYNAMIC:
@@ -183,7 +205,8 @@
} else {
switch (bucketMode) {
case UNAWARE:
sortCompactUnAwareBucketTable(table, orderType, sortColumns, partitions);
sortCompactUnAwareBucketTable(
table, orderType, sortColumns, relation, condition);
break;
default:
throw new UnsupportedOperationException(
@@ -336,10 +359,11 @@ private void sortCompactUnAwareBucketTable(
FileStoreTable table,
TableSorter.OrderType orderType,
List<String> sortColumns,
@Nullable String partitions) {
LogicalPlan relation,
@Nullable Expression condition) {
Dataset<Row> row =
spark().read().format("paimon").load(table.coreOptions().path().toString());
row = StringUtils.isBlank(partitions) ? row : row.where(toWhere(partitions));
Utils.createDataset(
spark(), condition == null ? relation : new Filter(condition, relation));
new WriteIntoPaimonTable(
table,
DynamicOverWrite$.MODULE$,
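The earlier sort-compact path re-read the table by path and applied the partition spec as a string filter (`row.where(toWhere(partitions))`); the new path plants a `Filter` node carrying the already-resolved condition above the table relation. A minimal Scala sketch of that pattern, using only calls that appear in the diff; the helper name `readForSortCompact` and its parameters are illustrative.

import org.apache.spark.sql.{Dataset, Row, SparkSession, Utils}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

// Hypothetical helper mirroring the diff: read only the rows matching the
// (optional) partition predicate by filtering the logical plan directly,
// instead of re-parsing a WHERE string.
def readForSortCompact(
    spark: SparkSession,
    relation: LogicalPlan,
    condition: Expression /* may be null */): Dataset[Row] = {
  val plan = if (condition == null) relation else Filter(condition, relation)
  Utils.createDataset(spark, plan)
}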
@@ -37,7 +37,7 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
val matched = merge.matchedActions
val notMatched = merge.notMatchedActions

val resolve: (Expression, LogicalPlan) => Expression = resolveExpression(spark) _
val resolve: (Expression, LogicalPlan) => Expression = resolveExpression(spark)

def resolveMergeAction(action: MergeAction): MergeAction = {
action match {
@@ -18,9 +18,14 @@

package org.apache.paimon.spark.catalyst.analysis.expressions

import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.SparkFilterConverter
import org.apache.paimon.types.RowType

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, Expression, GetStructField, Literal, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, NullType}

@@ -106,4 +111,49 @@ object ExpressionHelper {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children = newChildren)
}

def resolveFilter(spark: SparkSession, plan: LogicalPlan, where: String): Expression = {
val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(where)
val filter = Filter(unResolvedExpression, plan)
spark.sessionState.analyzer.execute(filter) match {
case filter: Filter => filter.condition
case _ => throw new RuntimeException(s"Could not resolve expression $where in plan: $plan")
}
}

def onlyHasPartitionPredicate(
spark: SparkSession,
expr: Expression,
partitionCols: Array[String]): Boolean = {
val resolvedNameEquals = spark.sessionState.analyzer.resolver
splitConjunctivePredicates(expr).forall(
e =>
e.references.forall(r => partitionCols.exists(resolvedNameEquals(r.name, _))) &&
!SubqueryExpression.hasSubquery(expr))
}

def convertConditionToPaimonPredicate(
condition: Expression,
output: Seq[Attribute],
rowType: RowType): Predicate = {
val converter = new SparkFilterConverter(rowType)
val filters = normalizeExprs(Seq(condition), output)
.flatMap(splitConjunctivePredicates(_).map {
f =>
translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
throw new RuntimeException("Exec update failed:" +
s" cannot translate expression to source filter: $f"))
})
.toArray
val predicates = filters.map(converter.convert)
PredicateBuilder.and(predicates: _*)
}

def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
condition match {
case And(cond1, cond2) =>
splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => other :: Nil
}
}
}
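A hedged usage sketch for the three helpers added above, showing how a textual filter is resolved against a table's analyzed plan, restricted to partition columns, and translated into a Paimon predicate. The session, the table `paimon.db.t`, its partition keys `dt`/`hh`, and the `rowType` value are assumptions, not part of the commit.

import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.types.RowType

// Assumed setup: an active `spark` session with the Paimon catalog registered
// and a table `paimon.db.t` partitioned by (dt, hh).
val plan = spark.table("paimon.db.t").queryExecution.analyzed

// Parse and resolve the textual filter against the table's plan.
val cond = ExpressionHelper.resolveFilter(spark, plan, "dt = '2024-04-01' AND hh = '12'")

// Holds only if every conjunct references partition columns and uses no subquery.
require(ExpressionHelper.onlyHasPartitionPredicate(spark, cond, Array("dt", "hh")))

// Translate the resolved condition into a Paimon Predicate for the table scan;
// `rowType` would come from the loaded FileStoreTable in real code.
def toPredicate(rowType: RowType) =
  ExpressionHelper.convertConditionToPaimonPredicate(cond, plan.output, rowType)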
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.options.Options
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
import org.apache.paimon.spark.{InsertInto, SparkTable}
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper.convertConditionToPaimonPredicate
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
@@ -51,7 +52,9 @@ trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with Pa
(None, false)
} else {
try {
(Some(convertConditionToPaimonPredicate(condition, relation.output)), false)
(
Some(convertConditionToPaimonPredicate(condition(), relation.output, table.rowType())),
false)
} catch {
case NonFatal(_) =>
(None, true)
@@ -18,35 +18,15 @@

package org.apache.paimon.spark.commands

import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.SparkFilterConverter
import org.apache.paimon.types.RowType

import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}

import java.io.IOException

/** Helper trait for all paimon commands. */
trait PaimonCommand extends WithFileStoreTable with PredicateHelper {

protected def convertConditionToPaimonPredicate(
condition: Expression,
output: Seq[Attribute]): Predicate = {
val converter = new SparkFilterConverter(table.rowType)
val filters = normalizeExprs(Seq(condition), output)
.flatMap(splitConjunctivePredicates(_).map {
f =>
translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
throw new RuntimeException("Exec update failed:" +
s" cannot translate expression to source filter: $f"))
})
.toArray
val predicates = filters.map(converter.convert)
PredicateBuilder.and(predicates: _*)
}

/**
* For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
* methods where the `AlwaysTrue` Filter is used.