[DO-NOT-MERGE] Support SQL API mode #48930

Draft · wants to merge 1 commit into base: master

@@ -18,6 +18,7 @@ package org.apache.spark.sql
 
 import java.net.URI
 import java.nio.file.{Files, Paths}
+import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
@@ -595,17 +596,26 @@ object SparkSession extends api.BaseSparkSessionCompanion with Logging {
    */
   private[sql] def withLocalConnectServer[T](f: => T): T = {
     synchronized {
+      lazy val isAPIModeConnect = Option(System.getProperty("spark.api.mode"))
+        .exists(_.toLowerCase(Locale.ROOT) == "connect")
       val remoteString = sparkOptions
         .get("spark.remote")
         .orElse(Option(System.getProperty("spark.remote"))) // Set from Spark Submit
         .orElse(sys.env.get(SparkConnectClient.SPARK_REMOTE))
+        .orElse {
+          if (isAPIModeConnect) {
+            Option(System.getProperty("spark.master")).orElse(sys.env.get("SPARK_MASTER"))
+          } else {
+            None
+          }
+        }
 
       val maybeConnectScript =
         Option(System.getenv("SPARK_HOME")).map(Paths.get(_, "sbin", "start-connect-server.sh"))
 
       if (server.isEmpty &&
-        remoteString.exists(_.startsWith("local")) &&
-        maybeConnectScript.exists(Files.exists(_))) {
+        maybeConnectScript.exists(Files.exists(_)) &&
+        (remoteString.exists(_.startsWith("local")) || isAPIModeConnect)) {
         server = Some {
           val args =
             Seq(maybeConnectScript.get.toString, "--master", remoteString.get) ++ sparkOptions
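
The net effect of this hunk: the local Connect server can now be triggered by `spark.api.mode=connect` alone, with `spark.master`/`SPARK_MASTER` standing in for the usual `spark.remote`/`SPARK_REMOTE` connection string. A minimal sketch of that widened lookup chain, assuming the precedence is exactly the `.orElse` order above (the helper name and explicit parameter maps are illustrative, not part of the PR):

```scala
import java.util.Locale

object RemoteResolution {
  // Mirrors the .orElse chain in withLocalConnectServer: explicit option,
  // then system property, then env var; in API mode "connect" the master
  // URL becomes a last-resort fallback.
  def resolveRemote(
      sparkOptions: Map[String, String],
      sysProps: Map[String, String],
      env: Map[String, String]): Option[String] = {
    val isAPIModeConnect =
      sysProps.get("spark.api.mode").exists(_.toLowerCase(Locale.ROOT) == "connect")
    sparkOptions.get("spark.remote")
      .orElse(sysProps.get("spark.remote")) // set from Spark Submit
      .orElse(env.get("SPARK_REMOTE"))
      .orElse {
        if (isAPIModeConnect) sysProps.get("spark.master").orElse(env.get("SPARK_MASTER"))
        else None
      }
  }
}

// e.g. resolveRemote(Map.empty,
//   Map("spark.api.mode" -> "connect", "spark.master" -> "local[*]"), Map.empty)
// returns Some("local[*]")
```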
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy
 import java.io.File
 import java.net.URI
 import java.nio.file.Files
+import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
@@ -52,9 +53,13 @@ object PythonRunner {
     // Format python file paths before adding them to the PYTHONPATH
     val formattedPythonFile = formatPath(pythonFile)
     val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))
+    lazy val isAPIModeClassic = sparkConf.getOption(
+      "spark.api.mode").exists(_.toLowerCase(Locale.ROOT) == "classic")
+    lazy val isAPIModeConnect = sparkConf.getOption(
+      "spark.api.mode").exists(_.toLowerCase(Locale.ROOT) == "connect")
 
     var gatewayServer: Option[Py4JServer] = None
-    if (sparkConf.getOption("spark.remote").isEmpty) {
+    if (sparkConf.getOption("spark.remote").isEmpty || isAPIModeClassic) {
       gatewayServer = Some(new Py4JServer(sparkConf))
 
       val thread = new Thread(() => Utils.logUncaughtExceptions { gatewayServer.get.start() })
@@ -80,7 +85,7 @@ object PythonRunner {
     // Launch Python process
     val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
     val env = builder.environment()
-    if (sparkConf.getOption("spark.remote").nonEmpty) {
+    if (sparkConf.getOption("spark.remote").nonEmpty || isAPIModeConnect) {
       // For non-local remote, pass configurations to environment variables so
       // Spark Connect client sets them. For local remotes, they will be set
       // via Py4J.
@@ -90,7 +95,11 @@ object PythonRunner {
         env.put(s"PYSPARK_REMOTE_INIT_CONF_$idx", compact(render(group)))
       }
     }
-    sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url))
+    if (isAPIModeClassic) {
+      sparkConf.getOption("spark.master").foreach(url => env.put("MASTER", url))
+    } else {
+      sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url))
+    }
     env.put("PYTHONPATH", pythonPath)
     // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
     env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
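
Combined with the Py4J branch earlier in this file, these changes decide how the launched Python process is bootstrapped: classic mode gets a Py4J gateway plus a `MASTER` variable, connect mode gets the Connect init configs plus `SPARK_REMOTE`. A hedged sketch of just the env-var selection (the helper and map-based config are illustrative, not the PR's code):

```scala
import java.util.Locale

object PythonBootstrapEnv {
  // Mirrors the branch above: classic API mode exports MASTER so the
  // child process builds a Py4J-backed session; otherwise any configured
  // spark.remote is exported as SPARK_REMOTE for the Connect client.
  def childEnv(conf: Map[String, String]): Map[String, String] = {
    val mode = conf.get("spark.api.mode").map(_.toLowerCase(Locale.ROOT))
    if (mode.contains("classic")) {
      conf.get("spark.master").map(u => Map("MASTER" -> u)).getOrElse(Map.empty)
    } else {
      conf.get("spark.remote").map(u => Map("SPARK_REMOTE" -> u)).getOrElse(Map.empty)
    }
  }
}

// childEnv(Map("spark.api.mode" -> "classic", "spark.master" -> "local[4]"))
//   == Map("MASTER" -> "local[4]")
```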
@@ -523,7 +523,8 @@ protected boolean handle(String opt, String value) {
         checkArgument(value != null, "Missing argument to %s", CONF);
         String[] setConf = value.split("=", 2);
         checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
-        if (setConf[0].equals("spark.remote")) {
+        if (setConf[0].equals("spark.remote") || (
+            setConf[0].equals("spark.api.mode") && setConf[1].equals("connect"))) {
           isRemote = true;
         }
         conf.put(setConf[0], setConf[1]);
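
With this two-line change, `--conf spark.api.mode=connect` flips the launcher's remote flag exactly as `--conf spark.remote=...` already did, so spark-submit routes the application through the Connect path. A standalone sketch of the rule (the real code mutates the parser's `isRemote` field rather than returning a Boolean):

```scala
object RemoteFlag {
  // Mirrors handle() above: a --conf key=value argument marks the
  // application as remote when it sets spark.remote (any value) or
  // sets spark.api.mode to exactly "connect".
  def marksRemote(confArg: String): Boolean =
    confArg.split("=", 2) match {
      case Array("spark.remote", _)           => true
      case Array("spark.api.mode", "connect") => true
      case _                                  => false
    }
}

// marksRemote("spark.api.mode=connect")  == true
// marksRemote("spark.api.mode=classic")  == false
// marksRemote("spark.remote=sc://host")  == true
```

Worth noting: this Java-side comparison is case-sensitive, while the Scala and Python sides lower-case the value first, so `--conf spark.api.mode=CONNECT` would be treated as connect mode by SparkSession but not by the launcher.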
15 changes: 13 additions & 2 deletions python/pyspark/sql/session.py
@@ -478,6 +478,12 @@ def getOrCreate(self) -> "SparkSession":
             if is_remote_only():
                 from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
 
+                if "spark.master" in opts:
+                    raise PySparkRuntimeError(
+                        errorClass="CONNECT_URL_NOT_SET",
+                        messageParameters={},
+                    )
+
                 url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))
 
                 if url is None:
@@ -497,6 +503,7 @@ def getOrCreate(self) -> "SparkSession":
                 "SPARK_CONNECT_MODE_ENABLED" in os.environ
                 or "SPARK_REMOTE" in os.environ
                 or "spark.remote" in opts
+                or opts["spark.api.mode"] == "connect"
             ):
                 with SparkContext._lock:
                     from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
@@ -505,15 +512,19 @@ def getOrCreate(self) -> "SparkSession":
                         SparkContext._active_spark_context is None
                         and SparkSession._instantiatedSession is None
                     ):
-                        url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))
+                        is_api_mode_connect = opts["spark.api.mode"].lower() == "connect"
+                        if is_api_mode_connect:
+                            url = opts.get("spark.master", os.environ.get("MASTER"))
+                        else:
+                            url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))
 
                         if url is None:
                             raise PySparkRuntimeError(
                                 errorClass="CONNECT_URL_NOT_SET",
                                 messageParameters={},
                             )
 
-                        if url.startswith("local"):
+                        if url.startswith("local") or is_api_mode_connect:
                             os.environ["SPARK_LOCAL_REMOTE"] = "1"
                             RemoteSparkSession._start_connect_server(url, opts)
                             url = "sc://localhost"
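
User-visibly, these branches let a session select the Connect client through configuration alone: in connect API mode the master URL (or `MASTER`) is used where the remote URL used to be, a local Connect server is started, and the URL is rewritten to `sc://localhost`. A hedged usage sketch of what that enables, shown on the Scala builder for consistency with the other examples (assuming the JVM path honors `spark.api.mode` the same way; this is an assumption, not code from the PR):

```scala
import org.apache.spark.sql.SparkSession

// Assumption for illustration: with spark.api.mode=connect, a plain
// master URL is enough; the session starts a local Connect server and
// connects to it, instead of requiring an explicit spark.remote URL.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.api.mode", "connect")
  .getOrCreate()
```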
@@ -620,8 +620,17 @@ object SparkConnectClient {
     * Configure the builder using the env SPARK_REMOTE environment variable.
     */
    def loadFromEnvironment(): Builder = {
+     lazy val isAPIModeConnect = Option(System.getProperty("spark.api.mode"))
+       .exists(_.toLowerCase(Locale.ROOT) == "connect")
      Option(System.getProperty("spark.remote")) // Set from Spark Submit
        .orElse(sys.env.get(SparkConnectClient.SPARK_REMOTE))
+       .orElse {
+         if (isAPIModeConnect) {
+           Option(System.getProperty("spark.master")).orElse(sys.env.get("SPARK_MASTER"))
+         } else {
+           None
+         }
+       }
        .foreach(connectionString)
      this
    }
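
For a standalone client this means `loadFromEnvironment()` no longer strictly requires `SPARK_REMOTE`: launching the JVM with `-Dspark.api.mode=connect` and a `spark.master` (or `SPARK_MASTER`) now also yields a connection string. A brief usage sketch (builder entry points as in the diff; the properties are assumed to be set by the launcher):

```scala
import org.apache.spark.sql.connect.client.SparkConnectClient

// Assumes the launcher set, for example:
//   -Dspark.api.mode=connect -Dspark.master=local[*]
// loadFromEnvironment() then falls back to spark.master / SPARK_MASTER
// when spark.remote / SPARK_REMOTE are absent.
val client = SparkConnectClient.builder()
  .loadFromEnvironment()
  .build()
```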