From e399dc3b84a368830c3e71e0f3472be96978ca78 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 26 Aug 2024 13:25:42 +0800 Subject: [PATCH] [GLUTEN-7008][VL] Report spill metrics from Velox operators to Spark task (#7009) Closes #7008 --- cpp/core/jni/JniWrapper.cc | 3 ++- cpp/core/utils/metrics.h | 1 + cpp/velox/compute/WholeStageResultIterator.cc | 1 + .../org/apache/gluten/metrics/Metrics.java | 4 ++++ .../gluten/metrics/OperatorMetrics.java | 3 +++ .../metrics/HashAggregateMetricsUpdater.scala | 10 ++++++++ .../gluten/metrics/JoinMetricsUpdater.scala | 16 +++++++++++++ .../apache/gluten/metrics/MetricsUtil.scala | 3 +++ .../gluten/metrics/SortMetricsUpdater.scala | 10 ++++++++ .../spark/sql/utils/SparkMetricsUtil.scala | 24 +++++++++++++++++++ 10 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 4be5e9142818..1662b200bf8a 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -175,7 +175,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;"); metricsBuilderConstructor = getMethodIdOrError( - env, metricsBuilderClass, "", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V"); + env, metricsBuilderClass, "", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -478,6 +478,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIter metrics ? metrics->veloxToArrow : -1, longArray[Metrics::kPeakMemoryBytes], longArray[Metrics::kNumMemoryAllocations], + longArray[Metrics::kSpilledInputBytes], longArray[Metrics::kSpilledBytes], longArray[Metrics::kSpilledRows], longArray[Metrics::kSpilledPartitions], diff --git a/cpp/core/utils/metrics.h b/cpp/core/utils/metrics.h index 5b3167b82bbb..bda72b070f24 100644 --- a/cpp/core/utils/metrics.h +++ b/cpp/core/utils/metrics.h @@ -54,6 +54,7 @@ struct Metrics { kNumMemoryAllocations, // Spill. + kSpilledInputBytes, kSpilledBytes, kSpilledRows, kSpilledPartitions, diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 2edf9a573121..59181aaea358 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -364,6 +364,7 @@ void WholeStageResultIterator::collectMetrics() { metrics_->get(Metrics::kWallNanos)[metricIndex] = second->cpuWallTiming.wallNanos; metrics_->get(Metrics::kPeakMemoryBytes)[metricIndex] = second->peakMemoryBytes; metrics_->get(Metrics::kNumMemoryAllocations)[metricIndex] = second->numMemoryAllocations; + metrics_->get(Metrics::kSpilledInputBytes)[metricIndex] = second->spilledInputBytes; metrics_->get(Metrics::kSpilledBytes)[metricIndex] = second->spilledBytes; metrics_->get(Metrics::kSpilledRows)[metricIndex] = second->spilledRows; metrics_->get(Metrics::kSpilledPartitions)[metricIndex] = second->spilledPartitions; diff --git a/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java b/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java index 4e91823e5a21..c4dcbb65a31f 100644 --- a/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -32,6 +32,7 @@ public class Metrics implements IMetrics { public long[] scanTime; public long[] peakMemoryBytes; public long[] numMemoryAllocations; + public long[] spilledInputBytes; public long[] spilledBytes; public long[] spilledRows; public long[] spilledPartitions; @@ -69,6 +70,7 @@ public Metrics( long veloxToArrow, long[] peakMemoryBytes, long[] numMemoryAllocations, + long[] spilledInputBytes, long[] spilledBytes, long[] spilledRows, long[] spilledPartitions, @@ -101,6 +103,7 @@ public Metrics( this.singleMetric.veloxToArrow = veloxToArrow; this.peakMemoryBytes = peakMemoryBytes; this.numMemoryAllocations = numMemoryAllocations; + this.spilledInputBytes = spilledInputBytes; this.spilledBytes = spilledBytes; this.spilledRows = spilledRows; this.spilledPartitions = spilledPartitions; @@ -138,6 +141,7 @@ public OperatorMetrics getOperatorMetrics(int index) { wallNanos[index], peakMemoryBytes[index], numMemoryAllocations[index], + spilledInputBytes[index], spilledBytes[index], spilledRows[index], spilledPartitions[index], diff --git a/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index cb155afed356..cad04987eb57 100644 --- a/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -30,6 +30,7 @@ public class OperatorMetrics implements IOperatorMetrics { public long scanTime; public long peakMemoryBytes; public long numMemoryAllocations; + public long spilledInputBytes; public long spilledBytes; public long spilledRows; public long spilledPartitions; @@ -64,6 +65,7 @@ public OperatorMetrics( long wallNanos, long peakMemoryBytes, long numMemoryAllocations, + long spilledInputBytes, long spilledBytes, long spilledRows, long spilledPartitions, @@ -95,6 +97,7 @@ public OperatorMetrics( this.scanTime = scanTime; this.peakMemoryBytes = peakMemoryBytes; this.numMemoryAllocations = numMemoryAllocations; + this.spilledInputBytes = spilledInputBytes; this.spilledBytes = spilledBytes; this.spilledRows = spilledRows; this.spilledPartitions = spilledPartitions; diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala index 5337af9e6c03..5053cc2ba03d 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala @@ -19,6 +19,8 @@ package org.apache.gluten.metrics import org.apache.gluten.substrait.AggregationParams import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.utils.SparkMetricsUtil +import org.apache.spark.util.TaskResources trait HashAggregateMetricsUpdater extends MetricsUpdater { def updateAggregationMetrics( @@ -81,5 +83,13 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric]) rowConstructionWallNanos += aggregationMetrics.get(idx).wallNanos idx += 1 } + if (TaskResources.inSparkTask()) { + SparkMetricsUtil.incMemoryBytesSpilled( + TaskResources.getLocalTaskContext().taskMetrics(), + aggMetrics.spilledInputBytes) + SparkMetricsUtil.incDiskBytesSpilled( + TaskResources.getLocalTaskContext().taskMetrics(), + aggMetrics.spilledBytes) + } } } diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala index 60be8418af54..fe1fa2ad6caf 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala @@ -20,6 +20,8 @@ import org.apache.gluten.metrics.Metrics.SingleMetric import org.apache.gluten.substrait.JoinParams import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.utils.SparkMetricsUtil +import org.apache.spark.util.TaskResources import java.util @@ -150,6 +152,20 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) streamPreProjectionWallNanos += joinMetrics.get(idx).wallNanos idx += 1 } + if (TaskResources.inSparkTask()) { + SparkMetricsUtil.incMemoryBytesSpilled( + TaskResources.getLocalTaskContext().taskMetrics(), + hashProbeMetrics.spilledInputBytes) + SparkMetricsUtil.incDiskBytesSpilled( + TaskResources.getLocalTaskContext().taskMetrics(), + hashProbeMetrics.spilledBytes) + SparkMetricsUtil.incMemoryBytesSpilled( + TaskResources.getLocalTaskContext().taskMetrics(), + hashBuildMetrics.spilledInputBytes) + SparkMetricsUtil.incDiskBytesSpilled( + TaskResources.getLocalTaskContext().taskMetrics(), + hashBuildMetrics.spilledBytes) + } } } diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index eff4245d5e65..8eea58272f15 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -105,6 +105,7 @@ object MetricsUtil extends Logging { var wallNanos: Long = 0 var peakMemoryBytes: Long = 0 var numMemoryAllocations: Long = 0 + var spilledInputBytes: Long = 0 var spilledBytes: Long = 0 var spilledRows: Long = 0 var spilledPartitions: Long = 0 @@ -130,6 +131,7 @@ object MetricsUtil extends Logging { wallNanos += metrics.wallNanos peakMemoryBytes = peakMemoryBytes.max(metrics.peakMemoryBytes) numMemoryAllocations += metrics.numMemoryAllocations + spilledInputBytes += metrics.spilledInputBytes spilledBytes += metrics.spilledBytes spilledRows += metrics.spilledRows spilledPartitions += metrics.spilledPartitions @@ -162,6 +164,7 @@ object MetricsUtil extends Logging { wallNanos, peakMemoryBytes, numMemoryAllocations, + spilledInputBytes, spilledBytes, spilledRows, spilledPartitions, diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala index 13874035086c..0ba48c62cdfe 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala @@ -17,6 +17,8 @@ package org.apache.gluten.metrics import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.utils.SparkMetricsUtil +import org.apache.spark.util.TaskResources class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater { @@ -34,6 +36,14 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpd metrics("spilledRows") += operatorMetrics.spilledRows metrics("spilledPartitions") += operatorMetrics.spilledPartitions metrics("spilledFiles") += operatorMetrics.spilledFiles + if (TaskResources.inSparkTask()) { + SparkMetricsUtil.incMemoryBytesSpilled( + TaskResources.getLocalTaskContext().taskMetrics(), + operatorMetrics.spilledInputBytes) + SparkMetricsUtil.incDiskBytesSpilled( + TaskResources.getLocalTaskContext().taskMetrics(), + operatorMetrics.spilledBytes) + } } } } diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala new file mode 100644 index 000000000000..af8bbb08ed74 --- /dev/null +++ b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.utils + +import org.apache.spark.executor.TaskMetrics + +object SparkMetricsUtil { + def incMemoryBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incMemoryBytesSpilled(v) + def incDiskBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incDiskBytesSpilled(v) +}