The `integration_tests` module contains benchmarks derived from the TPC-DS, TPC-H, and TPCx-BB benchmarks. These are not official TPC benchmarks and are intended only for comparing relative performance between CPU and GPU and for catching performance regressions in the plugin.
For each of these benchmarks, source data must be generated by a utility that can produce data at any scale factor. The scale factor is an integer representing approximately how many gigabytes of data will be generated: for example, scale factor 1 means ~1 GB and scale factor 1000 means ~1 TB.
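As a quick illustration of the scale factor arithmetic, the sketch below turns a scale factor into an approximate data size. The `approx_size` helper is hypothetical and not part of the plugin or of any data generator; it only restates the ~1 GB per unit rule described above.

```shell
# Approximate generated data size for a scale factor (1 unit is ~1 GB).
# The terabyte value is truncated, not rounded, to one decimal place.
approx_size() {
    sf=$1
    if [ "$sf" -ge 1000 ]; then
        echo "~$((sf / 1000)).$((sf % 1000 / 100)) TB"
    else
        echo "~$sf GB"
    fi
}

approx_size 1      # prints "~1 GB"
approx_size 1000   # prints "~1.0 TB"
approx_size 3000   # prints "~3.0 TB"
```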
Further information on data generation can be found using the following links:
The remainder of this document is based on the TPC-DS benchmark, but the steps are very similar for the other benchmarks. The main difference is that the package and class names differ for each benchmark.
Benchmark | Package | Class Names |
---|---|---|
TPC-DS | com.nvidia.spark.rapids.tests.tpcds | ConvertFiles, TpcdsLikeBench |
TPCx-BB | com.nvidia.spark.rapids.tests.tpcxbb | ConvertFiles, TpcxbbLikeBench |
TPC-H | com.nvidia.spark.rapids.tests.tpch | ConvertFiles, TpchLikeBench |
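When scripting across benchmarks, the package lookup in the table can be captured in a small helper. The following is a minimal sketch: the `convert_class` function and its short benchmark keys (`tpcds`, `tpcxbb`, `tpch`) are hypothetical conveniences, while the package and class names come from the table.

```shell
# Print the fully qualified ConvertFiles class for a benchmark.
# Keys are illustrative; package names are taken from the table above.
convert_class() {
    case "$1" in
        tpcds)  echo "com.nvidia.spark.rapids.tests.tpcds.ConvertFiles" ;;
        tpcxbb) echo "com.nvidia.spark.rapids.tests.tpcxbb.ConvertFiles" ;;
        tpch)   echo "com.nvidia.spark.rapids.tests.tpch.ConvertFiles" ;;
        *)      echo "unknown benchmark: $1" >&2; return 1 ;;
    esac
}

convert_class tpch   # prints "com.nvidia.spark.rapids.tests.tpch.ConvertFiles"
```

The result can then be passed to the `--class` option of a `spark-submit` command, for example `--class "$(convert_class tpch)"`.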
The integration test jar needs to be added to the `--jars` configuration option when launching the Spark shell. This jar can be found in the `integration_tests/target` directory after running `mvn package`, with a filename matching `rapids-4-spark-integration-tests_2.12-*-SNAPSHOT.jar`.
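Because the jar name contains a version, it can be convenient to resolve it with a glob rather than typing it out. The sketch below assumes it is run from the repository root after `mvn package`; the `find_test_jar` helper is hypothetical and simply fails if zero or several jars match.

```shell
# Resolve the integration test jar built by `mvn package`.
# The optional first argument overrides the directory to search.
find_test_jar() {
    dir=${1:-integration_tests/target}
    # Expand the glob into the positional parameters.
    set -- "$dir"/rapids-4-spark-integration-tests_2.12-*-SNAPSHOT.jar
    # An unmatched glob stays literal, so also check that the file exists.
    if [ "$#" -ne 1 ] || [ ! -f "$1" ]; then
        echo "expected exactly one integration test jar in $dir" >&2
        return 1
    fi
    echo "$1"
}
```

The Spark shell could then be launched with `$SPARK_HOME/bin/spark-shell --jars "$(find_test_jar)"`, with any other required jars appended to the same comma-separated list.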
To run benchmarks on the GPU, the RAPIDS Accelerator for Apache Spark must also be installed, following the instructions provided in the Getting Started guide.
Although it is possible to run benchmarks directly against the CSV data generated by the TPC data generators, it is common to convert the data to Parquet format and run benchmarks against the Parquet files instead.
The `integration_tests` module contains code for converting the CSV data sets to Parquet.
The following commands can be entered into spark-shell to perform the conversion.
import com.nvidia.spark.rapids.tests.tpcds._
TpcdsLikeSpark.csvToParquet(spark, "/path/to/input", "/path/to/output")
Note that the code for converting CSV to Parquet does not explicitly specify the number of partitions to write, so the size of the resulting Parquet files will vary depending on the value of `spark.default.parallelism`, which by default is based on the number of available executor cores. However, the file conversion methods accept `coalesce` and `repartition` arguments to better control the size of the partitions on a per-table basis.
The following example uses the `coalesce` and `repartition` options to control the number and size of partitions for specific tables.
TpcdsLikeSpark.csvToParquet(spark, "/path/to/input", "/path/to/output",
coalesce=Map("customer_address" -> 1), repartition=Map("web_sales" -> 256))
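One way to choose a `repartition` value is to divide a table's expected output size by a target file size and round up. The sketch below shows only that arithmetic: the `partitions_for` helper and the sizes used are hypothetical, and actual Parquet file sizes depend on compression and data distribution.

```shell
# Suggest a partition count so each output file is roughly target_mb in size.
# Both arguments are in megabytes; the result is rounded up and is at least 1.
partitions_for() {
    size_mb=$1
    target_mb=$2
    n=$(( (size_mb + target_mb - 1) / target_mb ))
    if [ "$n" -lt 1 ]; then
        n=1
    fi
    echo "$n"
}

# A table expected to occupy 65536 MB, written as ~256 MB files.
echo "web_sales=$(partitions_for 65536 256)"   # prints "web_sales=256"
```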
It is also possible to use `spark-submit` to run the file conversion process.
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER_URL \
--jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
--class com.nvidia.spark.rapids.tests.tpcds.ConvertFiles \
$SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
--input /path/to/input \
--output /path/to/output \
--output-format parquet \
--coalesce customer_address=1 \
--repartition web_sales=256 inventory=128
Note that no decimal types will be output. The conversion code uses explicit schemas to convert decimal types to floating-point types because the plugin does not yet support decimal types. Decimal types will be supported in a future release.
The benchmarks can currently be executed in two modes:
- Execute the query and collect the results to the driver
- Execute the query and write the results to disk (in Parquet, CSV, or ORC format)
The following commands can be entered into spark-shell to register the data files that the benchmark will query.
import com.nvidia.spark.rapids.tests.tpcds._
TpcdsLikeSpark.setupAllParquet(spark, "/path/to/tpcds")
Use the following syntax to run a query and collect the results to the driver.
import com.nvidia.spark.rapids.tests._
val benchmark = new BenchmarkRunner(TpcdsLikeBench)
benchmark.collect(spark, "q5", iterations=3)
Use the following syntax to run a query and write the results to Parquet. There are also `writeCsv` and `writeOrc` methods for writing the output to CSV or ORC files.
import com.nvidia.spark.rapids.tests._
val benchmark = new BenchmarkRunner(TpcdsLikeBench)
benchmark.writeParquet(spark, "q5", "/data/output/tpcds/q5", iterations=3)
The benchmark runner has a command-line interface, allowing it to be submitted to Spark using `spark-submit`, which can be more practical than using the Spark shell when running a series of benchmarks using automation.
Here is an example `spark-submit` command for running TPC-DS query 5, reading from Parquet and writing results to Parquet. The `--output` and `--output-format` arguments can be omitted to have the benchmark call `collect()` on the results instead.
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER_URL \
--jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
--class com.nvidia.spark.rapids.tests.BenchmarkRunner \
$SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
--benchmark tpcds \
--query q5 \
--input /raid/tpcds-3TB-parquet-largefiles \
--input-format parquet \
--output /raid/tpcds-output/tpcds-q5-cpu \
--output-format parquet \
--summary-file-prefix tpcds-q5-cpu \
--iterations 1
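To run a series of queries, the command above can be generated once per query. The following is a minimal dry-run sketch: `bench_cmd` and the `QUERIES` variable are hypothetical, the `/path/to/...` locations are placeholders, and the flags are the ones shown in the example above. The commands are printed rather than executed so they can be reviewed first.

```shell
# Print (do not run) a spark-submit command for one TPC-DS query.
# Environment variable references are left unexpanded in the output.
bench_cmd() {
    query=$1   # e.g. q5
    label=$2   # e.g. cpu or gpu; only used to name the outputs
    cat <<EOF
\$SPARK_HOME/bin/spark-submit \\
  --master \$SPARK_MASTER_URL \\
  --jars \$SPARK_RAPIDS_PLUGIN_JAR,\$CUDF_JAR \\
  --class com.nvidia.spark.rapids.tests.BenchmarkRunner \\
  \$SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \\
  --benchmark tpcds \\
  --query $query \\
  --input /path/to/input \\
  --input-format parquet \\
  --output /path/to/output/tpcds-$query-$label \\
  --output-format parquet \\
  --summary-file-prefix tpcds-$query-$label \\
  --iterations 1
EOF
}

# Space-separated list of query names; defaults to the single query used above.
QUERIES=${QUERIES:-q5}
for q in $QUERIES; do
    bench_cmd "$q" cpu
done
```

Once the printed commands look right, the output can be piped to `sh` to execute them in sequence.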
Each benchmark run produces a JSON file containing information about the environment and the query, including the following items:
- Spark version
- Spark configuration
- Environment variables
- Logical and physical query plan
- SQL metrics for the executed plan
- Timing information for each query iteration
Care should be taken to ensure that no sensitive information is captured from the environment before sharing these JSON files. Environment variables with names containing the words `PASSWORD`, `TOKEN`, or `SECRET` are filtered out, but this may not be sufficient to prevent leaking secrets.
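Since the built-in filter may miss some secrets, a manual scan of the summary file before sharing can help. The sketch below is a review aid only: `scan_summary` is a hypothetical helper, and the additional patterns (`key`, `credential`) are assumptions about what might be sensitive, not a description of what the plugin filters.

```shell
# List lines of a benchmark summary file that mention likely-sensitive names.
# Matching is case-insensitive; exit status is non-zero when nothing matches.
scan_summary() {
    grep -n -i -E 'password|token|secret|key|credential' "$1"
}

# Example (the file name is a placeholder):
#   scan_summary /path/to/summary.json
```

A clean scan does not prove that the file is safe to share; values such as hostnames or internal paths still need a manual look.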
For convenience, the benchmark.py script is provided, allowing benchmarks to be run in an automated way with multiple configurations. Example usage is provided in the documentation within the script.
It is important to verify that queries actually produced the correct output, especially when comparing between CPU and GPU benchmarks. A utility is provided to help with this.
This is a simple utility that pulls results down to the driver for comparison, so it will only work for data sets that fit in the driver's memory.
If the data needs sorting before comparison, the sort is delegated to Spark before the results are collected.
Example usage from spark-shell:
val cpu = spark.read.parquet("/data/tpcxbb/q5-cpu")
val gpu = spark.read.parquet("/data/tpcxbb/q5-gpu")
import com.nvidia.spark.rapids.tests.common._
BenchUtils.compareResults(cpu, gpu, ignoreOrdering=true, epsilon=0.0001)
This will report on any differences between the two dataframes.
The verification utility can also be run with `spark-submit` using the following syntax.
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER_URL \
--jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
--class com.nvidia.spark.rapids.tests.common.CompareResults \
$SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
--input1 /path/to/result1 \
--input2 /path/to/result2 \
--input-format parquet
Please refer to the Tuning Guide for information on performance tuning.