Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Databricks] Supporting OAuth & Serverless compute #127

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
* No longer install 'rpy2' by default. It will prompt user for installation
the first time `spark_apply()` is called (#125)

* Adding support for Databricks serverless interactive compute (#127)

* Extended authentication method support for Databricks by deferring to SDK
(#127)

# pysparklyr 0.1.5

### Improvements
Expand Down
12 changes: 6 additions & 6 deletions R/databricks-utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,17 @@ databricks_token <- function(token = NULL, fail = FALSE) {
}
}
}
# Checks for OAuth Databricks token inside the RStudio API
if (is.null(token) && exists(".rs.api.getDatabricksToken")) {
getDatabricksToken <- get(".rs.api.getDatabricksToken")
token <- set_names(getDatabricksToken(databricks_host()), "oauth")
}
# # Checks for OAuth Databricks token inside the RStudio API
# if (is.null(token) && exists(".rs.api.getDatabricksToken")) {
# getDatabricksToken <- get(".rs.api.getDatabricksToken")
# token <- set_names(getDatabricksToken(databricks_host()), "oauth")
# }
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be handled by SDK config component.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, are we talking about this SDK? https://github.com/databricks/databricks-sdk-py/ And if so, can you point me to where it handles the RStudio token? I can't seem to find it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SDK won't detect the .rs.api.getDatabricks* but maybe theres a gap in my understanding, I thought connect would also write to a config file as well, which the SDK should pickup?

if (is.null(token)) {
if (fail) {
rlang::abort(c(
paste0(
"No authentication token was identified: \n",
" - No 'DATABRICKS_TOKEN' environment variable found \n",
" - No Databricks OAuth token found \n",
" - Not passed as a function argument"
),
"Please add your Token to 'DATABRICKS_TOKEN' inside your .Renviron file."
Expand Down Expand Up @@ -249,3 +248,4 @@ sanitize_host <- function(url, silent = FALSE) {
}
ret
}

3 changes: 2 additions & 1 deletion R/python-install.R
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ install_environment <- function(
"PyArrow",
"grpcio",
"google-api-python-client",
"grpcio_status"
"grpcio_status",
"databricks-sdk"
)

if (add_torch && install_ml) {
Expand Down
83 changes: 61 additions & 22 deletions R/sparklyr-spark-connect.R
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ spark_connect_method.spark_method_databricks_connect <- function(
config,
app_name,
version = NULL,
serverless = FALSE,
hadoop_version,
extensions,
scala_version,
Expand All @@ -71,22 +72,28 @@ spark_connect_method.spark_method_databricks_connect <- function(
method <- method[[1]]
token <- databricks_token(token, fail = FALSE)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on your comment on line 137, I think we should remove this line. And have token only populated when the user passes it as an argument in the spark_connect() call

Copy link
Author

@zacdav-db zacdav-db Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking for leaving this was that users explicitly setting the DATABRICKS_TOKEN and DATABRICKS_HOST vars should have those respected as it were set explicitly. The Databricks Python SDK won't detect those when its done from R.

databricks_token function also looks for CONNECT_DATABRICKS_TOKEN so i think its probably important to leave that intact?

I was expecting hierarchy to be:

  1. Explicit token
  2. DATABRICKS_TOKEN
  3. CONNECT_DATABRICKS_TOKEN
  4. .rs.api.getDatabricksToken(host)
  5. Python SDK explicit setting of profile
  6. Python SDK detection of DEFAULT profile

Where 1-4 are handled by databricks_token

cluster_id <- cluster_id %||% Sys.getenv("DATABRICKS_CLUSTER_ID")
cli_path <- Sys.getenv("DATABRICKS_CLI_PATH", "databricks")
master <- databricks_host(master, fail = FALSE)
if (host_sanitize && master != "") {
master <- sanitize_host(master, silent)
}

# if serverless is TRUE, cluster_id is overruled (set to NULL)
cluster_info <- NULL
if (cluster_id != "" && master != "" && token != "") {
cluster_info <- databricks_dbr_version_name(
cluster_id = cluster_id,
host = master,
token = token,
silent = silent
)
if (is.null(version)) {
version <- cluster_info$version
if (!serverless) {
if (cluster_id != "" && master != "" && token != "") {
cluster_info <- databricks_dbr_version_name(
cluster_id = cluster_id,
host = master,
token = token,
silent = silent
)
if (is.null(version)) {
version <- cluster_info$version
}
}
} else {
cluster_id <- NULL
}

envname <- use_envname(
Expand All @@ -102,34 +109,61 @@ spark_connect_method.spark_method_databricks_connect <- function(
return(invisible)
}

db <- import_check("databricks.connect", envname, silent)
dbc <- import_check("databricks.connect", envname, silent)
db_sdk <- import_check("databricks.sdk", envname, silent = TRUE)

if (!is.null(cluster_info)) {
msg <- "{.header Connecting to} {.emph '{cluster_info$name}'}"
msg_done <- "{.header Connected to:} {.emph '{cluster_info$name}'}"
master_label <- glue("{cluster_info$name} ({cluster_id})")
} else {
} else if (!serverless) {
msg <- "{.header Connecting to} {.emph '{cluster_id}'}"
msg_done <- "{.header Connected to:} '{.emph '{cluster_id}'}'"
master_label <- glue("Databricks Connect - Cluster: {cluster_id}")
} else if (serverless) {
msg <- "{.header Connecting to} {.emph 'serverless'}"
msg_done <- "{.header Connected to:} '{.emph 'serverless'}'"
master_label <- glue("Databricks Connect - Cluster: serverless")
}

if (!silent) {
cli_div(theme = cli_colors())
cli_progress_step(msg, msg_done)
}

remote_args <- list()
if (master != "") remote_args$host <- master
if (token != "") remote_args$token <- token
if (cluster_id != "") remote_args$cluster_id <- cluster_id
# sdk config
conf_args <- list(host = master)
# if token is found, propagate
# otherwise trust in sdk to detect and do what it can?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if we remove line 72, then this if makes sense to leave

if (token != "") {
conf_args$token <- token
conf_args$auth_type <- "pat"
}

databricks_session <- function(...) {
user_agent <- build_user_agent()
db$DatabricksSession$builder$remote(...)$userAgent(user_agent)
if (serverless) {
conf_args$serverless_compute_id <- "auto"
} else {
conf_args$cluster_id <- cluster_id
}

conn <- exec(databricks_session, !!!remote_args)
sdk_config <- db_sdk$core$Config(!!!conf_args)

# unsure if this iss needed anymore?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to remove this from here, specially since we can't use httr2:::is_hosted_session() since a ::: is not allowed. Do you think this is important for the package to do if the user is on desktop? If so, what do you think about isolating it in its own exported function? Maybe pysparklyr::databricks_desktop_login()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is required, will do some testing without.

if (!httr2:::is_hosted_session() && nchar(Sys.which(cli_path)) != 0) {
# When on desktop, try using the Databricks CLI for auth.
output <- suppressWarnings(
system2(
cli_path,
c("auth", "login", "--host", master),
stdout = TRUE,
stderr = TRUE
)
)
}

user_agent <- build_user_agent()
conn <- dbc$DatabricksSession$builder$sdkConfig(sdk_config)$userAgent(user_agent)


if (!silent) {
cli_progress_done()
Expand All @@ -141,6 +175,7 @@ spark_connect_method.spark_method_databricks_connect <- function(
master_label = master_label,
con_class = "connect_databricks",
cluster_id = cluster_id,
serverless = serverless,
method = method,
config = config
)
Expand All @@ -151,6 +186,7 @@ initialize_connection <- function(
master_label,
con_class,
cluster_id = NULL,
serverless = NULL,
method = NULL,
config = NULL) {
warnings <- import("warnings")
Expand All @@ -173,12 +209,15 @@ initialize_connection <- function(
"ignore",
message = "Index.format is deprecated and will be removed in a future version"
)

session <- conn$getOrCreate()
get_version <- try(session$version, silent = TRUE)
if (inherits(get_version, "try-error")) databricks_dbr_error(get_version)
session$conf$set("spark.sql.session.localRelationCacheThreshold", 1048576L)
session$conf$set("spark.sql.execution.arrow.pyspark.enabled", "true")
session$conf$set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
if (!serverless) {
session$conf$set("spark.sql.session.localRelationCacheThreshold", 1048576L)
session$conf$set("spark.sql.execution.arrow.pyspark.enabled", "true")
session$conf$set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
}

# do we need this `spark_context` object?
spark_context <- list(spark_context = session)
Expand Down
Loading