From 3b9dd9b0fc00566ccdb8037a33702e7d4b19e3fb Mon Sep 17 00:00:00 2001 From: Xiduo You Date: Thu, 8 Aug 2024 22:18:16 +0800 Subject: [PATCH] [spark] Support report partitioning to eliminate shuffle exchange (#3912) --- .../org/apache/paimon/schema/TableSchema.java | 22 ++- .../paimon/table/AbstractFileStoreTable.java | 5 - .../org/apache/paimon/table/BucketSpec.java | 65 +++++++++ .../paimon/table/DelegatedFileStoreTable.java | 5 - .../apache/paimon/table/FileStoreTable.java | 10 +- .../spark/catalog/SparkBaseCatalog.java | 57 ++++++++ .../paimon/spark/PaimonInputPartition.scala | 25 ++++ .../spark/catalog/SparkBaseCatalog.java | 57 ++++++++ .../paimon/spark/PaimonInputPartition.scala | 25 ++++ .../org/apache/paimon/spark/PaimonScan.scala | 66 +++++++++ .../spark/catalog/SparkBaseCatalog.java | 43 +++++- .../catalog/functions/PaimonFunctions.java | 88 ++++++++++++ .../apache/paimon/spark/PaimonBaseScan.scala | 4 +- .../paimon/spark/PaimonInputPartition.scala | 22 ++- .../org/apache/paimon/spark/PaimonScan.scala | 48 ++++++- .../paimon/spark/PaimonStatistics.scala | 2 +- .../spark/sql/BucketedTableQueryTest.scala | 136 ++++++++++++++++++ .../paimon/spark/sql/PaimonMetricTest.scala | 2 +- 18 files changed, 652 insertions(+), 30 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/BucketSpec.java create mode 100644 paimon-spark/paimon-spark-3.1/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java create mode 100644 paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala create mode 100644 paimon-spark/paimon-spark-3.2/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java create mode 100644 paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala create mode 100644 paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/BucketedTableQueryTest.scala diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index bcad8e92b8a5..18bf3c893602 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -18,6 +18,7 @@ package org.apache.paimon.schema; +import org.apache.paimon.CoreOptions; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.types.DataField; @@ -66,6 +67,10 @@ public class TableSchema implements Serializable { private final List primaryKeys; + private final List bucketKeys; + + private final int numBucket; + private final Map options; private final @Nullable String comment; @@ -115,8 +120,13 @@ public TableSchema( // try to trim to validate primary keys trimmedPrimaryKeys(); - // try to validate bucket keys - originalBucketKeys(); + // try to validate and initalize the bucket keys + List tmpBucketKeys = originalBucketKeys(); + if (tmpBucketKeys.isEmpty()) { + tmpBucketKeys = trimmedPrimaryKeys(); + } + bucketKeys = tmpBucketKeys; + numBucket = CoreOptions.fromMap(options).bucket(); } public int version() { @@ -171,11 +181,11 @@ public Map options() { return options; } + public int numBuckets() { + return numBucket; + } + public List bucketKeys() { - List bucketKeys = originalBucketKeys(); - if (bucketKeys.isEmpty()) { - bucketKeys = trimmedPrimaryKeys(); - } return bucketKeys; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index d30bd11efda6..6e3c79d4da81 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -150,11 +150,6 @@ public Optional statistics() { return Optional.empty(); } - @Override - public BucketMode bucketMode() { - return store().bucketMode(); - } - @Override public Optional newWriteSelector() { switch (bucketMode()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/BucketSpec.java b/paimon-core/src/main/java/org/apache/paimon/table/BucketSpec.java new file mode 100644 index 000000000000..99ca53e04e47 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/BucketSpec.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import java.util.List; + +/** + * Bucket spec holds all bucket information, we can do plan optimization during table scan. + * + *

If the `bucketMode` is {@link BucketMode#HASH_DYNAMIC}, then `numBucket` is -1; + * + * @since 0.9 + */ +public class BucketSpec { + + private final BucketMode bucketMode; + private final List bucketKeys; + private final int numBuckets; + + public BucketSpec(BucketMode bucketMode, List bucketKeys, int numBuckets) { + this.bucketMode = bucketMode; + this.bucketKeys = bucketKeys; + this.numBuckets = numBuckets; + } + + public BucketMode getBucketMode() { + return bucketMode; + } + + public List getBucketKeys() { + return bucketKeys; + } + + public int getNumBuckets() { + return numBuckets; + } + + @Override + public String toString() { + return "BucketSpec{" + + "bucketMode=" + + bucketMode + + ", bucketKeys=" + + bucketKeys + + ", numBuckets=" + + numBuckets + + '}'; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 58d1caaac19f..243ffb754c17 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -113,11 +113,6 @@ public FileStore store() { return wrapped.store(); } - @Override - public BucketMode bucketMode() { - return wrapped.bucketMode(); - } - @Override public CatalogEnvironment catalogEnvironment() { return wrapped.catalogEnvironment(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 61fe816ac683..ed1ba1da5819 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -59,6 +59,14 @@ default List primaryKeys() { return schema().primaryKeys(); } + default BucketSpec bucketSpec() { + return new BucketSpec(bucketMode(), schema().bucketKeys(), schema().numBuckets()); + } + + default BucketMode bucketMode() { + return store().bucketMode(); + } + @Override default Map options() { return schema().options(); @@ -73,8 +81,6 @@ default Optional comment() { FileStore store(); - BucketMode bucketMode(); - CatalogEnvironment catalogEnvironment(); @Override diff --git a/paimon-spark/paimon-spark-3.1/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java b/paimon-spark/paimon-spark-3.1/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java new file mode 100644 index 000000000000..55670a594de8 --- /dev/null +++ b/paimon-spark/paimon-spark-3.1/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalog; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.spark.SparkProcedures; +import org.apache.paimon.spark.SparkSource; +import org.apache.paimon.spark.analysis.NoSuchProcedureException; +import org.apache.paimon.spark.procedure.Procedure; +import org.apache.paimon.spark.procedure.ProcedureBuilder; + +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.TableCatalog; + +/** Spark base catalog. */ +public abstract class SparkBaseCatalog + implements TableCatalog, SupportsNamespaces, ProcedureCatalog, WithPaimonCatalog { + + protected String catalogName; + + @Override + public String name() { + return catalogName; + } + + @Override + public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException { + if (Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) { + ProcedureBuilder builder = SparkProcedures.newBuilder(identifier.name()); + if (builder != null) { + return builder.withTableCatalog(this).build(); + } + } + throw new NoSuchProcedureException(identifier); + } + + public boolean usePaimon(String provider) { + return provider == null || SparkSource.NAME().equalsIgnoreCase(provider); + } +} diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala new file mode 100644 index 000000000000..49bc71e937de --- /dev/null +++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark + +import org.apache.paimon.table.source.Split + +// never be used +case class PaimonBucketedInputPartition(splits: Seq[Split], bucket: Int) + extends PaimonInputPartition diff --git a/paimon-spark/paimon-spark-3.2/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java b/paimon-spark/paimon-spark-3.2/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java new file mode 100644 index 000000000000..55670a594de8 --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalog; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.spark.SparkProcedures; +import org.apache.paimon.spark.SparkSource; +import org.apache.paimon.spark.analysis.NoSuchProcedureException; +import org.apache.paimon.spark.procedure.Procedure; +import org.apache.paimon.spark.procedure.ProcedureBuilder; + +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.TableCatalog; + +/** Spark base catalog. */ +public abstract class SparkBaseCatalog + implements TableCatalog, SupportsNamespaces, ProcedureCatalog, WithPaimonCatalog { + + protected String catalogName; + + @Override + public String name() { + return catalogName; + } + + @Override + public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException { + if (Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) { + ProcedureBuilder builder = SparkProcedures.newBuilder(identifier.name()); + if (builder != null) { + return builder.withTableCatalog(this).build(); + } + } + throw new NoSuchProcedureException(identifier); + } + + public boolean usePaimon(String provider) { + return provider == null || SparkSource.NAME().equalsIgnoreCase(provider); + } +} diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala new file mode 100644 index 000000000000..49bc71e937de --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark + +import org.apache.paimon.table.source.Split + +// never be used +case class PaimonBucketedInputPartition(splits: Seq[Split], bucket: Int) + extends PaimonInputPartition diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala new file mode 100644 index 000000000000..f0476cf70732 --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark + +import org.apache.paimon.predicate.Predicate +import org.apache.paimon.table.Table + +import org.apache.spark.sql.PaimonUtils.fieldReference +import org.apache.spark.sql.connector.expressions.NamedReference +import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering +import org.apache.spark.sql.sources.{Filter, In} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +case class PaimonScan( + table: Table, + requiredSchema: StructType, + filters: Seq[Predicate], + reservedFilters: Seq[Filter], + pushDownLimit: Option[Int]) + extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit) + with SupportsRuntimeFiltering { + + override def filterAttributes(): Array[NamedReference] = { + val requiredFields = readBuilder.readType().getFieldNames.asScala + table + .partitionKeys() + .asScala + .toArray + .filter(requiredFields.contains) + .map(fieldReference) + } + + override def filter(filters: Array[Filter]): Unit = { + val converter = new SparkFilterConverter(table.rowType()) + val partitionFilter = filters.flatMap { + case in @ In(attr, _) if table.partitionKeys().contains(attr) => + Some(converter.convert(in)) + case _ => None + } + if (partitionFilter.nonEmpty) { + this.runtimeFilters = filters + readBuilder.withFilter(partitionFilter.head) + // set inputPartitions null to trigger to get the new splits. + inputPartitions = null + } + } + +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java index 55670a594de8..b5a51356426b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java @@ -22,16 +22,32 @@ import org.apache.paimon.spark.SparkProcedures; import org.apache.paimon.spark.SparkSource; import org.apache.paimon.spark.analysis.NoSuchProcedureException; +import org.apache.paimon.spark.catalog.functions.PaimonFunctions; import org.apache.paimon.spark.procedure.Procedure; import org.apache.paimon.spark.procedure.ProcedureBuilder; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; + +import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.connector.catalog.FunctionCatalog; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.SupportsNamespaces; import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; + +import java.util.Arrays; +import java.util.Map; + +import scala.Option; /** Spark base catalog. */ public abstract class SparkBaseCatalog - implements TableCatalog, SupportsNamespaces, ProcedureCatalog, WithPaimonCatalog { + implements TableCatalog, + FunctionCatalog, + SupportsNamespaces, + ProcedureCatalog, + WithPaimonCatalog { protected String catalogName; @@ -54,4 +70,29 @@ public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureExce public boolean usePaimon(String provider) { return provider == null || SparkSource.NAME().equalsIgnoreCase(provider); } + + // --------------------- Function Catalog Methods ---------------------------- + private static final Map FUNCTIONS = + ImmutableMap.of("bucket", new PaimonFunctions.BucketFunction()); + + @Override + public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { + UnboundFunction func = FUNCTIONS.get(ident.name()); + if (func == null) { + throw new NoSuchFunctionException( + "Function " + ident + " is not a paimon function", Option.empty()); + } + return func; + } + + @Override + public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { + if (namespace.length != 0) { + throw new NoSuchNamespaceException( + "Namespace " + Arrays.toString(namespace) + " is not valid", Option.empty()); + } + return FUNCTIONS.keySet().stream() + .map(name -> Identifier.of(namespace, name)) + .toArray(Identifier[]::new); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java new file mode 100644 index 000000000000..d346f7f24a55 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalog.functions; + +import org.apache.spark.sql.connector.catalog.functions.BoundFunction; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.spark.sql.types.DataTypes.IntegerType; + +/** + * It should be only used for resolving, e.g., for {@link + * org.apache.spark.sql.connector.read.SupportsReportPartitioning}. + */ +public class PaimonFunctions { + /** + * For now, we only support report bucket partitioning for table scan. So the case `SELECT + * bucket(10, col)` would fail since we do not implement {@link + * org.apache.spark.sql.connector.catalog.functions.ScalarFunction} + */ + public static class BucketFunction implements UnboundFunction { + @Override + public BoundFunction bind(StructType inputType) { + if (inputType.size() != 2) { + throw new UnsupportedOperationException( + "Wrong number of inputs (expected numBuckets and value)"); + } + + StructField numBucket = inputType.fields()[0]; + StructField bucketField = inputType.fields()[1]; + checkArgument( + numBucket.dataType() == IntegerType, + "bucket number field must be integer type"); + + return new BoundFunction() { + @Override + public DataType[] inputTypes() { + return new DataType[] {IntegerType, bucketField.dataType()}; + } + + @Override + public DataType resultType() { + return IntegerType; + } + + @Override + public String name() { + return "bucket"; + } + + @Override + public String canonicalName() { + // We have to override this method to make it support canonical equivalent + return "paimon.bucket(" + bucketField.dataType().catalogString() + ", int)"; + } + }; + } + + @Override + public String description() { + return name(); + } + + @Override + public String name() { + return "bucket"; + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala index ac001829781b..6fdfe4f6aff5 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala @@ -105,7 +105,7 @@ abstract class PaimonBaseScan( .toArray } - def getInputPartitions: Seq[PaimonInputPartition] = { + final def lazyInputPartitions: Seq[PaimonInputPartition] = { if (inputPartitions == null) { inputPartitions = getInputPartitions(getOriginSplits) } @@ -118,7 +118,7 @@ abstract class PaimonBaseScan( override def toBatch: Batch = { val metadataColumns = metadataFields.map(field => PaimonMetadataColumn.get(field.name)) - PaimonBatch(getInputPartitions, readBuilder, metadataColumns) + PaimonBatch(lazyInputPartitions, readBuilder, metadataColumns) } override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala index 894405fd7255..a7c33b21de5b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonInputPartition.scala @@ -20,16 +20,32 @@ package org.apache.paimon.spark import org.apache.paimon.table.source.Split -import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, SupportsReportPartitioning} + +trait PaimonInputPartition extends InputPartition { + def splits: Seq[Split] -case class PaimonInputPartition(splits: Seq[Split]) extends InputPartition { def rowCount(): Long = { splits.map(_.rowCount()).sum } } +case class SimplePaimonInputPartition(splits: Seq[Split]) extends PaimonInputPartition object PaimonInputPartition { def apply(split: Split): PaimonInputPartition = { - PaimonInputPartition(Seq(split)) + SimplePaimonInputPartition(Seq(split)) + } + + def apply(splits: Seq[Split]): PaimonInputPartition = { + SimplePaimonInputPartition(splits) } } + +/** Bucketed input partition should work with [[SupportsReportPartitioning]] together. */ +case class PaimonBucketedInputPartition(splits: Seq[Split], bucket: Int) + extends PaimonInputPartition + with HasPartitionKey { + override def partitionKey(): InternalRow = new GenericInternalRow(Array(bucket.asInstanceOf[Any])) +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala index f0476cf70732..f34a24991d2a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala @@ -19,11 +19,13 @@ package org.apache.paimon.spark import org.apache.paimon.predicate.Predicate -import org.apache.paimon.table.Table +import org.apache.paimon.table.{BucketMode, FileStoreTable, Table} +import org.apache.paimon.table.source.{DataSplit, Split} import org.apache.spark.sql.PaimonUtils.fieldReference -import org.apache.spark.sql.connector.expressions.NamedReference -import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering +import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference} +import org.apache.spark.sql.connector.read.{SupportsReportPartitioning, SupportsRuntimeFiltering} +import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.sources.{Filter, In} import org.apache.spark.sql.types.StructType @@ -36,7 +38,45 @@ case class PaimonScan( reservedFilters: Seq[Filter], pushDownLimit: Option[Int]) extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit) - with SupportsRuntimeFiltering { + with SupportsRuntimeFiltering + with SupportsReportPartitioning { + + override def outputPartitioning(): Partitioning = { + table match { + case fileStoreTable: FileStoreTable => + val bucketSpec = fileStoreTable.bucketSpec() + if (bucketSpec.getBucketMode != BucketMode.HASH_FIXED) { + new UnknownPartitioning(0) + } else if (bucketSpec.getBucketKeys.size() > 1) { + new UnknownPartitioning(0) + } else { + // Spark does not support bucket with several input attributes, + // so we only support one bucket key case. + assert(bucketSpec.getNumBuckets > 0) + assert(bucketSpec.getBucketKeys.size() == 1) + val key = Expressions.bucket(bucketSpec.getNumBuckets, bucketSpec.getBucketKeys.get(0)) + new KeyGroupedPartitioning(Array(key), lazyInputPartitions.size) + } + + case _ => + new UnknownPartitioning(0) + } + } + + override def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] = { + if (!conf.v2BucketingEnabled || splits.exists(!_.isInstanceOf[DataSplit])) { + return super.getInputPartitions(splits) + } + + splits + .map(_.asInstanceOf[DataSplit]) + .groupBy(_.bucket()) + .map { + case (bucket, groupedSplits) => + PaimonBucketedInputPartition(groupedSplits, bucket) + } + .toSeq + } override def filterAttributes(): Array[NamedReference] = { val requiredFields = readBuilder.readType().getFieldNames.asScala diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala index b1d66c90fe80..da9239dd0021 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala @@ -34,7 +34,7 @@ import scala.collection.JavaConverters._ case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics { - private lazy val rowCount: Long = scan.getInputPartitions.map(_.rowCount()).sum + private lazy val rowCount: Long = scan.lazyInputPartitions.map(_.rowCount()).sum private lazy val scannedTotalSize: Long = rowCount * scan.readSchema().defaultSize diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/BucketedTableQueryTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/BucketedTableQueryTest.scala new file mode 100644 index 000000000000..f7faeabdc924 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/BucketedTableQueryTest.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike + +class BucketedTableQueryTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelper { + private def checkAnswerAndShuffle(query: String, numShuffle: Int): Unit = { + var expectedResult: Array[Row] = null + // avoid config default value change in future, so specify it manually + withSQLConf( + "spark.sql.sources.v2.bucketing.enabled" -> "false", + "spark.sql.autoBroadcastJoinThreshold" -> "-1") { + expectedResult = spark.sql(query).collect() + } + withSQLConf( + "spark.sql.sources.v2.bucketing.enabled" -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "-1") { + val df = spark.sql(query) + checkAnswer(df, expectedResult.toSeq) + assert(collect(df.queryExecution.executedPlan) { + case shuffle: ShuffleExchangeLike => shuffle + }.size == numShuffle) + } + } + + test("Query on a bucketed table - join - positive case") { + assume(gteqSpark3_3) + + withTable("t1", "t2", "t3", "t4") { + spark.sql( + "CREATE TABLE t1 (id INT, c STRING) TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10')") + spark.sql("INSERT INTO t1 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')") + + // all matched + spark.sql( + "CREATE TABLE t2 (id INT, c STRING) TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10')") + spark.sql("INSERT INTO t2 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')") + checkAnswerAndShuffle("SELECT * FROM t1 JOIN t2 on t1.id = t2.id", 0) + + // different primary-key name but does not matter + spark.sql( + "CREATE TABLE t3 (id2 INT, c STRING) TBLPROPERTIES ('primary-key' = 'id2', 'bucket'='10')") + spark.sql("INSERT INTO t3 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')") + checkAnswerAndShuffle("SELECT * FROM t1 JOIN t3 on t1.id = t3.id2", 0) + + // one primary-key table and one bucketed table + spark.sql( + "CREATE TABLE t4 (id INT, c STRING) TBLPROPERTIES ('bucket-key' = 'id', 'bucket'='10')") + spark.sql("INSERT INTO t4 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')") + checkAnswerAndShuffle("SELECT * FROM t1 JOIN t4 on t1.id = t4.id", 0) + } + } + + test("Query on a bucketed table - join - negative case") { + assume(gteqSpark3_3) + + withTable("t1", "t2", "t3", "t4", "t5", "t6") { + spark.sql( + "CREATE TABLE t1 (id INT, c STRING) TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10')") + spark.sql("INSERT INTO t1 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')") + + // dynamic bucket number + spark.sql("CREATE TABLE t2 (id INT, c STRING) TBLPROPERTIES ('primary-key' = 'id')") + spark.sql("INSERT INTO t2 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')") + checkAnswerAndShuffle("SELECT * FROM t1 JOIN t2 on t1.id = t2.id", 2) + + // different bucket number + spark.sql( + "CREATE TABLE t3 (id INT, c STRING) TBLPROPERTIES ('primary-key' = 'id', 'bucket'='2')") + spark.sql("INSERT INTO t3 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')") + checkAnswerAndShuffle("SELECT * FROM t1 JOIN t3 on t1.id = t3.id", 2) + + // different primary-key data type + spark.sql( + "CREATE TABLE t4 (id STRING, c STRING) TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10')") + spark.sql("INSERT INTO t4 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')") + checkAnswerAndShuffle("SELECT * FROM t1 JOIN t4 on t1.id = t4.id", 2) + + // different input partition number + spark.sql( + "CREATE TABLE t5 (id INT, c STRING) TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10')") + spark.sql("INSERT INTO t5 VALUES (1, 'x1')") + checkAnswerAndShuffle("SELECT * FROM t1 JOIN t5 on t1.id = t5.id", 2) + + // one more bucket keys + spark.sql( + "CREATE TABLE t6 (id1 INT, id2 INT, c STRING) TBLPROPERTIES ('bucket-key' = 'id1,id2', 'bucket'='10')") + spark.sql( + "INSERT INTO t6 VALUES (1, 1, 'x1'), (2, 2, 'x3'), (3, 3, 'x3'), (4, 4, 'x4'), (5, 5, 'x5')") + checkAnswerAndShuffle("SELECT * FROM t1 JOIN t6 on t1.id = t6.id1", 2) + } + } + + test("Query on a bucketed table - other operators") { + assume(gteqSpark3_3) + + withTable("t1") { + spark.sql( + "CREATE TABLE t1 (id INT, c STRING) TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10')") + spark.sql("INSERT INTO t1 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')") + + checkAnswerAndShuffle("SELECT id, count(*) FROM t1 GROUP BY id", 0) + checkAnswerAndShuffle("SELECT c, count(*) FROM t1 GROUP BY c", 1) + checkAnswerAndShuffle("select sum(c) OVER (PARTITION BY id ORDER BY c) from t1", 0) + checkAnswerAndShuffle("select sum(id) OVER (PARTITION BY c ORDER BY id) from t1", 1) + + withSQLConf("spark.sql.requireAllClusterKeysForDistribution" -> "false") { + checkAnswerAndShuffle("SELECT id, c, count(*) FROM t1 GROUP BY id, c", 0) + } + withSQLConf("spark.sql.requireAllClusterKeysForDistribution" -> "true") { + checkAnswerAndShuffle("SELECT id, c, count(*) FROM t1 GROUP BY id, c", 1) + } + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala index 99ba335a7fd9..f223dabdd2aa 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala @@ -42,7 +42,7 @@ class PaimonMetricTest extends PaimonSparkTestBase { def checkMetrics(s: String, skippedTableFiles: Long, resultedTableFiles: Long): Unit = { val scan = getPaimonScan(s) // call getInputPartitions to trigger scan - scan.getInputPartitions + scan.lazyInputPartitions val metrics = scan.reportDriverMetrics() Assertions.assertEquals(skippedTableFiles, metric(metrics, SKIPPED_TABLE_FILES)) Assertions.assertEquals(resultedTableFiles, metric(metrics, RESULTED_TABLE_FILES))