[GLUTEN-7953][VL] Fetch and dump all inputs for micro benchmark on middle stage begin #7998

Merged · 5 commits · Nov 25, 2024
8 changes: 5 additions & 3 deletions .github/workflows/velox_backend.yml
@@ -969,9 +969,11 @@ jobs:
run: |
$MVN_CMD test -Pspark-3.5 -Pbackends-velox -pl backends-velox -am \
-DtagsToInclude="org.apache.gluten.tags.GenerateExample" -Dtest=none -DfailIfNoTests=false -Dexec.skip
# This test depends on example.json generated by the above mvn test.
cd cpp/build/velox/benchmarks && sudo chmod +x ./generic_benchmark
./generic_benchmark --run-example --with-shuffle --threads 1 --iterations 1
# This test depends on files generated by the above mvn test.
./cpp/build/velox/benchmarks/generic_benchmark --with-shuffle --partitioning hash --threads 1 --iterations 1 \
--conf $(realpath backends-velox/generated-native-benchmark/conf_12_0.ini) \
--plan $(realpath backends-velox/generated-native-benchmark/plan_12_0.json) \
--data $(realpath backends-velox/generated-native-benchmark/data_12_0_0.parquet),$(realpath backends-velox/generated-native-benchmark/data_12_0_1.parquet)
- name: Run UDF test
run: |
# Depends on --build_example=ON.
@@ -19,18 +19,14 @@ package org.apache.gluten.benchmarks
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.{VeloxWholeStageTransformerSuite, WholeStageTransformer}

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf

import org.apache.commons.io.FileUtils
import org.scalatest.Tag

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import scala.collection.JavaConverters._

object GenerateExample extends Tag("org.apache.gluten.tags.GenerateExample")

@@ -50,6 +46,11 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
createTPCHNotNullTables()
}

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
}

test("Test plan json non-empty - AQE off") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
@@ -67,7 +68,6 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson
assert(planJson.nonEmpty)
}
spark.sparkContext.setLogLevel(logLevel)
}

test("Test plan json non-empty - AQE on") {
@@ -88,70 +88,42 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
val planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson
assert(planJson.nonEmpty)
}
spark.sparkContext.setLogLevel(logLevel)
}

test("generate example", GenerateExample) {
import testImplicits._
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT.key -> "true"
GlutenConfig.BENCHMARK_SAVE_DIR.key -> generatedPlanDir,
GlutenConfig.BENCHMARK_TASK_STAGEID.key -> "12",
GlutenConfig.BENCHMARK_TASK_PARTITIONID.key -> "0"
) {
logWarning(s"Generating inputs for micro benchmark to $generatedPlanDir")
val q4_lineitem = spark
.sql(s"""
|select l_orderkey from lineitem where l_commitdate < l_receiptdate
|""".stripMargin)
val q4_orders = spark
.sql(s"""
|select o_orderkey, o_orderpriority
| from orders
| where o_orderdate >= '1993-07-01' and o_orderdate < '1993-10-01'
|""".stripMargin)
q4_lineitem
.createOrReplaceTempView("q4_lineitem")
q4_orders
.createOrReplaceTempView("q4_orders")

q4_lineitem
.repartition(1, 'l_orderkey)
.write
.format(outputFileFormat)
.save(generatedPlanDir + "/example_lineitem")
q4_orders
.repartition(1, 'o_orderkey)
.write
.format(outputFileFormat)
.save(generatedPlanDir + "/example_orders")

val df =
spark.sql("""
|select * from q4_orders left semi join q4_lineitem on l_orderkey = o_orderkey
|""".stripMargin)
generateSubstraitJson(df, "example.json")
spark
.sql("""
|select /*+ REPARTITION(1) */
| o_orderpriority,
| count(*) as order_count
|from
| orders
|where
| o_orderdate >= date '1993-07-01'
| and o_orderdate < date '1993-07-01' + interval '3' month
| and exists (
| select /*+ REPARTITION(1) */
| *
| from
| lineitem
| where
| l_orderkey = o_orderkey
| and l_commitdate < l_receiptdate
| )
|group by
| o_orderpriority
|order by
| o_orderpriority
|""".stripMargin)
.foreach(_ => ())
}
spark.sparkContext.setLogLevel(logLevel)
}

def generateSubstraitJson(df: DataFrame, file: String): Unit = {
val executedPlan = df.queryExecution.executedPlan
executedPlan.execute()
val finalPlan =
executedPlan match {
case aqe: AdaptiveSparkPlanExec =>
aqe.executedPlan match {
case s: ShuffleQueryStageExec => s.shuffle.child
case other => other
}
case plan => plan
}
val lastStageTransformer = finalPlan.find(_.isInstanceOf[WholeStageTransformer])
assert(lastStageTransformer.nonEmpty)
val plan =
lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson.split('\n')

val exampleJsonFile = Paths.get(generatedPlanDir, file)
Files.write(exampleJsonFile, plan.toList.asJava, StandardCharsets.UTF_8)
}
}
4 changes: 0 additions & 4 deletions cpp/CMakeLists.txt
@@ -217,10 +217,6 @@ if(ENABLE_IAA)
add_definitions(-DGLUTEN_ENABLE_IAA)
endif()

if(ENABLE_ORC)
add_definitions(-DGLUTEN_ENABLE_ORC)
endif()

#
# Subdirectories
#
3 changes: 3 additions & 0 deletions cpp/core/compute/Runtime.h
@@ -27,6 +27,7 @@
#include "operators/c2r/ColumnarToRow.h"
#include "operators/r2c/RowToColumnar.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "operators/writer/ArrowWriter.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "substrait/plan.pb.h"
@@ -124,6 +125,8 @@ class Runtime : public std::enable_shared_from_this<Runtime> {

virtual void dumpConf(const std::string& path) = 0;

virtual std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path) = 0;

const std::unordered_map<std::string, std::string>& getConfMap() {
return confMap_;
}
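Note: the new pure-virtual hook lets each backend return its own writer implementation. A minimal sketch of what an override might look like; the real VeloxRuntime::createArrowWriter may take extra arguments (a memory pool, for instance), so treat the signature and constructor call as assumptions:

// Sketch only: a backend runtime returning its concrete ArrowWriter subclass.
// VeloxArrowWriter's constructor arguments are assumed, not taken from this PR.
std::shared_ptr<ArrowWriter> VeloxRuntime::createArrowWriter(const std::string& path) {
  return std::make_shared<VeloxArrowWriter>(path);
}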
44 changes: 28 additions & 16 deletions cpp/core/jni/JniCommon.cc
@@ -104,22 +104,34 @@ gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::next() {
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
checkException(env);
return nullptr; // stream ended
}

checkException(env);
jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_);
checkException(env);
auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
if (writer_ != nullptr) {
// save snapshot of the batch to file
std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
if (!writer_->closed()) {
// Dump all inputs.
while (env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
checkException(env);
jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_);
checkException(env);
auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);

// Save the snapshot of the batch to file.
std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
}
checkException(env);
GLUTEN_THROW_NOT_OK(writer_->closeWriter());
}
return writer_->retrieveColumnarBatch();
} else {
if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
checkException(env);
return nullptr; // stream ended
}
checkException(env);
jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_);
checkException(env);
return ObjectStore::retrieve<ColumnarBatch>(handle);
}
return batch;
}
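Note: with a writer attached, next() now drains the whole upstream iterator into the dump file before returning anything downstream, and retrieveColumnarBatch() then replays the dumped batches so the pipeline still sees the full stream. A hedged sketch of that replay side; the reader member and factory below are illustrative, not this PR's actual code:

// Illustrative only: replay dumped batches once the writer has been closed.
std::shared_ptr<ColumnarBatch> VeloxArrowWriter::retrieveColumnarBatch() {
  if (reader_ == nullptr) {
    reader_ = makeParquetReaderIterator(path_); // hypothetical factory
  }
  return reader_->next(); // nullptr once every dumped batch has been replayed
}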
1 change: 1 addition & 0 deletions cpp/core/jni/JniCommon.h
@@ -26,6 +26,7 @@
#include "compute/Runtime.h"
#include "config/GlutenConfig.h"
#include "memory/AllocationListener.h"
#include "operators/writer/ArrowWriter.h"
#include "shuffle/rss/RssClient.h"
#include "utils/Compression.h"
#include "utils/Exception.h"
14 changes: 11 additions & 3 deletions cpp/core/jni/JniWrapper.cc
@@ -373,8 +373,16 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
}
saveDir = conf.at(kGlutenSaveDir);
std::filesystem::path f{saveDir};
if (!std::filesystem::exists(f)) {
throw GlutenException("Save input path " + saveDir + " does not exists");
if (std::filesystem::exists(f)) {
if (!std::filesystem::is_directory(f)) {
throw GlutenException("Invalid path for " + kGlutenSaveDir + ": " + saveDir);
}
} else {
std::error_code ec;
std::filesystem::create_directory(f, ec);
if (ec) {
throw GlutenException("Failed to create directory: " + saveDir + ", error message: " + ec.message());
}
}
ctx->dumpConf(saveDir + "/conf" + fileIdentifier + ".ini");
}
@@ -407,7 +415,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
std::shared_ptr<ArrowWriter> writer = nullptr;
if (saveInput) {
auto file = saveDir + "/data" + fileIdentifier + "_" + std::to_string(idx) + ".parquet";
writer = std::make_shared<ArrowWriter>(file);
writer = ctx->createArrowWriter(file);
}
jobject iter = env->GetObjectArrayElement(iterArr, idx);
auto arrayIter = makeJniColumnarBatchIterator(env, iter, ctx, writer);
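Note: the directory handling above is a create-or-validate pattern on std::filesystem. A self-contained sketch of the same logic (ensureSaveDir is an illustrative name, not this PR's):

#include <filesystem>
#include <string>
#include <system_error>

// Accept an existing directory, reject an existing non-directory path, and
// otherwise create the directory, reporting failures via std::error_code.
bool ensureSaveDir(const std::string& saveDir, std::string& error) {
  namespace fs = std::filesystem;
  const fs::path f{saveDir};
  if (fs::exists(f)) {
    if (!fs::is_directory(f)) {
      error = "Invalid path: " + saveDir;
      return false;
    }
    return true; // already a usable directory
  }
  std::error_code ec;
  if (!fs::create_directory(f, ec)) {
    error = "Failed to create directory: " + saveDir + ", error message: " + ec.message();
    return false;
  }
  return true;
}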
1 change: 0 additions & 1 deletion cpp/core/memory/ColumnarBatch.h
@@ -23,7 +23,6 @@
#include "arrow/c/helpers.h"
#include "arrow/record_batch.h"
#include "memory/MemoryManager.h"
#include "operators/writer/ArrowWriter.h"
#include "utils/ArrowStatus.h"
#include "utils/Exception.h"

9 changes: 8 additions & 1 deletion cpp/core/operators/writer/ArrowWriter.cc
@@ -21,6 +21,7 @@
#include "arrow/table.h"
#include "arrow/util/type_fwd.h"

namespace gluten {
arrow::Status ArrowWriter::initWriter(arrow::Schema& schema) {
if (writer_ != nullptr) {
return arrow::Status::OK();
@@ -50,9 +51,15 @@ arrow::Status ArrowWriter::writeInBatches(std::shared_ptr<arrow::RecordBatch> batch) {
}

arrow::Status ArrowWriter::closeWriter() {
// Write file footer and close
// Write file footer and close.
if (writer_ != nullptr) {
ARROW_RETURN_NOT_OK(writer_->Close());
}
closed_ = true;
return arrow::Status::OK();
}

bool ArrowWriter::closed() const {
return closed_;
}
} // namespace gluten
16 changes: 13 additions & 3 deletions cpp/core/operators/writer/ArrowWriter.h
@@ -17,23 +17,33 @@

#pragma once

#include "parquet/arrow/writer.h"
#include <parquet/arrow/writer.h>
#include "memory/ColumnarBatch.h"

namespace gluten {
/**
* @brief Used to print RecordBatch to a parquet file
*
*/
class ArrowWriter {
public:
explicit ArrowWriter(std::string& path) : path_(path) {}
explicit ArrowWriter(const std::string& path) : path_(path) {}

virtual ~ArrowWriter() = default;

arrow::Status initWriter(arrow::Schema& schema);

arrow::Status writeInBatches(std::shared_ptr<arrow::RecordBatch> batch);

arrow::Status closeWriter();

private:
bool closed() const;

virtual std::shared_ptr<ColumnarBatch> retrieveColumnarBatch() = 0;

protected:
std::unique_ptr<parquet::arrow::FileWriter> writer_;
std::string path_;
bool closed_{false};
};
} // namespace gluten
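Note: initWriter is a no-op once writer_ exists, so callers can safely invoke it per batch; closeWriter writes the parquet file footer and sets closed_, which is the flag JniCommon.cc checks before starting a dump. A usage sketch under assumptions: VeloxArrowWriter stands in for a concrete subclass, and nextBatch() for any batch source.

// Hedged usage sketch of the ArrowWriter API above; marked names are assumed.
auto writer = std::make_shared<VeloxArrowWriter>("/tmp/data_12_0_0.parquet"); // ctor assumed
while (std::shared_ptr<arrow::RecordBatch> rb = nextBatch()) { // nextBatch() is hypothetical
  GLUTEN_THROW_NOT_OK(writer->initWriter(*rb->schema())); // idempotent after first call
  GLUTEN_THROW_NOT_OK(writer->writeInBatches(rb));
}
GLUTEN_THROW_NOT_OK(writer->closeWriter()); // writes the footer and marks the writer closed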
3 changes: 3 additions & 0 deletions cpp/velox/CMakeLists.txt
@@ -169,9 +169,12 @@ set(VELOX_SRCS
operators/functions/RegistrationAllFunctions.cc
operators/functions/RowConstructorWithNull.cc
operators/functions/SparkExprToSubfieldFilterParser.cc
operators/reader/FileReaderIterator.cc
operators/reader/ParquetReaderIterator.cc
operators/serializer/VeloxColumnarToRowConverter.cc
operators/serializer/VeloxColumnarBatchSerializer.cc
operators/serializer/VeloxRowToColumnarConverter.cc
operators/writer/VeloxArrowWriter.cc
operators/writer/VeloxParquetDataSource.cc
shuffle/VeloxShuffleReader.cc
shuffle/VeloxShuffleWriter.cc
7 changes: 1 addition & 6 deletions cpp/velox/benchmarks/CMakeLists.txt
@@ -15,8 +15,7 @@

find_arrow_lib(${PARQUET_LIB_NAME})

set(VELOX_BENCHMARK_COMMON_SRCS common/FileReaderIterator.cc
common/BenchmarkUtils.cc)
set(VELOX_BENCHMARK_COMMON_SRCS common/BenchmarkUtils.cc)
add_library(velox_benchmark_common STATIC ${VELOX_BENCHMARK_COMMON_SRCS})
target_include_directories(
velox_benchmark_common PUBLIC ${CMAKE_SOURCE_DIR}/velox
@@ -38,7 +37,3 @@ add_velox_benchmark(columnar_to_row_benchmark ColumnarToRowBenchmark.cc)
add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)

add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)

if(ENABLE_ORC)
add_velox_benchmark(orc_converter exec/OrcConverter.cc)
endif()