diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 7519580b9cb74..e875641c4cee8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration import java.lang.{Long => JLong} import java.net.URI @@ -125,7 +126,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String]): SplitInfo = { + properties: Map[String, String], + serializableHadoopConf: SerializableConfiguration): SplitInfo = { partition match { case p: GlutenMergeTreePartition => val partLists = new JArrayList[String]() diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index 6a473cc54f7ec..a59931e699eb2 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -760,7 +760,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite case scanExec: BasicScanExecTransformer => scanExec } assertResult(1)(plans.size) - assertResult(1)(plans.head.getSplitInfos.size) + assertResult(1)(plans.head.getSplitInfos(null).size) } } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala index 3b7606daac6b5..82bed316766f7 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala @@ -1798,7 +1798,7 @@ class GlutenClickHouseMergeTreeWriteSuite case scanExec: BasicScanExecTransformer => scanExec } assertResult(1)(plans.size) - assertResult(conf._2)(plans.head.getSplitInfos.size) + assertResult(conf._2)(plans.head.getSplitInfos(null).size) } } }) @@ -1908,7 +1908,7 @@ class GlutenClickHouseMergeTreeWriteSuite case f: BasicScanExecTransformer => f } assertResult(2)(scanExec.size) - assertResult(conf._2)(scanExec(1).getSplitInfos.size) + assertResult(conf._2)(scanExec(1).getSplitInfos(null).size) } } }) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index d8355e1c419fb..1baa7c32db817 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -39,7 +39,9 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.ExecutorManager +import org.apache.spark.util.{ExecutorManager, SerializableConfiguration} + +import org.apache.hadoop.fs.{FileSystem, Path} import java.lang.{Long => JLong} import java.nio.charset.StandardCharsets @@ -55,7 +57,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String]): SplitInfo = { + properties: Map[String, String], + serializableHadoopConf: SerializableConfiguration): SplitInfo = { partition match { case f: FilePartition => val ( @@ -66,7 +69,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { modificationTimes, partitionColumns, metadataColumns) = - constructSplitInfo(partitionSchema, f.files, metadataColumnNames) + constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf) val preferredLocations = SoftAffinity.getFilePartitionLocations(f) LocalFilesBuilder.makeLocalFiles( @@ -109,7 +112,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { private def constructSplitInfo( schema: StructType, files: Array[PartitionedFile], - metadataColumnNames: Seq[String]) = { + metadataColumnNames: Seq[String], + serializableHadoopConf: SerializableConfiguration) = { val paths = new JArrayList[String]() val starts = new JArrayList[JLong] val lengths = new JArrayList[JLong]() @@ -121,9 +125,15 @@ class VeloxIteratorApi extends IteratorApi with Logging { file => // The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded // path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder + var filePath = file.filePath.toString + if (filePath.startsWith("viewfs")) { + val viewPath = new Path(filePath) + val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value) + filePath = viewFileSystem.resolvePath(viewPath).toString + } paths.add( GlutenURLDecoder - .decode(file.filePath.toString, StandardCharsets.UTF_8.name())) + .decode(filePath, StandardCharsets.UTF_8.name())) starts.add(JLong.valueOf(file.start)) lengths.add(JLong.valueOf(file.length)) val (fileSize, modificationTime) = diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala index 47ed2c47cbb5d..d77bb145e1497 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala @@ -36,7 +36,7 @@ class SharedLibraryLoaderCentos7 extends SharedLibraryLoader { .loadAndCreateLink("libntlm.so.0", "libntlm.so", false) .loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false) .loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) - .loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) + .loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false) .loadAndCreateLink("libre2.so.10", "libre2.so", false) .loadAndCreateLink("libzstd.so.1", "libzstd.so", false) .loadAndCreateLink("liblz4.so.1", "liblz4.so", false) diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala index c1d3bf2e26cb7..cf7e01d329fd8 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala @@ -41,7 +41,7 @@ class SharedLibraryLoaderCentos8 extends SharedLibraryLoader { .loadAndCreateLink("libntlm.so.0", "libntlm.so", false) .loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false) .loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) - .loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) + .loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false) .loadAndCreateLink("libre2.so.0", "libre2.so", false) .loadAndCreateLink("libsodium.so.23", "libsodium.so", false) .commit() diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala index ca7d1d22d9840..514d84ad0b53d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala @@ -46,7 +46,7 @@ class SharedLibraryLoaderDebian11 extends SharedLibraryLoader { .loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false) .loadAndCreateLink("libcurl.so.4", "libcurl.so", false) .loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) - .loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) + .loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false) .commit() } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala index 128c8eaa2aef2..abd82f4bdbf89 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala @@ -52,7 +52,7 @@ class SharedLibraryLoaderDebian12 extends SharedLibraryLoader { .loadAndCreateLink("libevent-2.1.so.7", "libevent-2.1.so", false) .loadAndCreateLink("libcurl.so.4", "libcurl.so", false) .loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) - .loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) + .loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false) .commit() } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala index 18f2e6cfbeb32..e0985e11589ba 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala @@ -59,7 +59,7 @@ class SharedLibraryLoaderUbuntu2004 extends SharedLibraryLoader { .loadAndCreateLink("libicudata.so.66", "libicudata.so", false) .loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false) .loadAndCreateLink("libxml2.so.2", "libxml2.so", false) - .loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) + .loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false) .loadAndCreateLink("libre2.so.5", "libre2.so", false) .loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false) .loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false) diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala index b23105b7dce05..58569f125f393 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala @@ -44,7 +44,7 @@ class SharedLibraryLoaderUbuntu2204 extends SharedLibraryLoader { .loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false) .loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) .loadAndCreateLink("libxml2.so.2", "libxml2.so", false) - .loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) + .loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false) .loadAndCreateLink("libre2.so.9", "libre2.so", false) .loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false) .loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false) diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 1952651338154..271f1d24636e3 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -109,28 +109,6 @@ macro(add_duckdb) endif() endmacro() -macro(find_libhdfs3) - find_package(libhdfs3 CONFIG) - if(libhdfs3_FOUND AND TARGET HDFS::hdfs3) - set(LIBHDFS3_LIBRARY HDFS::hdfs3) - else() - find_path(libhdfs3_INCLUDE_DIR hdfs/hdfs.h) - set(CMAKE_FIND_LIBRARY_SUFFIXES ".so") - find_library(libhdfs3_LIBRARY NAMES hdfs3) - find_package_handle_standard_args(libhdfs3 DEFAULT_MSG libhdfs3_INCLUDE_DIR - libhdfs3_LIBRARY) - add_library(HDFS::hdfs3 SHARED IMPORTED) - set_target_properties( - HDFS::hdfs3 - PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${libhdfs3_INCLUDE_DIR}" - IMPORTED_LOCATION "${libhdfs3_LIBRARY}") - endif() - - if(NOT libhdfs3_FOUND) - message(FATAL_ERROR "LIBHDFS3 Library Not Found") - endif() -endmacro() - macro(find_re2) find_package(re2 CONFIG) if(re2_FOUND AND TARGET re2::re2) @@ -210,10 +188,6 @@ set(VELOX_SRCS utils/Common.cc utils/VeloxBatchResizer.cc) -if(ENABLE_HDFS) - list(APPEND VELOX_SRCS utils/HdfsUtils.cc) -endif() - if(ENABLE_S3) find_package(ZLIB) endif() @@ -331,8 +305,6 @@ endif() if(ENABLE_HDFS) add_definitions(-DENABLE_HDFS) - find_libhdfs3() - target_link_libraries(velox PUBLIC HDFS::hdfs3) endif() if(ENABLE_S3) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index eb700c6489ece..5b6ba8679420d 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -22,9 +22,9 @@ #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/exec/PlanNodeStats.h" -#ifdef ENABLE_HDFS -#include "utils/HdfsUtils.h" -#endif +// #ifdef ENABLE_HDFS +// #include "utils/HdfsUtils.h" +// #endif using namespace facebook; @@ -68,9 +68,9 @@ WholeStageResultIterator::WholeStageResultIterator( scanNodeIds_(scanNodeIds), scanInfos_(scanInfos), streamIds_(streamIds) { -#ifdef ENABLE_HDFS - gluten::updateHdfsTokens(veloxCfg_.get()); -#endif + // #ifdef ENABLE_HDFS + // gluten::updateHdfsTokens(veloxCfg_.get()); + // #endif spillStrategy_ = veloxCfg_->get(kSpillStrategy, kSpillStrategyDefaultValue); auto spillThreadNum = veloxCfg_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); if (spillThreadNum > 0) { diff --git a/cpp/velox/utils/HdfsUtils.cc b/cpp/velox/utils/HdfsUtils.cc deleted file mode 100644 index a912c04eee7e7..0000000000000 --- a/cpp/velox/utils/HdfsUtils.cc +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "HdfsUtils.h" -#include -#include "config/GlutenConfig.h" -#include "utils/exception.h" - -namespace gluten { - -namespace { -struct Credential { - const std::string userName; - const std::string allTokens; - - bool operator==(const Credential& rhs) const { - return userName == rhs.userName && allTokens == rhs.allTokens; - } - bool operator!=(const Credential& rhs) const { - return !(rhs == *this); - } -}; -} // namespace - -void updateHdfsTokens(const facebook::velox::Config* veloxCfg) { - static std::mutex mtx; - std::lock_guard lock{mtx}; - - static std::optional activeCredential{std::nullopt}; - - const auto& newUserName = veloxCfg->get(gluten::kUGIUserName); - const auto& newAllTokens = veloxCfg->get(gluten::kUGITokens); - - if (!newUserName.hasValue() || !newAllTokens.hasValue()) { - return; - } - - Credential newCredential{newUserName.value(), newAllTokens.value()}; - - if (activeCredential.has_value() && activeCredential.value() == newCredential) { - // Do nothing if the credential is the same with before. - return; - } - - hdfsSetDefautUserName(newCredential.userName.c_str()); - std::vector tokens; - folly::split('\0', newCredential.allTokens, tokens); - for (auto& token : tokens) - hdfsSetTokenForDefaultUser(token.data()); - activeCredential.emplace(newCredential); -} -} // namespace gluten diff --git a/cpp/velox/utils/HdfsUtils.h b/cpp/velox/utils/HdfsUtils.h deleted file mode 100644 index cd017f250ad22..0000000000000 --- a/cpp/velox/utils/HdfsUtils.h +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -namespace gluten { -void updateHdfsTokens(const facebook::velox::Config* veloxCfg); -} diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index b65dfda1c3d85..45b5786ef75cb 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -16,8 +16,8 @@ set -exu -VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2024_08_11 +VELOX_REPO=https://github.com/JkSelf/velox.git +VELOX_BRANCH=libhdfs-replace VELOX_HOME="" OS=`uname -s` diff --git a/ep/build-velox/src/modify_velox.patch b/ep/build-velox/src/modify_velox.patch index c710ff5454525..647aca3c7cde7 100644 --- a/ep/build-velox/src/modify_velox.patch +++ b/ep/build-velox/src/modify_velox.patch @@ -99,27 +99,7 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt index 2dc95f972..391485879 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt -@@ -236,10 +236,15 @@ if(VELOX_ENABLE_ABFS) - endif() - - if(VELOX_ENABLE_HDFS) -- find_library( -- LIBHDFS3 -- NAMES libhdfs3.so libhdfs3.dylib -- HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) -+ find_package(libhdfs3) -+ if(libhdfs3_FOUND AND TARGET HDFS::hdfs3) -+ set(LIBHDFS3 HDFS::hdfs3) -+ else() -+ find_library( -+ LIBHDFS3 -+ NAMES libhdfs3.so libhdfs3.dylib -+ HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) -+ endif() - add_definitions(-DVELOX_ENABLE_HDFS3) - endif() - -@@ -380,7 +385,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES}) +@@ -386,7 +391,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES}) # for reference. find_package(range-v3) set_source(gflags) diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index b780649731230..8c11cc29ac17a 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration trait IteratorApi { @@ -37,7 +38,8 @@ trait IteratorApi { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String]): SplitInfo + properties: Map[String, String], + serializableHadoopConf: SerializableConfiguration): SplitInfo /** Generate native row partition. */ def genPartitions( diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index b7953b3acab69..a3e6dc6945e96 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} +import org.apache.spark.util.SerializableConfiguration import com.google.protobuf.StringValue import io.substrait.proto.NamedStruct @@ -72,11 +73,13 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def getProperties: Map[String, String] = Map.empty /** Returns the split infos that will be processed by the underlying native engine. */ - def getSplitInfos: Seq[SplitInfo] = { - getSplitInfosFromPartitions(getPartitions) + def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { + getSplitInfosFromPartitions(getPartitions, serializableHadoopConf) } - def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { + def getSplitInfosFromPartitions( + partitions: Seq[InputPartition], + serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { partitions.map( BackendsApiManager.getIteratorApiInstance .genSplitInfo( @@ -84,7 +87,8 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource getPartitionSchema, fileFormat, getMetadataColumns.map(_.name), - getProperties)) + getProperties, + serializableHadoopConf)) } override protected def doValidateInternal(): ValidationResult = { diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index 78132c08c7823..c0c928c2025ee 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists @@ -127,6 +128,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext) val sparkConf: SparkConf = sparkContext.getConf + val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( + sparkContext.hadoopConfiguration) val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo val substraitPlanLogLevel: String = GlutenConfig.getConf.substraitPlanLogLevel @@ -289,12 +292,16 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f */ val allScanPartitions = basicScanExecTransformers.map(_.getPartitions) val allScanSplitInfos = - getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions) + getSplitInfosFromPartitions( + basicScanExecTransformers, + allScanPartitions, + serializableHadoopConf) val inputPartitions = BackendsApiManager.getIteratorApiInstance.genPartitions( wsCtx, allScanSplitInfos, basicScanExecTransformers) + val rdd = new GlutenWholeStageColumnarRDD( sparkContext, inputPartitions, @@ -369,7 +376,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f private def getSplitInfosFromPartitions( basicScanExecTransformers: Seq[BasicScanExecTransformer], - allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = { + allScanPartitions: Seq[Seq[InputPartition]], + serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] = { // If these are two scan transformers, they must have same partitions, // otherwise, exchange will be inserted. We should combine the two scan // transformers' partitions with same index, and set them together in @@ -387,7 +395,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n]) val allScanSplitInfos = allScanPartitions.zip(basicScanExecTransformers).map { - case (partition, transformer) => transformer.getSplitInfosFromPartitions(partition) + case (partition, transformer) => + transformer.getSplitInfosFromPartitions(partition, serializableHadoopConf) } val partitionLength = allScanSplitInfos.head.size if (allScanSplitInfos.exists(_.size != partitionLength)) { diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala index 9fb8521d9df5b..64c8591c80e76 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil @@ -59,7 +60,9 @@ case class IcebergScanTransformer( override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan) - override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { + override def getSplitInfosFromPartitions( + partitions: Seq[InputPartition], + serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions( scan, keyGroupedPartitioning, diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala index bb604f534fbeb..5ebf8883c6887 100644 --- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala @@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { case plan if plan.isInstanceOf[IcebergScanTransformer] => assert( plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined) - assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3) + assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3) case _ => // do nothing } checkLengthAndPlan(df, 7) @@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { case plan if plan.isInstanceOf[IcebergScanTransformer] => assert( plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined) - assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3) + assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3) case _ => // do nothing } checkLengthAndPlan(df, 7) @@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { case plan if plan.isInstanceOf[IcebergScanTransformer] => assert( plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined) - assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1) + assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 1) case _ => // do nothing } checkLengthAndPlan(df, 5)