diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 2fa2e4402b79..6760d29561a3 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -44,7 +44,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
 
 import java.lang.{Long => JLong}
 import java.net.URI
@@ -134,8 +133,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String],
-      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+      properties: Map[String, String]): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
         ExtensionTableBuilder
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index af67b01f49f0..dfdf02d45564 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
           case scanExec: BasicScanExecTransformer => scanExec
         }
         assertResult(1)(plans.size)
-        assertResult(1)(plans.head.getSplitInfos(null).size)
+        assertResult(1)(plans.head.getSplitInfos.size)
       }
     }
   }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index 0959832949ac..186078c18dd1 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1801,7 +1801,7 @@ class GlutenClickHouseMergeTreeWriteSuite
           case scanExec: BasicScanExecTransformer => scanExec
         }
         assertResult(1)(plans.size)
-        assertResult(conf._2)(plans.head.getSplitInfos(null).size)
+        assertResult(conf._2)(plans.head.getSplitInfos.size)
       }
     }
   })
@@ -1910,7 +1910,7 @@ class GlutenClickHouseMergeTreeWriteSuite
             case f: BasicScanExecTransformer => f
           }
           assertResult(2)(scanExec.size)
-          assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
+          assertResult(conf._2)(scanExec(1).getSplitInfos.size)
         }
       }
     })
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 320d1f366c23..061daaac0fad 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -39,9 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, SparkDirectoryUtil}
-
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
 
 import java.lang.{Long => JLong}
 import java.nio.charset.StandardCharsets
@@ -57,8 +55,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String],
-      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+      properties: Map[String, String]): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (
@@ -69,7 +66,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           modificationTimes,
           partitionColumns,
           metadataColumns) =
-          constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf)
+          constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(f)
         LocalFilesBuilder.makeLocalFiles(
@@ -112,8 +109,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   private def constructSplitInfo(
       schema: StructType,
       files: Array[PartitionedFile],
-      metadataColumnNames: Seq[String],
-      serializableHadoopConf: SerializableConfiguration) = {
+      metadataColumnNames: Seq[String]) = {
     val paths = new JArrayList[String]()
     val starts = new JArrayList[JLong]
     val lengths = new JArrayList[JLong]()
@@ -125,15 +121,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       file =>
         // The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded
         // path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder
-        var filePath = file.filePath.toString
-        if (filePath.startsWith("viewfs")) {
-          val viewPath = new Path(filePath)
-          val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
-          filePath = viewFileSystem.resolvePath(viewPath).toString
-        }
         paths.add(
           GlutenURLDecoder
-            .decode(filePath, StandardCharsets.UTF_8.name()))
+            .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
         starts.add(JLong.valueOf(file.start))
         lengths.add(JLong.valueOf(file.length))
         val (fileSize, modificationTime) =
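Note: the hunk above removes the viewfs special case from constructSplitInfo. For reference, the deleted branch resolved viewfs:// paths through the driver-side Hadoop configuration before handing them to the native reader; a standalone sketch of that logic follows (the helper name resolveViewfsPath is introduced here for illustration only):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Resolve a viewfs:// path to the file system backing the mount point,
// e.g. viewfs://cluster/data/x -> hdfs://nn1/data/x, mirroring the branch
// deleted from constructSplitInfo above.
def resolveViewfsPath(rawPath: String, hadoopConf: Configuration): String =
  if (rawPath.startsWith("viewfs")) {
    val viewPath = new Path(rawPath)
    // resolvePath consults the client-side mount table carried in hadoopConf
    FileSystem.get(viewPath.toUri, hadoopConf).resolvePath(viewPath).toString
  } else {
    rawPath
  }
```

With this revert, file paths reach the native layer unresolved, so any viewfs handling must happen in the native HDFS client instead.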
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
index 16cd18e41a05..a5e638d9be28 100755
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
@@ -37,7 +37,7 @@ class SharedLibraryLoaderCentos7 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
     loader.loadAndCreateLink("libre2.so.10", "libre2.so", false)
     loader.loadAndCreateLink("libzstd.so.1", "libzstd.so", false)
     loader.loadAndCreateLink("liblz4.so.1", "liblz4.so", false)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
index 0a75c30c22ff..5d8c18b8bbf1 100755
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos8 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
     loader.loadAndCreateLink("libre2.so.0", "libre2.so", false)
     loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
   }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
index 50f9fe4aaadc..694cf4c622f5 100755
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos9 extends SharedLibraryLoader {
    loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
     loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
     loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
   }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
index 06c065ba2883..6927f2539ef0 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
@@ -44,6 +44,6 @@ class SharedLibraryLoaderDebian11 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
     loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
   }
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
index 8018995328fe..ce01f4399f4f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
@@ -50,6 +50,6 @@ class SharedLibraryLoaderDebian12 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libevent-2.1.so.7", "libevent-2.1.so", false)
     loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
   }
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
index d1f21a0013fb..79c0518ea3b9 100755
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
@@ -57,7 +57,7 @@ class SharedLibraryLoaderUbuntu2004 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libicudata.so.66", "libicudata.so", false)
     loader.loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false)
     loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
-    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
     loader.loadAndCreateLink("libre2.so.5", "libre2.so", false)
     loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
     loader.loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
index 3cf4d30237ac..a5d99ede42b9 100755
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderUbuntu2204 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
     loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
-    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
     loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
     loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
     loader.loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false)
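Note: all seven distro loaders above switch from the JNI-based libhdfs.so.0.0.0 back to libhdfs3.so.1, matching the CMake changes that follow. A minimal sketch of the load-and-link pattern these calls rely on (hypothetical helper and search directory; Gluten's real SharedLibraryLoader differs):

```scala
import java.nio.file.{Files, Paths}

// Load a versioned shared object and expose it under its unversioned name,
// so that a later dlopen("libhdfs3.so") from native code can resolve it.
def loadAndCreateLink(versioned: String, linkName: String, required: Boolean): Unit = {
  val source = Paths.get("/usr/lib64", versioned) // assumed search directory
  if (Files.exists(source)) {
    System.load(source.toAbsolutePath.toString) // load the versioned library
    val link = Paths.get(sys.props("java.io.tmpdir"), linkName)
    if (!Files.exists(link)) {
      Files.createSymbolicLink(link, source) // link -> versioned file
    }
  } else if (required) {
    throw new UnsatisfiedLinkError(s"$versioned not found under /usr/lib64")
  }
}
```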
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala index d1f21a0013fb..79c0518ea3b9 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala @@ -57,7 +57,7 @@ class SharedLibraryLoaderUbuntu2004 extends SharedLibraryLoader { loader.loadAndCreateLink("libicudata.so.66", "libicudata.so", false) loader.loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false) loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false) - loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false) + loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) loader.loadAndCreateLink("libre2.so.5", "libre2.so", false) loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false) loader.loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false) diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala index 3cf4d30237ac..a5d99ede42b9 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala @@ -42,7 +42,7 @@ class SharedLibraryLoaderUbuntu2204 extends SharedLibraryLoader { loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false) loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false) - loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false) + loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) loader.loadAndCreateLink("libre2.so.9", "libre2.so", false) loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false) loader.loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false) diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 47e4ff8f1382..7eaf5f1e617e 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -109,6 +109,28 @@ macro(add_duckdb) endif() endmacro() +macro(find_libhdfs3) + find_package(libhdfs3 CONFIG) + if(libhdfs3_FOUND AND TARGET HDFS::hdfs3) + set(LIBHDFS3_LIBRARY HDFS::hdfs3) + else() + find_path(libhdfs3_INCLUDE_DIR hdfs/hdfs.h) + set(CMAKE_FIND_LIBRARY_SUFFIXES ".so") + find_library(libhdfs3_LIBRARY NAMES hdfs3) + find_package_handle_standard_args(libhdfs3 DEFAULT_MSG libhdfs3_INCLUDE_DIR + libhdfs3_LIBRARY) + add_library(HDFS::hdfs3 SHARED IMPORTED) + set_target_properties( + HDFS::hdfs3 + PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${libhdfs3_INCLUDE_DIR}" + IMPORTED_LOCATION "${libhdfs3_LIBRARY}") + endif() + + if(NOT libhdfs3_FOUND) + message(FATAL_ERROR "LIBHDFS3 Library Not Found") + endif() +endmacro() + macro(find_re2) find_package(re2 CONFIG) if(re2_FOUND AND TARGET re2::re2) @@ -187,6 +209,10 @@ set(VELOX_SRCS utils/Common.cc utils/VeloxBatchResizer.cc) +if(ENABLE_HDFS) + list(APPEND VELOX_SRCS utils/HdfsUtils.cc) +endif() + if(ENABLE_S3) find_package(ZLIB) endif() @@ -310,6 +336,8 @@ endif() if(ENABLE_HDFS) add_definitions(-DENABLE_HDFS) + find_libhdfs3() + target_link_libraries(velox PUBLIC HDFS::hdfs3) endif() if(ENABLE_S3) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc index 0e1a9bed7b09..ea5bd59794f1 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -22,6 +22,10 @@ #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/exec/PlanNodeStats.h" +#ifdef ENABLE_HDFS +#include "utils/HdfsUtils.h" +#endif + using namespace facebook; namespace gluten { @@ -66,6 +70,9 @@ WholeStageResultIterator::WholeStageResultIterator( scanNodeIds_(scanNodeIds), scanInfos_(scanInfos), streamIds_(streamIds) { +#ifdef ENABLE_HDFS + gluten::updateHdfsTokens(veloxCfg_.get()); +#endif spillStrategy_ = veloxCfg_->get(kSpillStrategy, kSpillStrategyDefaultValue); auto spillThreadNum = veloxCfg_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); if (spillThreadNum > 0) { diff --git a/cpp/velox/utils/HdfsUtils.h b/cpp/velox/utils/HdfsUtils.h new file mode 100644 index 000000000000..2e07d7ddf41b --- /dev/null +++ b/cpp/velox/utils/HdfsUtils.h @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +namespace gluten { +void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg); +} diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 95811a8c26e8..87d425b5d0f9 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -16,8 +16,8 @@ set -exu -VELOX_REPO=https://github.com/JkSelf/velox.git -VELOX_BRANCH=libhdfs-test +VELOX_REPO=https://github.com/oap-project/velox.git +VELOX_BRANCH=2024_10_24 VELOX_HOME="" OS=`uname -s` diff --git a/ep/build-velox/src/modify_velox.patch b/ep/build-velox/src/modify_velox.patch index 3d45ff4b48a3..5c1aab248981 100644 --- a/ep/build-velox/src/modify_velox.patch +++ b/ep/build-velox/src/modify_velox.patch @@ -74,7 +74,27 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f7cbc92f..52adb1250 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt -@@ -386,7 +391,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES}) +@@ -242,10 +242,15 @@ if(VELOX_ENABLE_ABFS) + endif() + + if(VELOX_ENABLE_HDFS) +- find_library( +- LIBHDFS3 +- NAMES libhdfs3.so libhdfs3.dylib +- HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) ++ find_package(libhdfs3) ++ if(libhdfs3_FOUND AND TARGET HDFS::hdfs3) ++ set(LIBHDFS3 HDFS::hdfs3) ++ else() ++ find_library( ++ LIBHDFS3 ++ NAMES libhdfs3.so libhdfs3.dylib ++ HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) ++ endif() + add_definitions(-DVELOX_ENABLE_HDFS3) + endif() + +@@ -385,7 +390,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES}) # for reference. 
find_package(range-v3) set_source(gflags) diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala index 10d24c317cc1..1cbeb52a9213 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil @@ -59,9 +58,7 @@ case class IcebergScanTransformer( override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan) - override def getSplitInfosFromPartitions( - partitions: Seq[InputPartition], - serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { + override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions( scan, keyGroupedPartitioning, diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala index 5ebf8883c688..bb604f534fbe 100644 --- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala @@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { case plan if plan.isInstanceOf[IcebergScanTransformer] => assert( plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined) - assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3) + assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3) case _ => // do nothing } checkLengthAndPlan(df, 7) @@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { case plan if plan.isInstanceOf[IcebergScanTransformer] => assert( plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined) - assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3) + assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3) case _ => // do nothing } checkLengthAndPlan(df, 7) @@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { case plan if plan.isInstanceOf[IcebergScanTransformer] => assert( plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined) - assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 1) + assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1) case _ => // do nothing } checkLengthAndPlan(df, 5) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 69c9d37334de..11211bd0da91 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType import 
org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration trait IteratorApi { @@ -38,8 +37,7 @@ trait IteratorApi { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String], - serializableHadoopConf: SerializableConfiguration): SplitInfo + properties: Map[String, String]): SplitInfo /** Generate native row partition. */ def genPartitions( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index d7b824b397e5..912b93079f4a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} -import org.apache.spark.util.SerializableConfiguration import com.google.protobuf.StringValue import io.substrait.proto.NamedStruct @@ -63,13 +62,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def getProperties: Map[String, String] = Map.empty /** Returns the split infos that will be processed by the underlying native engine. */ - def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { - getSplitInfosFromPartitions(getPartitions, serializableHadoopConf) + def getSplitInfos: Seq[SplitInfo] = { + getSplitInfosFromPartitions(getPartitions) } - def getSplitInfosFromPartitions( - partitions: Seq[InputPartition], - serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { + def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { partitions.map( BackendsApiManager.getIteratorApiInstance .genSplitInfo( @@ -77,8 +74,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource getPartitionSchema, fileFormat, getMetadataColumns.map(_.name), - getProperties, - serializableHadoopConf)) + getProperties)) } override protected def doValidateInternal(): ValidationResult = { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index e1dfd3f5704a..7bd84e09d136 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists @@ -130,8 +129,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext) val sparkConf: SparkConf = sparkContext.getConf - val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( - 
sparkContext.hadoopConfiguration) val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo @transient @@ -292,16 +289,12 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f */ val allScanPartitions = basicScanExecTransformers.map(_.getPartitions) val allScanSplitInfos = - getSplitInfosFromPartitions( - basicScanExecTransformers, - allScanPartitions, - serializableHadoopConf) + getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions) val inputPartitions = BackendsApiManager.getIteratorApiInstance.genPartitions( wsCtx, allScanSplitInfos, basicScanExecTransformers) - val rdd = new GlutenWholeStageColumnarRDD( sparkContext, inputPartitions, @@ -387,8 +380,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f private def getSplitInfosFromPartitions( basicScanExecTransformers: Seq[BasicScanExecTransformer], - allScanPartitions: Seq[Seq[InputPartition]], - serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] = { + allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = { // If these are two scan transformers, they must have same partitions, // otherwise, exchange will be inserted. We should combine the two scan // transformers' partitions with same index, and set them together in @@ -406,8 +398,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n]) val allScanSplitInfos = allScanPartitions.zip(basicScanExecTransformers).map { - case (partition, transformer) => - transformer.getSplitInfosFromPartitions(partition, serializableHadoopConf) + case (partition, transformer) => transformer.getSplitInfosFromPartitions(partition) } val partitionLength = allScanSplitInfos.head.size if (allScanSplitInfos.exists(_.size != partitionLength)) {
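Note: the trailing hunks keep WholeStageTransformer's invariant that all scans in one stage expose the same number of partitions, grouping the i-th split of every scan together (the "p1n | p2n => substraitContext.setSplitInfo([p1n, p2n])" comment). A minimal sketch of that grouping, with illustrative names only:

```scala
// Group each scan's i-th SplitInfo together: given one Seq per scan
// transformer, [[p11..p1n], [p21..p2n]] becomes [[p11, p21], ..., [p1n, p2n]].
def combineSplitInfos[A](allScanSplitInfos: Seq[Seq[A]]): Seq[Seq[A]] = {
  val partitionLength = allScanSplitInfos.head.size
  require(
    allScanSplitInfos.forall(_.size == partitionLength),
    "All scan transformers in one stage must have the same number of partitions")
  allScanSplitInfos.transpose
}
```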