Skip to content

Commit

Permalink
First stab
Browse files Browse the repository at this point in the history
  • Loading branch information
hadley committed Nov 7, 2023
1 parent cbe1b99 commit 8105d7b
Show file tree
Hide file tree
Showing 22 changed files with 569 additions and 124 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ Collate:
'src-sql.R'
'src_dbi.R'
'table-ident.R'
'table-name.R'
'tbl-lazy.R'
'tbl-sql.R'
'test-frame.R'
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ S3method(escape,data.frame)
S3method(escape,dbplyr_catalog)
S3method(escape,dbplyr_schema)
S3method(escape,dbplyr_table_ident)
S3method(escape,dbplyr_table_name)
S3method(escape,double)
S3method(escape,factor)
S3method(escape,ident)
Expand Down Expand Up @@ -178,6 +179,7 @@ S3method(print,base_query)
S3method(print,dbplyr_catalog)
S3method(print,dbplyr_schema)
S3method(print,dbplyr_sql_options)
S3method(print,dbplyr_table_name)
S3method(print,ident)
S3method(print,join_query)
S3method(print,lazy_base_local_query)
Expand Down
5 changes: 4 additions & 1 deletion R/backend-mssql.R
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,10 @@ mssql_version <- function(con) {
# <https://docs.microsoft.com/en-us/previous-versions/sql/sql-server-2008-r2/ms177399%28v%3dsql.105%29#temporary-tables>
#' @export
`db_table_temporary.Microsoft SQL Server` <- function(con, table, temporary, ...) {
table <- as_table_ident(table)
table <- as_table_name(table, con)

# TODO: FIX ME

table_name <- vctrs::field(table, "table")

if (temporary && substr(table_name, 1, 1) != "#") {
Expand Down
2 changes: 1 addition & 1 deletion R/backend-mysql.R
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ db_connection_describe.MySQLConnection <- db_connection_describe.MariaDBConnecti

#' @export
db_col_types.MariaDBConnection <- function(con, table, call) {
table <- as_table_ident(table, error_call = call)
table <- as_table_name(table, con, error_call = call)
col_info_df <- DBI::dbGetQuery(con, glue_sql2(con, "SHOW COLUMNS FROM {.tbl table};"))
set_names(col_info_df[["Type"]], col_info_df[["Field"]])
}
Expand Down
4 changes: 2 additions & 2 deletions R/backend-postgres-old.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ db_write_table.PostgreSQLConnection <- function(con,
values,
temporary = TRUE,
...) {
table <- as_table_ident(table)
table <- as_table_name(table, con)
if (!isFALSE(temporary)) {
cli_abort(c(
"RPostgreSQL backend does not support creation of temporary tables",
Expand All @@ -27,7 +27,7 @@ db_write_table.PostgreSQLConnection <- function(con,
# the bare table name
dbWriteTable(
con,
name = vctrs::field(table, "table"),
name = SQL(unclass(table)),
value = values,
field.types = types,
...,
Expand Down
2 changes: 1 addition & 1 deletion R/backend-postgres.R
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ db_supports_table_alias_with_as.PostgreSQL <- function(con) {

#' @export
db_col_types.PqConnection <- function(con, table, call) {
table <- as_table_ident(table, error_call = call)
table <- as_table_name(table, con, error_call = call)
res <- DBI::dbSendQuery(con, glue_sql2(con, "SELECT * FROM {.tbl table} LIMIT 0"))
on.exit(DBI::dbClearResult(res))
DBI::dbFetch(res, n = 0)
Expand Down
2 changes: 1 addition & 1 deletion R/backend-spark-sql.R
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ simulate_spark_sql <- function() simulate_dbi("Spark SQL")
cli::cli_abort("Spark SQL only support temporary tables")
}

table <- as_table_ident(table)
table <- as_table_name(table, con)
sql <- glue_sql2(
con,
"CREATE ", if (overwrite) "OR REPLACE ",
Expand Down
4 changes: 2 additions & 2 deletions R/build-sql.R
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ sql_quote_transformer <- function(connection) {
glue_check_collapse(type, should_collapse)

if (type == "tbl") {
value <- as_table_ident(value)
value <- as_table_name(value, connection)
} else if (type == "from") {
value <- as_from(value)
value <- as_table_source(value, connection)
} else if (type == "col") {
if (is_bare_character(value)) {
value <- ident(value)
Expand Down
14 changes: 7 additions & 7 deletions R/db-io.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ db_copy_to <- function(con,
indexes = NULL,
analyze = TRUE,
in_transaction = TRUE) {
as_table_ident(table)
check_table_id(table)
check_bool(overwrite)
check_character(types, allow_null = TRUE)
check_named(types)
Expand All @@ -65,14 +65,14 @@ db_copy_to.DBIConnection <- function(con,
indexes = NULL,
analyze = TRUE,
in_transaction = TRUE) {
table <- as_table_ident(table)
table <- as_table_name(table, con)
new <- db_table_temporary(con, table, temporary)
table <- new$table
temporary <- new$temporary
call <- current_env()

with_transaction(con, in_transaction, {
tryCatch(
withCallingHandlers(
{
table <- dplyr::db_write_table(con, table,
types = types,
Expand Down Expand Up @@ -106,7 +106,7 @@ db_compute <- function(con,
indexes = list(),
analyze = TRUE,
in_transaction = TRUE) {
as_table_ident(table)
check_table_id(table)
check_scalar_sql(sql)
check_bool(overwrite)
check_bool(temporary)
Expand All @@ -126,7 +126,7 @@ db_compute.DBIConnection <- function(con,
indexes = list(),
analyze = TRUE,
in_transaction = FALSE) {
table <- as_table_ident(table)
table <- as_table_name(table, con)
new <- db_table_temporary(con, table, temporary)
table <- new$table
temporary <- new$temporary
Expand Down Expand Up @@ -178,7 +178,7 @@ db_write_table.DBIConnection <- function(con,
temporary = TRUE,
...,
overwrite = FALSE) {
table <- as_table_ident(table)
table <- as_table_name(table, con)
check_character(types, allow_null = TRUE)
check_named(types)
check_bool(temporary)
Expand All @@ -187,7 +187,7 @@ db_write_table.DBIConnection <- function(con,
tryCatch(
dbWriteTable(
con,
name = table_ident_to_id(table),
name = SQL(unclass(table)),
value = values,
field.types = types,
temporary = temporary,
Expand Down
84 changes: 39 additions & 45 deletions R/db-sql.R
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ sql_table_index <- function(con,
unique = FALSE,
...,
call = caller_env()) {
as_table_ident(table, error_call = call)
check_table_id(table, call = call)
check_character(columns, call = call)
check_name(name, allow_null = TRUE, call = call)
check_bool(unique, call = call)
Expand All @@ -173,10 +173,8 @@ sql_table_index.DBIConnection <- function(con,
unique = FALSE,
...,
call = caller_env()) {
table <- as_table_ident(table, error_call = call)
table_name <- collapse_table_ident(table, sep = "_")

name <- name %||% paste0(c(table_name, columns), collapse = "_")
table <- as_table_name(table, con, error_call = call)
name <- name %||% paste0(c(table, columns), collapse = "_")
glue_sql2(
con,
"CREATE ", if (unique) "UNIQUE ", "INDEX {.name name}",
Expand All @@ -201,32 +199,29 @@ sql_query_explain.DBIConnection <- function(con, sql, ...) {
#' @rdname db-sql
#' @export
sql_query_fields <- function(con, sql, ...) {
as_from(sql)
check_table_source(sql)
check_dots_used()

UseMethod("sql_query_fields")
}
#' @export
sql_query_fields.DBIConnection <- function(con, sql, ...) {
sql <- as_from(sql)

sql <- as_table_source(sql, con)
dbplyr_query_select(con, sql("*"), dbplyr_sql_subquery(con, sql), where = sql("0 = 1"))
}

#' @rdname db-sql
#' @export
sql_query_save <- function(con, sql, name, temporary = TRUE, ...) {
as_from(sql)
as_table_ident(name)
check_table_id(name)
check_bool(temporary)
check_dots_used()

UseMethod("sql_query_save")
}
#' @export
sql_query_save.DBIConnection <- function(con, sql, name, temporary = TRUE, ...) {
sql <- as_from(sql)
name <- as_table_ident(name)
name <- as_table_name(name, con)

glue_sql2(
con,
Expand All @@ -245,7 +240,7 @@ sql_query_wrap <- function(con, from, name = NULL, ..., lvl = 0) {
}
#' @export
sql_query_wrap.DBIConnection <- function(con, from, name = NULL, ..., lvl = 0) {
from <- as_from(from)
from <- as_table_source(from, con)

if (is.sql(from)) {
if (db_supports_table_alias_with_as(con)) {
Expand All @@ -257,11 +252,11 @@ sql_query_wrap.DBIConnection <- function(con, from, name = NULL, ..., lvl = 0) {
from <- sql_indent_subquery(from, con, lvl)
# some backends, e.g. Postgres, require an alias for a subquery
name <- name %||% unique_subquery_name()
out <- glue_sql2(con, "{from}", as_sql, "{.name name}")
return(out)
glue_sql2(con, "{from}", as_sql, "{.name name}")
} else { # must be a table_name
name <- sql_escape_ident(con, name)
setNames(from, name)
}

set_table_ident_alias(from, name)
}

#' @export
Expand All @@ -286,14 +281,14 @@ sql_indent_subquery <- function(from, con, lvl = 0) {
#' @rdname db-sql
#' @export
sql_query_rows <- function(con, sql, ...) {
as_from(sql)
check_table_source(sql)
check_dots_used()

UseMethod("sql_query_rows")
}
#' @export
sql_query_rows.DBIConnection <- function(con, sql, ...) {
sql <- as_from(sql)
sql <- as_table_source(sql, con)
from <- dbplyr_sql_subquery(con, sql, "master")
glue_sql2(con, "SELECT COUNT(*) FROM {.from from}")
}
Expand Down Expand Up @@ -770,8 +765,8 @@ sql_query_insert <- function(con,
conflict = c("error", "ignore"),
returning_cols = NULL,
method = NULL) {
as_table_ident(table)
as_from(from)
check_table_id(table)
check_table_source(sql)
check_character(insert_cols)
check_character(by)
check_character(returning_cols, allow_null = TRUE)
Expand All @@ -790,8 +785,8 @@ sql_query_insert.DBIConnection <- function(con,
conflict = c("error", "ignore"),
returning_cols = NULL,
method = NULL) {
table <- as_table_ident(table)
from <- as_from(from)
table <- as_table_name(table, con)
from <- as_table_source(from, con)

method <- method %||% "where_not_exists"
arg_match(method, "where_not_exists", error_arg = "method")
Expand Down Expand Up @@ -838,8 +833,8 @@ sql_query_append <- function(con,
return(out)
}

as_table_ident(table)
as_from(from)
check_table_id(table)
check_table_source(from)
check_character(insert_cols)
check_character(returning_cols, allow_null = TRUE)

Expand All @@ -854,8 +849,8 @@ sql_query_append.DBIConnection <- function(con,
insert_cols,
...,
returning_cols = NULL) {
table <- as_table_ident(table)
from <- as_from(from)
table <- as_table_name(table, con)
from <- as_table_source(from, con)

# https://stackoverflow.com/questions/25969/insert-into-values-select-from
parts <- rows_prep(con, table, from, by = list(), lvl = 0)
Expand All @@ -880,8 +875,8 @@ sql_query_update_from <- function(con,
update_values,
...,
returning_cols = NULL) {
as_table_ident(table)
as_from(from)
check_table_id(table)
check_table_source(from)
check_character(by)
check_character(update_values)
check_named(update_values)
Expand All @@ -899,8 +894,8 @@ sql_query_update_from.DBIConnection <- function(con,
update_values,
...,
returning_cols = NULL) {
table <- as_table_ident(table)
from <- as_from(from)
table <- as_table_name(table, con)
from <- as_table_source(from, con)

# https://stackoverflow.com/questions/2334712/how-do-i-update-from-a-select-in-sql-server
parts <- rows_prep(con, table, from, by, lvl = 0)
Expand Down Expand Up @@ -928,8 +923,8 @@ sql_query_upsert <- function(con,
...,
returning_cols = NULL,
method = NULL) {
as_table_ident(table)
as_from(from)
check_table_id(table)
check_table_source(from)
check_character(by)
check_character(update_cols)
check_character(returning_cols, allow_null = TRUE)
Expand All @@ -949,8 +944,8 @@ sql_query_upsert.DBIConnection <- function(con,
...,
returning_cols = NULL,
method = NULL) {
table <- as_table_ident(table)
from <- as_from(from)
table <- as_table_name(table, con)
from <- as_table_source(from, con)

method <- method %||% "cte_update"
arg_match(method, "cte_update", error_arg = "method")
Expand Down Expand Up @@ -998,8 +993,8 @@ sql_query_delete <- function(con,
by,
...,
returning_cols = NULL) {
as_table_ident(table)
as_from(from)
check_table_id(table)
check_table_source(from)
check_character(by)
check_character(returning_cols, allow_null = TRUE)

Expand All @@ -1014,8 +1009,8 @@ sql_query_delete.DBIConnection <- function(con,
by,
...,
returning_cols = NULL) {
table <- as_table_ident(table)
from <- as_from(from)
table <- as_table_name(table)
from <- as_table_source(from, con)
parts <- rows_prep(con, table, from, by, lvl = 1)

clauses <- list2(
Expand Down Expand Up @@ -1143,15 +1138,14 @@ db_save_query.DBIConnection <- function(con,
temporary = TRUE,
...,
overwrite = FALSE) {
sql <- sql_query_save(con, sql, name, temporary = temporary, ...)
tryCatch(
name <- as_table_name(name, con)
sql <- sql_query_save(con, sql(sql), name, temporary = temporary, ...)
withCallingHandlers(
{
if (overwrite) {
name <- as_table_ident(name)
name_id <- table_ident_to_id(name)
found <- DBI::dbExistsTable(con, name_id)
found <- DBI::dbExistsTable(con, SQL(unclass(name)))
if (found) {
DBI::dbRemoveTable(con, name_id)
DBI::dbRemoveTable(con, SQL(unclass(name)))
}
}
DBI::dbExecute(con, sql, immediate = TRUE)
Expand Down
Loading

0 comments on commit 8105d7b

Please sign in to comment.