[Databricks] Supporting OAuth & Serverless compute #127
```diff
@@ -57,6 +57,7 @@ spark_connect_method.spark_method_databricks_connect <- function(
     config,
     app_name,
     version = NULL,
+    serverless = FALSE,
     hadoop_version,
     extensions,
     scala_version,
```
```diff
@@ -71,22 +72,28 @@ spark_connect_method.spark_method_databricks_connect <- function(
   method <- method[[1]]
   token <- databricks_token(token, fail = FALSE)
```

Review thread on `token <- databricks_token(token, fail = FALSE)`:

Reviewer: Based on your comment on line 137, I think we should remove this line, and have […]

Author: My thinking for leaving this was that users explicitly setting the […] I was expecting the hierarchy to be: […] Where 1-4 are handled by […] (see the sketch after this thread).
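The numbered hierarchy in the author's comment is truncated in this capture, so the concrete levels are unknown. As a rough illustration only, a resolution chain of that shape could look like the sketch below; the order and the sources (explicit argument, environment variable, IDE API) are assumptions, informed by the `.rs.api.getDatabricks*` functions mentioned later in this review:

```r
# Illustrative sketch only -- the actual 1-4 items are truncated above.
resolve_token <- function(token = NULL) {
  # 1. token passed explicitly by the user
  if (!is.null(token) && nzchar(token)) return(token)
  # 2. DATABRICKS_TOKEN environment variable
  env_token <- Sys.getenv("DATABRICKS_TOKEN")
  if (nzchar(env_token)) return(env_token)
  # 3. IDE-managed credentials (e.g. RStudio's .rs.api.getDatabricksToken)
  if (exists(".rs.api.getDatabricksToken")) {
    ide_token <- get(".rs.api.getDatabricksToken")(Sys.getenv("DATABRICKS_HOST"))
    if (!is.null(ide_token) && nzchar(ide_token)) return(ide_token)
  }
  # 4+. fall through and let the Databricks SDK's own detection run
  ""
}
```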
```diff
   cluster_id <- cluster_id %||% Sys.getenv("DATABRICKS_CLUSTER_ID")
+  cli_path <- Sys.getenv("DATABRICKS_CLI_PATH", "databricks")
   master <- databricks_host(master, fail = FALSE)
   if (host_sanitize && master != "") {
     master <- sanitize_host(master, silent)
   }

+  # if serverless is TRUE, cluster_id is overruled (set to NULL)
   cluster_info <- NULL
-  if (cluster_id != "" && master != "" && token != "") {
-    cluster_info <- databricks_dbr_version_name(
-      cluster_id = cluster_id,
-      host = master,
-      token = token,
-      silent = silent
-    )
-    if (is.null(version)) {
-      version <- cluster_info$version
+  if (!serverless) {
+    if (cluster_id != "" && master != "" && token != "") {
+      cluster_info <- databricks_dbr_version_name(
+        cluster_id = cluster_id,
+        host = master,
+        token = token,
+        silent = silent
+      )
+      if (is.null(version)) {
+        version <- cluster_info$version
+      }
     }
+  } else {
+    cluster_id <- NULL
   }

   envname <- use_envname(
```
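For context, here is how a connection might be requested once the new argument lands. This is a hypothetical call: the `"databricks_connect"` method string follows the existing `spark_connect()` interface, and `serverless = TRUE` is the parameter this hunk adds:

```r
library(sparklyr)

# Hypothetical usage of the new argument added in this PR; with
# serverless = TRUE, the branch above sets cluster_id to NULL entirely.
sc <- spark_connect(
  method     = "databricks_connect",
  master     = Sys.getenv("DATABRICKS_HOST"),
  serverless = TRUE
)
```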
```diff
@@ -102,34 +109,61 @@ spark_connect_method.spark_method_databricks_connect <- function(
     return(invisible)
   }

-  db <- import_check("databricks.connect", envname, silent)
+  dbc <- import_check("databricks.connect", envname, silent)
+  db_sdk <- import_check("databricks.sdk", envname, silent = TRUE)

   if (!is.null(cluster_info)) {
     msg <- "{.header Connecting to} {.emph '{cluster_info$name}'}"
     msg_done <- "{.header Connected to:} {.emph '{cluster_info$name}'}"
     master_label <- glue("{cluster_info$name} ({cluster_id})")
-  } else {
+  } else if (!serverless) {
     msg <- "{.header Connecting to} {.emph '{cluster_id}'}"
     msg_done <- "{.header Connected to:} '{.emph '{cluster_id}'}'"
     master_label <- glue("Databricks Connect - Cluster: {cluster_id}")
+  } else if (serverless) {
+    msg <- "{.header Connecting to} {.emph 'serverless'}"
+    msg_done <- "{.header Connected to:} '{.emph 'serverless'}'"
+    master_label <- glue("Databricks Connect - Cluster: serverless")
   }

   if (!silent) {
     cli_div(theme = cli_colors())
     cli_progress_step(msg, msg_done)
   }

-  remote_args <- list()
-  if (master != "") remote_args$host <- master
-  if (token != "") remote_args$token <- token
-  if (cluster_id != "") remote_args$cluster_id <- cluster_id
+  # sdk config
+  conf_args <- list(host = master)
+  # if token is found, propagate;
+  # otherwise trust the sdk to detect and do what it can?
```

Review thread on the `conf_args` comment:

Reviewer: Yes, if we remove line 72, then this […]
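If the explicit token plumbing were removed as discussed, the `Config()` call would presumably be reduced to the host alone, leaving credential detection to the SDK's unified auth. A minimal sketch of that branch (assuming `databricks.sdk` is installed in the active Python environment):

```r
library(reticulate)

# Reticulate handle to databricks-sdk-py, as the diff imports it.
db_sdk <- import("databricks.sdk")

# No token/auth_type supplied: Config() then runs the SDK's own credential
# detection (env vars, ~/.databrickscfg profiles, cached OAuth tokens).
sdk_config <- db_sdk$core$Config(host = Sys.getenv("DATABRICKS_HOST"))
```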
```diff
+  if (token != "") {
+    conf_args$token <- token
+    conf_args$auth_type <- "pat"
+  }

+  if (serverless) {
+    conf_args$serverless_compute_id <- "auto"
+  } else {
+    conf_args$cluster_id <- cluster_id
+  }

-  databricks_session <- function(...) {
-    user_agent <- build_user_agent()
-    db$DatabricksSession$builder$remote(...)$userAgent(user_agent)
-  }
+  sdk_config <- db_sdk$core$Config(!!!conf_args)

+  # unsure if this is still needed?
```

Review thread on the Databricks CLI fallback below:

Reviewer: I think we need to remove this from here, especially since we can't use […]

Author: I don't think this is required, will do some testing without.
```diff
+  if (!httr2:::is_hosted_session() && nchar(Sys.which(cli_path)) != 0) {
+    # When on desktop, try using the Databricks CLI for auth.
+    output <- suppressWarnings(
+      system2(
+        cli_path,
+        c("auth", "login", "--host", master),
+        stdout = TRUE,
+        stderr = TRUE
+      )
+    )
+  }

-  conn <- exec(databricks_session, !!!remote_args)
+  user_agent <- build_user_agent()
+  conn <- dbc$DatabricksSession$builder$sdkConfig(sdk_config)$userAgent(user_agent)

   if (!silent) {
     cli_progress_done()
```
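If the CLI shell-out is dropped, as the thread above suggests, one alternative is to let the SDK drive the browser-based OAuth flow itself. A sketch, assuming databricks-sdk-py's `external-browser` auth type:

```r
library(reticulate)
db_sdk <- import("databricks.sdk")

# "external-browser" is databricks-sdk-py's interactive U2M OAuth auth_type;
# it opens a browser for login instead of shelling out to the Databricks CLI.
sdk_config <- db_sdk$core$Config(
  host      = Sys.getenv("DATABRICKS_HOST"),
  auth_type = "external-browser"
)
```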
```diff
@@ -141,6 +175,7 @@ spark_connect_method.spark_method_databricks_connect <- function(
     master_label = master_label,
     con_class = "connect_databricks",
     cluster_id = cluster_id,
+    serverless = serverless,
     method = method,
     config = config
   )
```
```diff
@@ -151,6 +186,7 @@ initialize_connection <- function(
     master_label,
     con_class,
     cluster_id = NULL,
+    serverless = NULL,
     method = NULL,
     config = NULL) {
   warnings <- import("warnings")
```
```diff
@@ -173,12 +209,17 @@ initialize_connection <- function(
     "ignore",
     message = "Index.format is deprecated and will be removed in a future version"
   )

+  assign("conn", conn, .GlobalEnv)

   session <- conn$getOrCreate()
   get_version <- try(session$version, silent = TRUE)
   if (inherits(get_version, "try-error")) databricks_dbr_error(get_version)
-  session$conf$set("spark.sql.session.localRelationCacheThreshold", 1048576L)
-  session$conf$set("spark.sql.execution.arrow.pyspark.enabled", "true")
-  session$conf$set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
+  if (!serverless) {
+    session$conf$set("spark.sql.session.localRelationCacheThreshold", 1048576L)
+    session$conf$set("spark.sql.execution.arrow.pyspark.enabled", "true")
+    session$conf$set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
+  }

+  # do we need this `spark_context` object?
   spark_context <- list(spark_context = session)
```
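The new guard presumably exists because serverless compute restricts which Spark session confs may be set. If finer-grained behavior were ever wanted, a defensive wrapper could attempt each conf individually instead of skipping all three; a hypothetical sketch (`safe_set` is not part of this PR):

```r
# Hypothetical helper: try each conf and ignore failures, rather than
# skipping every setting whenever serverless = TRUE.
safe_set <- function(session, key, value) {
  tryCatch(session$conf$set(key, value), error = function(e) invisible(NULL))
}
```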
Review thread on the token handling:

Reviewer: This should be handled by the SDK config component.

Author: Hey, are we talking about this SDK? https://github.com/databricks/databricks-sdk-py/ And if so, can you point me to where it handles the RStudio token? I can't seem to find it.

Reviewer: The SDK won't detect the `.rs.api.getDatabricks*` functions, but maybe there's a gap in my understanding. I thought Connect would also write to a config file, which the SDK should pick up?
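On that last point: `databricks auth login` can save a named profile to `~/.databrickscfg`, and the SDK's `Config()` can resolve it by name. A sketch, where `"DEFAULT"` is a hypothetical profile name:

```r
library(reticulate)
db_sdk <- import("databricks.sdk")

# Pick up a CLI-written profile from ~/.databrickscfg instead of passing
# a token explicitly; "DEFAULT" is a placeholder profile name.
sdk_config <- db_sdk$core$Config(profile = "DEFAULT")
```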