Skip to content

Commit

Permalink
[VL] Add write IO metrics for WriteFiles (apache#7011)
Browse files (browse the repository at this point in the history)
  • Loading branch information
Yohahaha authored and shamirchen committed Oct 14, 2024
1 parent 80df0a4 commit 48ddcae
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"physicalWrittenBytes" -> SQLMetrics.createSizeMetric(
sparkContext,
"number of written bytes"),
"writeIONanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write IO"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write"),
"numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
assert(write.isDefined)
val metrics = write.get.metrics
assert(metrics("physicalWrittenBytes").value > 0)
assert(metrics("writeIONanos").value > 0)
assert(metrics("numWrittenFiles").value == 1)
}
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;");

metricsBuilderConstructor = getMethodIdOrError(
env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");

nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
Expand Down Expand Up @@ -504,6 +504,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIter
longArray[Metrics::kIoWaitTime],
longArray[Metrics::kPreloadSplits],
longArray[Metrics::kPhysicalWrittenBytes],
longArray[Metrics::kWriteIOTime],
longArray[Metrics::kNumWrittenFiles]);

JNI_METHOD_END(nullptr)
Expand Down
1 change: 1 addition & 0 deletions cpp/core/utils/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct Metrics {

// Write metrics.
kPhysicalWrittenBytes,
kWriteIOTime,
kNumWrittenFiles,

// The end of enum items.
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const std::string kRemainingFilterTime = "totalRemainingFilterTime";
const std::string kIoWaitTime = "ioWaitNanos";
const std::string kPreloadSplits = "readyPreloadedSplits";
const std::string kNumWrittenFiles = "numWrittenFiles";
const std::string kWriteIOTime = "writeIOTime";

// others
const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";
Expand Down Expand Up @@ -391,6 +392,7 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] =
runtimeMetric("sum", entry.second->customStats, kNumWrittenFiles);
metrics_->get(Metrics::kPhysicalWrittenBytes)[metricIndex] = second->physicalWrittenBytes;
metrics_->get(Metrics::kWriteIOTime)[metricIndex] = runtimeMetric("sum", second->customStats, kWriteIOTime);

metricIndex += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class Metrics implements IMetrics {
public long[] preloadSplits;

public long[] physicalWrittenBytes;

public long[] writeIOTime;
public long[] numWrittenFiles;

public SingleMetric singleMetric = new SingleMetric();
Expand Down Expand Up @@ -88,6 +88,7 @@ public Metrics(
long[] ioWaitTime,
long[] preloadSplits,
long[] physicalWrittenBytes,
long[] writeIOTime,
long[] numWrittenFiles) {
this.inputRows = inputRows;
this.inputVectors = inputVectors;
Expand Down Expand Up @@ -120,6 +121,7 @@ public Metrics(
this.ioWaitTime = ioWaitTime;
this.preloadSplits = preloadSplits;
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
}

Expand Down Expand Up @@ -159,6 +161,7 @@ public OperatorMetrics getOperatorMetrics(int index) {
ioWaitTime[index],
preloadSplits[index],
physicalWrittenBytes[index],
writeIOTime[index],
numWrittenFiles[index]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long preloadSplits;

public long physicalWrittenBytes;

public long writeIOTime;
public long numWrittenFiles;

/** Create an instance for operator metrics. */
Expand Down Expand Up @@ -83,6 +83,7 @@ public OperatorMetrics(
long ioWaitTime,
long preloadSplits,
long physicalWrittenBytes,
long writeIOTime,
long numWrittenFiles) {
this.inputRows = inputRows;
this.inputVectors = inputVectors;
Expand Down Expand Up @@ -114,6 +115,7 @@ public OperatorMetrics(
this.ioWaitTime = ioWaitTime;
this.preloadSplits = preloadSplits;
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ object MetricsUtil extends Logging {
val outputBytes = operatorMetrics.get(0).outputBytes

val physicalWrittenBytes = operatorMetrics.get(0).physicalWrittenBytes
val writeIOTime = operatorMetrics.get(0).writeIOTime

var cpuCount: Long = 0
var wallNanos: Long = 0
Expand Down Expand Up @@ -182,6 +183,7 @@ object MetricsUtil extends Logging {
ioWaitTime,
preloadSplits,
physicalWrittenBytes,
writeIOTime,
numWrittenFiles
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class WriteFilesMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metr
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
metrics("physicalWrittenBytes") += operatorMetrics.physicalWrittenBytes
metrics("writeIONanos") += operatorMetrics.writeIOTime
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("numWrittenFiles") += operatorMetrics.numWrittenFiles
}
Expand Down

0 comments on commit 48ddcae

Please sign in to comment.