From bd9ec95415330fdc996651139a6214da50634aef Mon Sep 17 00:00:00 2001 From: Zac Davies Date: Fri, 27 Sep 2024 17:39:23 +1000 Subject: [PATCH 1/7] investigating tweaks to support all auth methods. --- R/databricks-utils.R | 12 +++++------ R/python-install.R | 3 ++- R/sparklyr-spark-connect.R | 42 ++++++++++++++++++++++++++++++-------- 3 files changed, 41 insertions(+), 16 deletions(-) diff --git a/R/databricks-utils.R b/R/databricks-utils.R index 57eb233..a549c4d 100644 --- a/R/databricks-utils.R +++ b/R/databricks-utils.R @@ -42,18 +42,17 @@ databricks_token <- function(token = NULL, fail = FALSE) { } } } - # Checks for OAuth Databricks token inside the RStudio API - if (is.null(token) && exists(".rs.api.getDatabricksToken")) { - getDatabricksToken <- get(".rs.api.getDatabricksToken") - token <- set_names(getDatabricksToken(databricks_host()), "oauth") - } + # # Checks for OAuth Databricks token inside the RStudio API + # if (is.null(token) && exists(".rs.api.getDatabricksToken")) { + # getDatabricksToken <- get(".rs.api.getDatabricksToken") + # token <- set_names(getDatabricksToken(databricks_host()), "oauth") + # } if (is.null(token)) { if (fail) { rlang::abort(c( paste0( "No authentication token was identified: \n", " - No 'DATABRICKS_TOKEN' environment variable found \n", - " - No Databricks OAuth token found \n", " - Not passed as a function argument" ), "Please add your Token to 'DATABRICKS_TOKEN' inside your .Renviron file." 
@@ -249,3 +248,4 @@ sanitize_host <- function(url, silent = FALSE) { } ret } + diff --git a/R/python-install.R b/R/python-install.R index 0fafe49..3351488 100644 --- a/R/python-install.R +++ b/R/python-install.R @@ -217,7 +217,8 @@ install_environment <- function( "PyArrow", "grpcio", "google-api-python-client", - "grpcio_status" + "grpcio_status", + "databricks-sdk" ) if (add_torch && install_ml) { diff --git a/R/sparklyr-spark-connect.R b/R/sparklyr-spark-connect.R index 753c0d9..ccbc215 100644 --- a/R/sparklyr-spark-connect.R +++ b/R/sparklyr-spark-connect.R @@ -71,6 +71,7 @@ spark_connect_method.spark_method_databricks_connect <- function( method <- method[[1]] token <- databricks_token(token, fail = FALSE) cluster_id <- cluster_id %||% Sys.getenv("DATABRICKS_CLUSTER_ID") + cli_path <- Sys.getenv("DATABRICKS_CLI_PATH", "databricks") master <- databricks_host(master, fail = FALSE) if (host_sanitize && master != "") { master <- sanitize_host(master, silent) @@ -102,7 +103,8 @@ spark_connect_method.spark_method_databricks_connect <- function( return(invisible) } - db <- import_check("databricks.connect", envname, silent) + db_connect <- import_check("databricks.connect", envname, silent) + db_sdk <- import_check("databricks.sdk", envname, silent) if (!is.null(cluster_info)) { msg <- "{.header Connecting to} {.emph '{cluster_info$name}'}" @@ -119,17 +121,36 @@ spark_connect_method.spark_method_databricks_connect <- function( cli_progress_step(msg, msg_done) } - remote_args <- list() - if (master != "") remote_args$host <- master - if (token != "") remote_args$token <- token - if (cluster_id != "") remote_args$cluster_id <- cluster_id + # config + # if token is found, propagate + # otherwise trust in sdk to detect and do what it can? 
+ if (token != "") { + config <- db_sdk$core$Config( + host = master, + token = token, + cluster_id = cluster_id, + auth_type = "pat" + ) + } else { + config <- db_sdk$core$Config(host = master, cluster_id = cluster_id) + } - databricks_session <- function(...) { - user_agent <- build_user_agent() - db$DatabricksSession$builder$remote(...)$userAgent(user_agent) + if (!httr2:::is_hosted_session() && nchar(Sys.which(cli_path)) != 0) { + # When on desktop, try using the Databricks CLI for auth. + output <- suppressWarnings( + system2( + cli_path, + c("auth", "login", "--host", master), + stdout = TRUE, + stderr = TRUE + ) + ) } - conn <- exec(databricks_session, !!!remote_args) + user_agent <- build_user_agent() + conn <- db_connect$DatabricksSession$builder$sdkConfig(config)$userAgent(user_agent) + + if (!silent) { cli_progress_done() @@ -173,6 +194,9 @@ initialize_connection <- function( "ignore", message = "Index.format is deprecated and will be removed in a future version" ) + + assign("conn", conn, .GlobalEnv) + session <- conn$getOrCreate() get_version <- try(session$version, silent = TRUE) if (inherits(get_version, "try-error")) databricks_dbr_error(get_version) From 14b9e2d5c7326233ac7baa23daa1acb084f17c9a Mon Sep 17 00:00:00 2001 From: Zac Davies Date: Mon, 14 Oct 2024 22:17:38 +1100 Subject: [PATCH 2/7] first complete attempt at adding support for serverless while deferring to SDK for auth. This should enable full OAuth support. 
--- R/sparklyr-spark-connect.R | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/R/sparklyr-spark-connect.R b/R/sparklyr-spark-connect.R index ccbc215..60dbd2a 100644 --- a/R/sparklyr-spark-connect.R +++ b/R/sparklyr-spark-connect.R @@ -103,7 +103,7 @@ spark_connect_method.spark_method_databricks_connect <- function( return(invisible) } - db_connect <- import_check("databricks.connect", envname, silent) + dbc <- import_check("databricks.connect", envname, silent) db_sdk <- import_check("databricks.sdk", envname, silent) if (!is.null(cluster_info)) { @@ -129,6 +129,7 @@ spark_connect_method.spark_method_databricks_connect <- function( host = master, token = token, cluster_id = cluster_id, + auth_type = "pat" ) } else { @@ -148,7 +149,7 @@ spark_connect_method.spark_method_databricks_connect <- function( } user_agent <- build_user_agent() - conn <- db_connect$DatabricksSession$builder$sdkConfig(config)$userAgent(user_agent) + conn <- dbc$DatabricksSession$builder$sdkConfig(config)$userAgent(user_agent) From 17caa0ddf2c70ccf53ea5c0023ef194eafb261f9 Mon Sep 17 00:00:00 2001 From: Zac Davies Date: Mon, 14 Oct 2024 22:18:00 +1100 Subject: [PATCH 3/7] adding serverless components --- R/sparklyr-spark-connect.R | 64 ++++++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/R/sparklyr-spark-connect.R b/R/sparklyr-spark-connect.R index 60dbd2a..1491b67 100644 --- a/R/sparklyr-spark-connect.R +++ b/R/sparklyr-spark-connect.R @@ -57,6 +57,7 @@ spark_connect_method.spark_method_databricks_connect <- function( config, app_name, version = NULL, + serverless = FALSE, hadoop_version, extensions, scala_version, @@ -77,17 +78,22 @@ spark_connect_method.spark_method_databricks_connect <- function( master <- sanitize_host(master, silent) } + # if serverless is TRUE, cluster_id is overruled (set to NULL) cluster_info <- NULL - if (cluster_id != "" && master != "" && token != "") { - cluster_info <- 
databricks_dbr_version_name( - cluster_id = cluster_id, - host = master, - token = token, - silent = silent - ) - if (is.null(version)) { - version <- cluster_info$version + if (!serverless) { + if (cluster_id != "" && master != "" && token != "") { + cluster_info <- databricks_dbr_version_name( + cluster_id = cluster_id, + host = master, + token = token, + silent = silent + ) + if (is.null(version)) { + version <- cluster_info$version + } } + } else { + cluster_id <- NULL } envname <- use_envname( @@ -104,16 +110,20 @@ spark_connect_method.spark_method_databricks_connect <- function( } dbc <- import_check("databricks.connect", envname, silent) - db_sdk <- import_check("databricks.sdk", envname, silent) + db_sdk <- import_check("databricks.sdk", envname, silent = TRUE) if (!is.null(cluster_info)) { msg <- "{.header Connecting to} {.emph '{cluster_info$name}'}" msg_done <- "{.header Connected to:} {.emph '{cluster_info$name}'}" master_label <- glue("{cluster_info$name} ({cluster_id})") - } else { + } else if (!serverless) { msg <- "{.header Connecting to} {.emph '{cluster_id}'}" msg_done <- "{.header Connected to:} '{.emph '{cluster_id}'}'" master_label <- glue("Databricks Connect - Cluster: {cluster_id}") + } else if (serverless) { + msg <- "{.header Connecting to} {.emph 'serverless'}" + msg_done <- "{.header Connected to:} '{.emph 'serverless'}'" + master_label <- glue("Databricks Connect - Cluster: serverless") } if (!silent) { @@ -121,21 +131,24 @@ spark_connect_method.spark_method_databricks_connect <- function( cli_progress_step(msg, msg_done) } - # config + # sdk config + conf_args <- list(host = master) # if token is found, propagate # otherwise trust in sdk to detect and do what it can? 
if (token != "") { - config <- db_sdk$core$Config( - host = master, - token = token, - cluster_id = cluster_id, + conf_args$token <- token + conf_args$auth_type <- "pat" + } - auth_type = "pat" - ) + if (serverless) { + conf_args$serverless_compute_id <- "auto" } else { - config <- db_sdk$core$Config(host = master, cluster_id = cluster_id) + conf_args$cluster_id <- cluster_id } + sdk_config <- db_sdk$core$Config(!!!conf_args) + + # unsure if this iss needed anymore? if (!httr2:::is_hosted_session() && nchar(Sys.which(cli_path)) != 0) { # When on desktop, try using the Databricks CLI for auth. output <- suppressWarnings( @@ -149,8 +162,7 @@ spark_connect_method.spark_method_databricks_connect <- function( } user_agent <- build_user_agent() - conn <- dbc$DatabricksSession$builder$sdkConfig(config)$userAgent(user_agent) - + conn <- dbc$DatabricksSession$builder$sdkConfig(sdk_config)$userAgent(user_agent) if (!silent) { @@ -163,6 +175,7 @@ spark_connect_method.spark_method_databricks_connect <- function( master_label = master_label, con_class = "connect_databricks", cluster_id = cluster_id, + serverless = serverless, method = method, config = config ) @@ -173,6 +186,7 @@ initialize_connection <- function( master_label, con_class, cluster_id = NULL, + serverless = NULL, method = NULL, config = NULL) { warnings <- import("warnings") @@ -201,9 +215,11 @@ initialize_connection <- function( session <- conn$getOrCreate() get_version <- try(session$version, silent = TRUE) if (inherits(get_version, "try-error")) databricks_dbr_error(get_version) - session$conf$set("spark.sql.session.localRelationCacheThreshold", 1048576L) - session$conf$set("spark.sql.execution.arrow.pyspark.enabled", "true") - session$conf$set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false") + if (!serverless) { + session$conf$set("spark.sql.session.localRelationCacheThreshold", 1048576L) + session$conf$set("spark.sql.execution.arrow.pyspark.enabled", "true") + 
session$conf$set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false") + } # do we need this `spark_context` object? spark_context <- list(spark_context = session) From e5fa2578e8df90269ffda18cfdb845ceb70fba2d Mon Sep 17 00:00:00 2001 From: Zac Davies Date: Mon, 14 Oct 2024 22:25:05 +1100 Subject: [PATCH 4/7] Updating NEWS --- NEWS.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/NEWS.md b/NEWS.md index 9d20db3..b620940 100644 --- a/NEWS.md +++ b/NEWS.md @@ -7,6 +7,11 @@ * No longer install 'rpy2' by default. It will prompt user for installation the first time `spark_apply()` is called (#125) +* Adding support for Databricks serverless interactive compute (#127) + +* Extended authentication method support for Databricks by deferring to SDK +(#127) + # pysparklyr 0.1.5 ### Improvements From dd4bd04b59d489b3dcdcc2b4904baffd3bea0bfb Mon Sep 17 00:00:00 2001 From: Zac Davies Date: Mon, 14 Oct 2024 22:31:55 +1100 Subject: [PATCH 5/7] removing testing logic --- R/sparklyr-spark-connect.R | 2 -- 1 file changed, 2 deletions(-) diff --git a/R/sparklyr-spark-connect.R b/R/sparklyr-spark-connect.R index 1491b67..0220712 100644 --- a/R/sparklyr-spark-connect.R +++ b/R/sparklyr-spark-connect.R @@ -210,8 +210,6 @@ initialize_connection <- function( message = "Index.format is deprecated and will be removed in a future version" ) - assign("conn", conn, .GlobalEnv) - session <- conn$getOrCreate() get_version <- try(session$version, silent = TRUE) if (inherits(get_version, "try-error")) databricks_dbr_error(get_version) From 44403030f633d43e37957d2c27b4107af0793dc0 Mon Sep 17 00:00:00 2001 From: Zac Davies Date: Tue, 15 Oct 2024 18:30:21 +1100 Subject: [PATCH 6/7] wip refactoring to use Databricks py SDK heavily --- R/databricks-utils.R | 122 ++++++++++++++++++++++--------------- R/deploy.R | 1 + R/sparklyr-spark-connect.R | 112 +++++++++++++++++----------------- 3 files changed, 131 insertions(+), 104 deletions(-) diff --git a/R/databricks-utils.R 
b/R/databricks-utils.R index a549c4d..245c97a 100644 --- a/R/databricks-utils.R +++ b/R/databricks-utils.R @@ -27,6 +27,10 @@ databricks_host <- function(host = NULL, fail = TRUE) { } databricks_token <- function(token = NULL, fail = FALSE) { + # if token provided, return + # otherwise, search for token: + # DATABRICKS_TOKEN > CONNECT_DATABRICKS_TOKEN > .rs.api.getDatabricksToken + if (!is.null(token)) { return(set_names(token, "argument")) } @@ -42,11 +46,11 @@ databricks_token <- function(token = NULL, fail = FALSE) { } } } - # # Checks for OAuth Databricks token inside the RStudio API - # if (is.null(token) && exists(".rs.api.getDatabricksToken")) { - # getDatabricksToken <- get(".rs.api.getDatabricksToken") - # token <- set_names(getDatabricksToken(databricks_host()), "oauth") - # } + # Checks for OAuth Databricks token inside the RStudio API + if (is.null(token) && exists(".rs.api.getDatabricksToken")) { + getDatabricksToken <- get(".rs.api.getDatabricksToken") + token <- set_names(getDatabricksToken(databricks_host()), "oauth") + } if (is.null(token)) { if (fail) { rlang::abort(c( @@ -65,15 +69,13 @@ databricks_token <- function(token = NULL, fail = FALSE) { } databricks_dbr_version_name <- function(cluster_id, - host = NULL, - token = NULL, + client, silent = FALSE) { bullets <- NULL version <- NULL cluster_info <- databricks_dbr_info( cluster_id = cluster_id, - host = host, - token = token, + client = client, silent = silent ) cluster_name <- substr(cluster_info$cluster_name, 1, 100) @@ -95,8 +97,7 @@ databricks_extract_version <- function(x) { } databricks_dbr_info <- function(cluster_id, - host = NULL, - token = NULL, + client, silent = FALSE) { cli_div(theme = cli_colors()) @@ -108,10 +109,10 @@ databricks_dbr_info <- function(cluster_id, ) } - out <- databricks_cluster_get(cluster_id, host, token) + out <- databricks_cluster_get(cluster_id, client) if (inherits(out, "try-error")) { - sanitized <- sanitize_host(host, silent) - out <- 
databricks_cluster_get(cluster_id, sanitized, token) + # sanitized <- sanitize_host(host, silent) + out <- databricks_cluster_get(cluster_id, client) } if (inherits(out, "try-error")) { @@ -158,30 +159,17 @@ databricks_dbr_info <- function(cluster_id, out } -databricks_dbr_version <- function(cluster_id, - host = NULL, - token = NULL) { +databricks_dbr_version <- function(cluster_id, client) { vn <- databricks_dbr_version_name( cluster_id = cluster_id, - host = host, - token = token + client = client ) vn$version } -databricks_cluster_get <- function(cluster_id, - host = NULL, - token = NULL) { +databricks_cluster_get <- function(cluster_id, client) { try( - paste0( - host, - "/api/2.0/clusters/get" - ) %>% - request() %>% - req_auth_bearer_token(token) %>% - req_body_json(list(cluster_id = cluster_id)) %>% - req_perform() %>% - resp_body_json(), + client$clusters$get(cluster_id = cluster_id)$as_dict(), silent = TRUE ) } @@ -226,26 +214,62 @@ databricks_dbr_error <- function(error) { ) } -sanitize_host <- function(url, silent = FALSE) { - parsed_url <- url_parse(url) - new_url <- url_parse("http://localhost") - if (is.null(parsed_url$scheme)) { - new_url$scheme <- "https" - if (!is.null(parsed_url$path) && is.null(parsed_url$hostname)) { - new_url$hostname <- parsed_url$path - } +# sanitize_host <- function(url, silent = FALSE) { +# parsed_url <- url_parse(url) +# new_url <- url_parse("http://localhost") +# if (is.null(parsed_url$scheme)) { +# new_url$scheme <- "https" +# if (!is.null(parsed_url$path) && is.null(parsed_url$hostname)) { +# new_url$hostname <- parsed_url$path +# } +# } else { +# new_url$scheme <- parsed_url$scheme +# new_url$hostname <- parsed_url$hostname +# } +# ret <- url_build(new_url) +# if (ret != url && !silent) { +# cli_div(theme = cli_colors()) +# cli_alert_warning( +# "{.header Changing host URL to:} {.emph {ret}}" +# ) +# cli_end() +# } +# ret +# } + +# from httr2 +is_hosted_session <- function () { + if 
(nzchar(Sys.getenv("COLAB_RELEASE_TAG"))) { + return(TRUE) + } + Sys.getenv("RSTUDIO_PROGRAM_MODE") == "server" && + !grepl("localhost", Sys.getenv("RSTUDIO_HTTP_REFERER"), fixed = TRUE) +} + +databricks_desktop_login <- function(host = NULL, profile = NULL) { + + # host takes priority over profile + if (!is.null(host)) { + method <- "--host" + value <- host + } else if (!is.null(profile)) { + method <- "--profile" + value <- profile + } else { - new_url$scheme <- parsed_url$scheme - new_url$hostname <- parsed_url$hostname + # todo rlang error? + stop("must specify `host` or `profile`, neither were set") } - ret <- url_build(new_url) - if (ret != url && !silent) { - cli_div(theme = cli_colors()) - cli_alert_warning( - "{.header Changing host URL to:} {.emph {ret}}" + + cli_path <- Sys.getenv("DATABRICKS_CLI_PATH", "databricks") + if (!is_hosted_session() && nchar(Sys.which(cli_path)) != 0) { + # When on desktop, try using the Databricks CLI for auth. + output <- suppressWarnings( + system2( + cli_path, + c("auth", "login", method, value), + stdout = TRUE, + stderr = TRUE + ) ) - cli_end() } - ret } - diff --git a/R/deploy.R b/R/deploy.R index c99a98c..1e53bdc 100644 --- a/R/deploy.R +++ b/R/deploy.R @@ -52,6 +52,7 @@ deploy_databricks <- function( cluster_id <- cluster_id %||% Sys.getenv("DATABRICKS_CLUSTER_ID") + # TODO: this needs to be adjusted to use client, might need to refactor? 
if (is.null(version) && !is.null(cluster_id)) { version <- databricks_dbr_version( cluster_id = cluster_id, diff --git a/R/sparklyr-spark-connect.R b/R/sparklyr-spark-connect.R index 0220712..605bc22 100644 --- a/R/sparklyr-spark-connect.R +++ b/R/sparklyr-spark-connect.R @@ -58,6 +58,7 @@ spark_connect_method.spark_method_databricks_connect <- function( app_name, version = NULL, serverless = FALSE, + profile = NULL, hadoop_version, extensions, scala_version, @@ -70,32 +71,14 @@ spark_connect_method.spark_method_databricks_connect <- function( silent <- args$silent %||% FALSE method <- method[[1]] + + token <- databricks_token(token, fail = FALSE) cluster_id <- cluster_id %||% Sys.getenv("DATABRICKS_CLUSTER_ID") - cli_path <- Sys.getenv("DATABRICKS_CLI_PATH", "databricks") - master <- databricks_host(master, fail = FALSE) - if (host_sanitize && master != "") { - master <- sanitize_host(master, silent) - } - # if serverless is TRUE, cluster_id is overruled (set to NULL) - cluster_info <- NULL - if (!serverless) { - if (cluster_id != "" && master != "" && token != "") { - cluster_info <- databricks_dbr_version_name( - cluster_id = cluster_id, - host = master, - token = token, - silent = silent - ) - if (is.null(version)) { - version <- cluster_info$version - } - } - } else { - cluster_id <- NULL - } + + # load python env envname <- use_envname( backend = "databricks", version = version, @@ -109,37 +92,31 @@ spark_connect_method.spark_method_databricks_connect <- function( return(invisible) } + # load python libs dbc <- import_check("databricks.connect", envname, silent) db_sdk <- import_check("databricks.sdk", envname, silent = TRUE) - if (!is.null(cluster_info)) { - msg <- "{.header Connecting to} {.emph '{cluster_info$name}'}" - msg_done <- "{.header Connected to:} {.emph '{cluster_info$name}'}" - master_label <- glue("{cluster_info$name} ({cluster_id})") - } else if (!serverless) { - msg <- "{.header Connecting to} {.emph '{cluster_id}'}" - msg_done <- "{.header 
Connected to:} '{.emph '{cluster_id}'}'" - master_label <- glue("Databricks Connect - Cluster: {cluster_id}") - } else if (serverless) { - msg <- "{.header Connecting to} {.emph 'serverless'}" - msg_done <- "{.header Connected to:} '{.emph 'serverless'}'" - master_label <- glue("Databricks Connect - Cluster: serverless") - } + # SDK behaviour + # https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html#default-authentication-flow - if (!silent) { - cli_div(theme = cli_colors()) - cli_progress_step(msg, msg_done) - } + conf_args <- list() - # sdk config - conf_args <- list(host = master) - # if token is found, propagate - # otherwise trust in sdk to detect and do what it can? - if (token != "") { + # the profile as specified - which has a default of 'DEFAULT' + # otherwise, if a token is found, propagate to SDK config + + # TODO: emit messages about connection here? + # specific vars taken priority, profile only works when no env vars are set + if (token != "" && master != "") { + conf_args$host <- master conf_args$token <- token conf_args$auth_type <- "pat" + databricks_desktop_login(host = master) + } else if (!is.null(profile)) { + conf_args$profile <- profile + databricks_desktop_login(profile = profile) } + # serverless config related settings if (serverless) { conf_args$serverless_compute_id <- "auto" } else { @@ -148,23 +125,48 @@ spark_connect_method.spark_method_databricks_connect <- function( sdk_config <- db_sdk$core$Config(!!!conf_args) - # unsure if this iss needed anymore? - if (!httr2:::is_hosted_session() && nchar(Sys.which(cli_path)) != 0) { - # When on desktop, try using the Databricks CLI for auth. 
- output <- suppressWarnings( - system2( - cli_path, - c("auth", "login", "--host", master), - stdout = TRUE, - stderr = TRUE + # create workspace client + sdk_client <- db_sdk$WorkspaceClient(config = sdk_config) + + # if serverless is TRUE, cluster_id is overruled (set to NULL) + cluster_info <- NULL + if (!serverless) { + if (cluster_id != "" && master != "" && token != "") { + cluster_info <- databricks_dbr_version_name( + cluster_id = cluster_id, + client = sdk_client, + silent = silent ) - ) + if (is.null(version)) { + version <- cluster_info$version + } + } + } else { + cluster_id <- NULL + } + + if (!is.null(cluster_info)) { + msg <- "{.header Connecting to} {.emph '{cluster_info$name}'}" + msg_done <- "{.header Connected to:} {.emph '{cluster_info$name}'}" + master_label <- glue("{cluster_info$name} ({cluster_id})") + } else if (!serverless) { + msg <- "{.header Connecting to} {.emph '{cluster_id}'}" + msg_done <- "{.header Connected to:} '{.emph '{cluster_id}'}'" + master_label <- glue("Databricks Connect - Cluster: {cluster_id}") + } else if (serverless) { + msg <- "{.header Connecting to} {.emph serverless}" + msg_done <- "{.header Connected to:} '{.emph serverless}'" + master_label <- glue("Databricks Connect - Cluster: serverless") + } + + if (!silent) { + cli_div(theme = cli_colors()) + cli_progress_step(msg, msg_done) } user_agent <- build_user_agent() conn <- dbc$DatabricksSession$builder$sdkConfig(sdk_config)$userAgent(user_agent) - if (!silent) { cli_progress_done() cli_end() From 811dacf7d036856f3d49adf2d7ddbba4e462603d Mon Sep 17 00:00:00 2001 From: Zac Davies Date: Fri, 18 Oct 2024 15:52:17 +1100 Subject: [PATCH 7/7] fix check error for connection method + remove sanitize for now. 
--- R/databricks-utils.R | 23 ----------------------- R/sparklyr-spark-connect.R | 7 ++----- 2 files changed, 2 insertions(+), 28 deletions(-) diff --git a/R/databricks-utils.R b/R/databricks-utils.R index 245c97a..eab2206 100644 --- a/R/databricks-utils.R +++ b/R/databricks-utils.R @@ -214,29 +214,6 @@ databricks_dbr_error <- function(error) { ) } -# sanitize_host <- function(url, silent = FALSE) { -# parsed_url <- url_parse(url) -# new_url <- url_parse("http://localhost") -# if (is.null(parsed_url$scheme)) { -# new_url$scheme <- "https" -# if (!is.null(parsed_url$path) && is.null(parsed_url$hostname)) { -# new_url$hostname <- parsed_url$path -# } -# } else { -# new_url$scheme <- parsed_url$scheme -# new_url$hostname <- parsed_url$hostname -# } -# ret <- url_build(new_url) -# if (ret != url && !silent) { -# cli_div(theme = cli_colors()) -# cli_alert_warning( -# "{.header Changing host URL to:} {.emph {ret}}" -# ) -# cli_end() -# } -# ret -# } - # from httr2 is_hosted_session <- function () { if (nzchar(Sys.getenv("COLAB_RELEASE_TAG"))) { diff --git a/R/sparklyr-spark-connect.R b/R/sparklyr-spark-connect.R index 605bc22..b1e1a3f 100644 --- a/R/sparklyr-spark-connect.R +++ b/R/sparklyr-spark-connect.R @@ -57,14 +57,14 @@ spark_connect_method.spark_method_databricks_connect <- function( config, app_name, version = NULL, - serverless = FALSE, - profile = NULL, hadoop_version, extensions, scala_version, ...) { args <- list(...) cluster_id <- args$cluster_id + serverless <- args$serverless %||% FALSE + profile <- args$profile %||% NULL token <- args$token envname <- args$envname host_sanitize <- args$host_sanitize %||% TRUE @@ -72,12 +72,9 @@ spark_connect_method.spark_method_databricks_connect <- function( method <- method[[1]] - token <- databricks_token(token, fail = FALSE) cluster_id <- cluster_id %||% Sys.getenv("DATABRICKS_CLUSTER_ID") - - # load python env envname <- use_envname( backend = "databricks",