[VL] Fall back scan if file scheme is not supported by registered file systems (apache#6672)
zhli1142015 authored and shamirchen committed Oct 14, 2024
1 parent e525e86 commit 4ef42ff
Showing 14 changed files with 191 additions and 4 deletions.
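
In short, scan validation now fails (and the scan falls back to vanilla Spark) whenever a root path of the scan uses a URI scheme that none of the registered Velox file systems supports. A minimal sketch of the core check using the classes touched by this commit; the helper name validateRootPathSchemes is made up for illustration, and the real logic lives in VeloxBackendSettings.validateScan below:

    import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
    import org.apache.gluten.extension.ValidationResult
    import org.apache.gluten.utils.VeloxFileSystemValidationJniWrapper

    def validateRootPathSchemes(rootPaths: Seq[String]): ValidationResult = {
      // Keep one representative path per non-local scheme (see distinctRootPaths below).
      val filteredRootPaths = VeloxBackendSettings.distinctRootPaths(rootPaths)
      if (filteredRootPaths.nonEmpty &&
        !VeloxFileSystemValidationJniWrapper
          .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)) {
        // A failed result makes the scan operator fall back instead of being offloaded.
        ValidationResult.failed(
          s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
      } else {
        ValidationResult.succeeded
      }
    }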
@@ -140,10 +140,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
.toLowerCase(Locale.getDefault)
}

override def supportFileFormatRead(
override def validateScan(
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
rootPaths: Seq[String],
paths: Seq[String]): ValidationResult = {

def validateFilePath: Boolean = {
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.utils;

public class VeloxFileSystemValidationJniWrapper {

public static native boolean allSupportedByRegisteredFileSystems(String[] paths);
}
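
A hedged sketch of how this wrapper is meant to be called from Scala, mirroring its use in VeloxBackendSettings and in the new VeloxScanSuite test below (the hdfs/s3a paths are hypothetical, and whether they pass depends on which file systems the Velox build has registered):

    // Returns false as soon as any path's scheme has no registered Velox file system.
    val allSupported = VeloxFileSystemValidationJniWrapper
      .allSupportedByRegisteredFileSystems(Array("hdfs://ns/table", "s3a://bucket/table"))
    // "file:" paths always pass, because the local file system is always registered.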
@@ -25,6 +25,7 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}
import org.apache.gluten.utils._

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Pi, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SparkVersion, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid}
@@ -40,6 +41,8 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.hadoop.fs.Path

import scala.util.control.Breaks.breakable

class VeloxBackend extends Backend {
@@ -70,11 +73,20 @@ object VeloxBackendSettings extends BackendSettingsApi {

val MAXIMUM_BATCH_SIZE: Int = 32768

override def supportFileFormatRead(
override def validateScan(
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
rootPaths: Seq[String],
paths: Seq[String]): ValidationResult = {
val filteredRootPaths = distinctRootPaths(rootPaths)
if (
!filteredRootPaths.isEmpty && !VeloxFileSystemValidationJniWrapper
.allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
) {
return ValidationResult.failed(
s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
}
// Validate if all types are supported.
def validateTypes(validatorFunc: PartialFunction[StructField, String]): ValidationResult = {
// Collect unsupported types.
@@ -179,6 +191,17 @@ object VeloxBackendSettings extends BackendSettingsApi {
.isDefined
}

def distinctRootPaths(paths: Seq[String]): Seq[String] = {
// Skip native validation for local paths, as the local file system is always registered.
// For every file scheme, only one path is kept.
paths
.map(p => (new Path(p).toUri.getScheme, p))
.groupBy(_._1)
.filter(_._1 != "file")
.map(_._2.head._2)
.toSeq
}

override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
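
The distinctRootPaths helper above keeps a single representative path per non-local scheme, so the native check runs at most once per scheme. A quick illustration of the expected behavior, reusing the inputs from the new test case below:

    VeloxBackendSettings.distinctRootPaths(
      Seq("file:/test_path/", "test://test/s", "test://test1/s"))
    // => one element starting with "test://": local "file:" paths are dropped and
    //    only one path is kept for each remaining scheme.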
@@ -16,6 +16,10 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
import org.apache.gluten.utils.VeloxFileSystemValidationJniWrapper

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.GreaterThan
import org.apache.spark.sql.execution.ScalarSubquery
@@ -74,4 +78,44 @@ class VeloxScanSuite extends VeloxWholeStageTransformerSuite {
}
}
}

test("Test file scheme validation") {
withTempPath {
path =>
withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "false") {
spark
.range(100)
.selectExpr("cast(id % 9 as int) as c1")
.write
.format("parquet")
.save(path.getCanonicalPath)
runQueryAndCompare(s"SELECT count(*) FROM `parquet`.`${path.getCanonicalPath}`") {
df =>
val plan = df.queryExecution.executedPlan
val fileScan = collect(plan) { case s: FileSourceScanExecTransformer => s }
assert(fileScan.size == 1)
val rootPaths = fileScan(0).getRootPathsInternal
assert(rootPaths.length == 1)
assert(rootPaths(0).startsWith("file:/"))
assert(
VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
rootPaths.toArray))
}
}
}
val filteredRootPath =
VeloxBackendSettings.distinctRootPaths(
Seq("file:/test_path/", "test://test/s", "test://test1/s"))
assert(filteredRootPath.length == 1)
assert(filteredRootPath(0).startsWith("test://"))
assert(
VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
Array("file:/test_path/")))
assert(
!VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
Array("unsupported://test_path")))
assert(
!VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
Array("file:/test_path/", "unsupported://test_path")))
}
}
19 changes: 19 additions & 0 deletions cpp/velox/jni/VeloxJniWrapper.cc
@@ -33,6 +33,7 @@
#include "utils/ObjectStore.h"
#include "utils/VeloxBatchResizer.h"
#include "velox/common/base/BloomFilter.h"
#include "velox/common/file/FileSystems.h"

#include <iostream>

@@ -260,6 +261,24 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_VeloxBatchResizerJniWrapper
JNI_METHOD_END(gluten::kInvalidObjectHandle)
}

JNIEXPORT jboolean JNICALL
Java_org_apache_gluten_utils_VeloxFileSystemValidationJniWrapper_allSupportedByRegisteredFileSystems( // NOLINT
JNIEnv* env,
jclass,
jobjectArray stringArray) {
JNI_METHOD_START
int size = env->GetArrayLength(stringArray);
for (int i = 0; i < size; i++) {
jstring string = (jstring)(env->GetObjectArrayElement(stringArray, i));
std::string path = jStringToCString(env, string);
if (!velox::filesystems::isPathSupportedByRegisteredFileSystems(path)) {
return false;
}
}
return true;
JNI_METHOD_END(false)
}

#ifdef __cplusplus
}
#endif
@@ -31,10 +31,11 @@ import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopF
import org.apache.spark.sql.types.StructField

trait BackendSettingsApi {
def supportFileFormatRead(
def validateScan(
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
rootPaths: Seq[String],
paths: Seq[String]): ValidationResult = ValidationResult.succeeded
def supportWriteFilesExec(
format: FileFormat,
@@ -32,4 +32,6 @@ trait BaseDataSource {

/** Returns the input file paths, used to validate the partition column path */
def getInputFilePathsInternal: Seq[String]

def getRootPathsInternal: Seq[String]
}
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
import org.apache.gluten.extension.ValidationResult
@@ -59,6 +60,14 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
}
}

def getRootFilePaths: Seq[String] = {
if (GlutenConfig.getConf.scanFileSchemeValidationEnabled) {
getRootPathsInternal
} else {
Seq.empty
}
}

/** Returns the file format properties. */
def getProperties: Map[String, String] = Map.empty

@@ -92,7 +101,12 @@
}

val validationResult = BackendsApiManager.getSettings
.supportFileFormatRead(fileFormat, fields, getPartitionSchema.nonEmpty, getInputFilePaths)
.validateScan(
fileFormat,
fields,
getPartitionSchema.nonEmpty,
getRootFilePaths,
getInputFilePaths)
if (!validationResult.ok()) {
return validationResult
}
@@ -22,6 +22,7 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.FileIndexUtil

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -131,6 +132,14 @@ abstract class BatchScanExecTransformerBase(
}
}

override def getRootPathsInternal: Seq[String] = {
scan match {
case fileScan: FileScan =>
FileIndexUtil.getRootPath(fileScan.fileIndex)
case _ => Seq.empty
}
}

override def doValidateInternal(): ValidationResult = {
if (pushedAggregate.nonEmpty) {
return ValidationResult.failed(s"Unsupported aggregation push down for $scan.")
@@ -21,6 +21,7 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.FileIndexUtil

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
@@ -126,6 +127,10 @@ abstract class FileSourceScanExecTransformerBase(
relation.location.inputFiles.toSeq
}

override def getRootPathsInternal: Seq[String] = {
FileIndexUtil.getRootPath(relation.location)
}

override protected def doValidateInternal(): ValidationResult = {
if (
!metadataColumns.isEmpty && !BackendsApiManager.getSettings.supportNativeMetadataColumns()
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.utils

import org.apache.spark.sql.execution.datasources._

object FileIndexUtil {
def getRootPath(index: FileIndex): Seq[String] = {
index.rootPaths
.filter(_.isAbsolute)
.map(_.toString)
.toSeq
}
}
@@ -76,6 +76,9 @@ case class HiveTableScanExecTransformer(
Seq.empty
}

// TODO: get root paths from hive table.
override def getRootPathsInternal: Seq[String] = Seq.empty

override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater(metrics)

@@ -54,6 +54,9 @@ case class IcebergScanTransformer(

override def getInputFilePathsInternal: Seq[String] = Seq.empty

// TODO: get root paths from table.
override def getRootPathsInternal: Seq[String] = Seq.empty

override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)

override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
13 changes: 13 additions & 0 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -116,6 +116,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def forceParquetTimestampTypeScanFallbackEnabled: Boolean =
conf.getConf(VELOX_FORCE_PARQUET_TIMESTAMP_TYPE_SCAN_FALLBACK)

def scanFileSchemeValidationEnabled: Boolean =
conf.getConf(VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED)

// whether to use ColumnarShuffleManager
def isUseColumnarShuffleManager: Boolean =
conf
@@ -2015,6 +2018,16 @@ object GlutenConfig {
.booleanConf
.createWithDefault(false)

val VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED =
buildConf("spark.gluten.sql.scan.fileSchemeValidation.enabled")
.internal()
.doc(
"When true, enable file path scheme validation for scan. Validation will fail if" +
" file scheme is not supported by registered file systems, which will cause scan " +
" operator fall back.")
.booleanConf
.createWithDefault(true)

val COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED =
buildConf("spark.gluten.sql.columnar.cast.avg")
.internal()
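
The validation is enabled by default. It can be disabled per session if the fallback it triggers is not desired; a usage sketch, assuming spark is the active SparkSession:

    // Skip root-path scheme validation for scans; with this off, getRootFilePaths
    // returns an empty Seq and the scheme check in validateScan is bypassed.
    spark.conf.set("spark.gluten.sql.scan.fileSchemeValidation.enabled", "false")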
