
Commit

[GLUTEN-6935][CH]query fails when set session level join_algorithm to grace_hash (apache#6944)
loudongfeng authored and shamirchen committed Oct 14, 2024
1 parent 6253721 commit 20928aa
Showing 2 changed files with 109 additions and 4 deletions.
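Context for the fix: the ClickHouse backend's native join parser built its TableJoin from the process-global context, so a session-level join_algorithm override was never seen by the join and, with grace_hash, could fail the query. A minimal sketch of the failing scenario, assuming an active SparkSession named spark running with the Gluten ClickHouse backend; the config key comes from the new test below, and the table names are illustrative:

// Sketch only: the session-level override that triggered the bug.
// Assumes spark is a SparkSession with the Gluten CH backend enabled.
val joinAlgorithm =
  "spark.gluten.sql.columnar.backend.ch.runtime_settings.join_algorithm"

// Before this commit the native JoinRelParser read settings from the global
// context, so this per-session value never reached the TableJoin.
spark.conf.set(joinAlgorithm, "grace_hash")

// Any query planned as a shuffled hash join could then fail; the new test
// uses an EXISTS subquery (existence join) like this one.
spark.sql(
  "SELECT count(*) FROM t1 WHERE exists(SELECT * FROM t2 WHERE t1.k = t2.k)"
).show()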
106 changes: 106 additions & 0 deletions GlutenClickHouseJoinSuite.scala
@@ -0,0 +1,106 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.utils.UTSystemParameters

import org.apache.spark.SparkConf

class GlutenClickHouseJoinSuite extends GlutenClickHouseWholeStageTransformerSuite {

  protected val tablesPath: String = basePath + "/tpch-data"
  protected val tpchQueries: String =
    rootPath + "../../../../gluten-core/src/test/resources/tpch-queries"
  protected val queriesResults: String = rootPath + "queries-output"

  private val joinAlgorithm = "spark.gluten.sql.columnar.backend.ch.runtime_settings.join_algorithm"

  override protected def sparkConf: SparkConf = {
    // Note: a few keys are set twice below (spark.sql.warehouse.dir,
    // spark.sql.shuffle.partitions, spark.sql.autoBroadcastJoinThreshold);
    // SparkConf keeps the last value set, so the later .set calls win.
    super.sparkConf
      .set("spark.sql.files.maxPartitionBytes", "1g")
      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      .set("spark.sql.shuffle.partitions", "5")
      .set("spark.sql.adaptive.enabled", "false")
      .set("spark.sql.files.minPartitionNum", "1")
      .set("spark.gluten.sql.columnar.columnartorow", "true")
      .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
      .set("spark.gluten.sql.columnar.iterator", "true")
      .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
      .set("spark.gluten.sql.enable.native.validation", "false")
      .set("spark.sql.warehouse.dir", warehouse)
      .set(
        "spark.sql.warehouse.dir",
        getClass.getResource("/").getPath + "tests-working-home/spark-warehouse")
      .set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
      .set("spark.shuffle.manager", "sort")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.sql.shuffle.partitions", "5")
      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
      .set(joinAlgorithm, "hash")
      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
      .setMaster("local[*]")
  }

  test("int to long join key rewrite causes column miss match ") {
    // The suite default is hash; the session-level override below must reach
    // the native join for this query to succeed.
    assert("hash".equalsIgnoreCase(sparkConf.get(joinAlgorithm, "hash")))
    withSQLConf(joinAlgorithm -> "grace_hash") {
      withTable("my_customer", "my_store_sales", "my_date_dim") {
        sql("""
              |CREATE TABLE my_customer (
              |  c_customer_sk INT)
              |USING orc
              |""".stripMargin)
        sql("""
              |CREATE TABLE my_store_sales (
              |  ss_sold_date_sk INT,
              |  ss_customer_sk INT)
              |USING orc
              |""".stripMargin)
        sql("""
              |CREATE TABLE my_date_dim (
              |  d_date_sk INT,
              |  d_year INT,
              |  d_qoy INT)
              |USING orc
              |""".stripMargin)

        sql("insert into my_customer values (1), (2), (3), (4)")
        sql("insert into my_store_sales values (1, 1), (2, 2), (3, 3), (4, 4)")
        sql("insert into my_date_dim values (1, 2002, 1), (2, 2002, 2)")
        val q =
          """
            |SELECT
            |  count(*) cnt1
            |FROM
            |  my_customer c
            |WHERE
            |  exists(SELECT *
            |         FROM my_store_sales, my_date_dim
            |         WHERE c.c_customer_sk = ss_customer_sk AND
            |           ss_sold_date_sk = d_date_sk AND
            |           d_year = 2002 AND
            |           d_qoy < 4)
            |LIMIT 100
            |""".stripMargin
        runQueryAndCompare(q)(checkGlutenOperatorMatch[CHShuffledHashJoinExecTransformer])
      }
    }
  }

}
7 changes: 3 additions & 4 deletions cpp-ch/local-engine/Parser/JoinRelParser.cpp
@@ -53,11 +53,10 @@ using namespace DB;
 
 namespace local_engine
 {
-std::shared_ptr<DB::TableJoin> createDefaultTableJoin(substrait::JoinRel_JoinType join_type, bool is_existence_join)
+std::shared_ptr<DB::TableJoin> createDefaultTableJoin(substrait::JoinRel_JoinType join_type, bool is_existence_join, ContextPtr & context)
 {
-    auto & global_context = SerializedPlanParser::global_context;
     auto table_join = std::make_shared<TableJoin>(
-        global_context->getSettingsRef(), global_context->getGlobalTemporaryVolume(), global_context->getTempDataOnDisk());
+        context->getSettingsRef(), context->getGlobalTemporaryVolume(), context->getTempDataOnDisk());
 
     std::pair<DB::JoinKind, DB::JoinStrictness> kind_and_strictness = JoinUtil::getJoinKindAndStrictness(join_type, is_existence_join);
     table_join->setKind(kind_and_strictness.first);
@@ -216,7 +215,7 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q
         renamePlanColumns(*left, *right, *storage_join);
     }
 
-    auto table_join = createDefaultTableJoin(join.type(), join_opt_info.is_existence_join);
+    auto table_join = createDefaultTableJoin(join.type(), join_opt_info.is_existence_join, context);
     DB::Block right_header_before_convert_step = right->getCurrentDataStream().header;
     addConvertStep(*table_join, *left, *right);
 
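Net effect of the parser change: createDefaultTableJoin now receives the query's ContextPtr and builds the TableJoin from context->getSettingsRef() instead of SerializedPlanParser::global_context, so session-scoped runtime settings such as join_algorithm take effect. A short sketch of how the new suite verifies this, reusing the helpers already shown above (withSQLConf, runQueryAndCompare, checkGlutenOperatorMatch):

// Sketch mirroring the new test: scope grace_hash to one query and assert
// the plan still produces the Gluten CH shuffled hash join transformer
// instead of failing at execution time.
withSQLConf(joinAlgorithm -> "grace_hash") {
  runQueryAndCompare(q)(checkGlutenOperatorMatch[CHShuffledHashJoinExecTransformer])
}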
