better 79 raw import
ref #3
wibeasley committed Oct 17, 2018
1 parent 35ae665 commit 692622f
Showing 3 changed files with 447 additions and 424 deletions.
210 changes: 125 additions & 85 deletions dal/import-79-raw.R
@@ -28,24 +28,24 @@ columns_to_drop <- c("A0002600", "Y2267000")

ds_extract <- tibble::tribble(
~schema , ~table_name , ~file_name
,"Extract" , "tblGen1Explicit" , "nlsy79-gen1/Gen1Explicit.csv"
,"Extract" , "tblGen1Implicit" , "nlsy79-gen1/Gen1Implicit.csv"
,"Extract" , "tblGen1Links" , "nlsy79-gen1/Gen1Links.csv"
,"Extract" , "tblGen1Outcomes" , "nlsy79-gen1/Gen1Outcomes.csv"
,"Extract" , "tblGen1GeocodeSanitized" , "nlsy79-gen1/Gen1GeocodeSanitized.csv"
# "Process.tblLURosterGen1" , "nlsy79-gen1/RosterGen1.csv"
,"Extract" , "tblGen1Explicit" , "nlsy79-gen1/Gen1Explicit"
,"Extract" , "tblGen1Implicit" , "nlsy79-gen1/Gen1Implicit"
,"Extract" , "tblGen1Links" , "nlsy79-gen1/Gen1Links"
,"Extract" , "tblGen1Outcomes" , "nlsy79-gen1/Gen1Outcomes"
,"Extract" , "tblGen1GeocodeSanitized" , "nlsy79-gen1/Gen1GeocodeSanitized"
# "Process.tblLURosterGen1" , "nlsy79-gen1/RosterGen1"
# tblGen1MzDzDistinction2010
#
,"Extract" , "tblGen2FatherFromGen1" , "nlsy79-gen2/Gen2FatherFromGen1.csv"
,"Extract" , "tblGen2ImplicitFather" , "nlsy79-gen2/Gen2ImplicitFather.csv"
,"Extract" , "tblGen2Links" , "nlsy79-gen2/Gen2Links.csv"
,"Extract" , "tblGen2LinksFromGen1" , "nlsy79-gen2/Gen2LinksFromGen1.csv"
,"Extract" , "tblGen2OutcomesHeight" , "nlsy79-gen2/Gen2OutcomesHeight.csv"
,"Extract" , "tblGen2OutcomesMath" , "nlsy79-gen2/Gen2OutcomesMath.csv"
,"Extract" , "tblGen2OutcomesWeight" , "nlsy79-gen2/Gen2OutcomesWeight.csv"
,"Extract" , "tblGen2FatherFromGen1" , "nlsy79-gen2/Gen2FatherFromGen1"
,"Extract" , "tblGen2ImplicitFather" , "nlsy79-gen2/Gen2ImplicitFather"
,"Extract" , "tblGen2Links" , "nlsy79-gen2/Gen2Links"
,"Extract" , "tblGen2LinksFromGen1" , "nlsy79-gen2/Gen2LinksFromGen1"
,"Extract" , "tblGen2OutcomesHeight" , "nlsy79-gen2/Gen2OutcomesHeight"
,"Extract" , "tblGen2OutcomesMath" , "nlsy79-gen2/Gen2OutcomesMath"
,"Extract" , "tblGen2OutcomesWeight" , "nlsy79-gen2/Gen2OutcomesWeight"


,"Extract" , "tblGen2FatherFromGen1Death" , "nlsy79-gen2/Gen2FatherFromGen1Death.csv"
,"Extract" , "tblGen2FatherFromGen1Death" , "nlsy79-gen2/Gen2FatherFromGen1Death"

# "Extract" , "tbl97Roster" , "nlsy97/97-roster.csv"
)
@@ -57,22 +57,43 @@ col_types_default <- readr::cols(
checkmate::assert_character(ds_extract$table_name , min.chars=10, any.missing=F, unique=T)
checkmate::assert_character(ds_extract$file_name , min.chars=10, any.missing=F, unique=T)

sql_template_primary_key <- "
ALTER TABLE {table_name_qualified} ADD CONSTRAINT
PK_{table_name} PRIMARY KEY CLUSTERED ( R0000100 )
WITH( STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
"

# ---- load-data ---------------------------------------------------------------
start_time <- Sys.time()

ds_inventory <- database_inventory(study)

ds_extract <- ds_extract %>%
dplyr::mutate(
path = file.path(directory_in, file_name),
extract_exist = file.exists(path),
table_name_qualified = paste0(schema, ".", table_name),
sql_select = glue::glue("SELECT TOP(100) * FROM {table_name_qualified}"),
sql_truncate = glue::glue("TRUNCATE TABLE {table_name_qualified}")
table_name_qualified = paste0(schema, ".", table_name),
table_name = sub("^Extract\\.(\\w+)$", "\\1", table_name_qualified),
path_zip = file.path(directory_in, paste0(file_name, ".zip")),
name_csv = paste0(file_name, ".csv"),
# path_csv = file.path(directory_in, name_csv),
extract_exist = file.exists(path_zip),
sql_select = glue::glue("SELECT TOP(100) * FROM {table_name_qualified}"),
sql_truncate = glue::glue("TRUNCATE TABLE {table_name_qualified}"),
# sql_not_null = glue::glue(sql_template_not_null),
sql_primary_key = glue::glue(sql_template_primary_key)
)
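# (The sub() above strips a schema prefix when one is present, e.g.
# sub("^Extract\\.(\\w+)$", "\\1", "Extract.tblGen1Links") returns
# "tblGen1Links"; an already-bare table name passes through unchanged.)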
testit::assert("All files should be found.", all(ds_extract$extract_exist))

print(ds_extract, n=20)

ds_extract %>%
dplyr::select(table_name_qualified, path_zip) %>%
print(n=20)

# ---- tweak-data --------------------------------------------------------------
ds_inventory <- ds_inventory %>%
dplyr::mutate(
table_name_qualified = glue::glue("{schema_name}.{table_name}")
)

# ---- verify-values -----------------------------------------------------------
# Sniff out problems
@@ -87,87 +108,106 @@ DBI::dbGetInfo(channel_odbc)

channel_rodbc <- open_dsn_channel_rodbc(study)

for( i in seq_len(nrow(ds_extract)) ) { # i <- 13L
message(glue::glue("Uploading from `{ds_extract$file_name[i]}` to `{ds_extract$table_name_qualified[i]}`."))
for( i in seq_len(nrow(ds_extract)) ) { # i <- 1L
# for( i in 1 ) { # i <- 1L
message(glue::glue("Uploading from `{ds_extract$path_zip[i]}` to `{ds_extract$table_name_qualified[i]}`."))

d <- readr::read_csv(ds_extract$path[i], col_types=col_types_default)
# Decompress the csv from the zip into a temp directory
temp_directory <- tempdir() # file.path(tempdir(), dirname(ds_extract$name_csv[i]))
temp_csv <- file.path(temp_directory, ds_extract$name_csv[i])
utils::unzip(ds_extract$path_zip[i], files=basename(ds_extract$name_csv[i]), exdir=file.path(tempdir(), dirname(ds_extract$name_csv[i])))
if( !file.exists(temp_csv) ) stop("The decompressed csv, `", temp_csv, "` was not found.")
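# (`utils::unzip()` warns, rather than errors, when a requested file is
# missing from the archive, hence the explicit file.exists() check above.)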

columns_to_drop_specific <- colnames(d) %>%
intersect(columns_to_drop)
# %>%
# glue::glue("{.}")
# Read the temp csv, and delete it
# d <- readr::read_csv(ds_extract$path_csv[i], col_types=col_types_default)
d <- readr::read_csv(temp_csv, col_types=col_types_default)
unlink(temp_csv)

# Drop pre-specified columns from all extracts
columns_to_drop_specific <- intersect(colnames(d), columns_to_drop)
if( length(columns_to_drop_specific) >= 1L ) {
d <- d %>%
dplyr::select_(.dots=paste0("-", columns_to_drop_specific))
}
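# Note: `select_()` is deprecated in newer versions of dplyr; an equivalent
# call (a sketch, not what this script runs) would be:
#   d <- dplyr::select(d, -dplyr::one_of(columns_to_drop_specific))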

# Print diagnostic info
# print(dim(d))
# purrr::map_chr(d, class)
print(d, n=20)

#RODBC::sqlQuery(channel_odbc, ds_extract$sql_truncate[i], errors=FALSE)
# d_peek <- RODBC::sqlQuery(channel_odbc, ds_extract$sql_select[i], errors=FALSE)

DBI::dbGetQuery(channel_odbc, ds_extract$sql_truncate[i])

d_peek <- DBI::dbGetQuery(channel_odbc, ds_extract$sql_select[i])
peek <- colnames(d_peek)
# peek <- DBI::dbListFields(channel_odbc, ds_extract$table_name_qualified[i])

missing_in_extract <- setdiff(peek , colnames(d))
missing_in_database <- setdiff(colnames(d), peek )

# d_column <- tibble::tibble(
# db = colnames(d),
# extract = peek
# ) %>%
# dplyr::filter(db != extract)

# system.time({
# DBI::dbWriteTable(
# conn = channel_odbc,
# name = DBI::SQL(ds_extract$table_name[i]),
# value = d, #[, 1:10],
# # append = T,
# overwrite = T
# )
# })

system.time({
RODBC::sqlSave(
channel = channel_rodbc,
dat = d,
tablename = ds_extract$table_name_qualified[i],
safer = FALSE, # Don't keep the existing table.
rownames = FALSE,
append = TRUE
) %>%
print()
})

# system.time({
# DBI::dbWriteTable(
# conn = channel_odbc,
# name = DBI::Id(schema=ds_extract$schema[i], table=ds_extract$table_name[i]),
# value = d,
# overwrite = FALSE,
# append = TRUE
# ) %>%
# print()
# })

# OuhscMunge::upload_sqls_rodbc(
# d = d[1:100, ],
# table_name = ds_extract$table_name_qualified[i] ,
# dsn_name = "local-nlsy-links-79",
# clear_table = F,
# create_table = T
# )

# Write the table to the database. Different operations, depending on whether the table already exists.
if( ds_extract$table_name_qualified[i] %in% ds_inventory$table_name_qualified ) {
#RODBC::sqlQuery(channel_odbc, ds_extract$sql_truncate[i], errors=FALSE)
# d_peek <- RODBC::sqlQuery(channel_odbc, ds_extract$sql_select[i], errors=FALSE)

# Remove existing records
DBI::dbGetQuery(channel_odbc, ds_extract$sql_truncate[i])

# Compare columns in the database table and in the extract.
d_peek <- DBI::dbGetQuery(channel_odbc, ds_extract$sql_select[i])
peek <- colnames(d_peek)
# peek <- DBI::dbListFields(channel_odbc, ds_extract$table_name_qualified[i])
missing_in_extract <- setdiff(peek , colnames(d))
missing_in_database <- setdiff(colnames(d), peek )
testit::assert("All columns in the database should be in the extract.", length(missing_in_extract )==0L)
testit::assert("All columns in the extract should be in the database.", length(missing_in_database)==0L)
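# Design note: truncating & appending (instead of dropping & recreating)
# preserves the existing table's schema, constraints, and permissions.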

# Write to the database
RODBC::sqlSave(
channel = channel_rodbc,
dat = d,
tablename = ds_extract$table_name_qualified[i],
safer = FALSE, # Don't keep the existing table.
rownames = FALSE,
append = TRUE
) %>%
print()

# I'd like to use the odbc package, but it's still having problems with schema names.
# system.time({
# DBI::dbWriteTable(
# conn = channel_odbc,
# name = DBI::SQL(ds_extract$table_name_qualified[i]),
# value = d, #[, 1:10],
# # append = T,
# overwrite = T
# )
# })

} else {
# If the table doesn't already exist in the database, create it.
OuhscMunge::upload_sqls_rodbc(
d = d,
# d = d[1:100, ],
table_name = ds_extract$table_name_qualified[i] ,
dsn_name = "local-nlsy-links-79",
clear_table = F,
create_table = T
)

colnames(d)

sql_template_not_null <- "
ALTER TABLE {table_name_qualified} ALTER COLUMN [{variable_code}] INTEGER NOT NULL
"
# sql_template_not_null <- "
# ALTER TABLE {%s} ALTER COLUMN [{%s}] INTEGER NOT NULL
# "
sql_not_null <- glue::glue(sql_template_not_null, table_name_qualified=ds_extract$table_name_qualified[i] , variable_code=colnames(d))
sql_not_null <- paste(sql_not_null, collapse="; ")
# sql_not_null <- sprintf(sql_template_not_null, table_name_qualified=ds_extract$table_name_qualified[i] , variable_code=colnames(d))
# sql_not_null
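# glue::glue() is vectorized over `variable_code`, so the template expands to
# one ALTER statement per column before being collapsed above, e.g.
# (illustrative values):
#   glue::glue(sql_template_not_null,
#              table_name_qualified = "Extract.tblGen1Links",
#              variable_code        = c("R0000100", "C0000100"))
#   #> ALTER TABLE Extract.tblGen1Links ALTER COLUMN [R0000100] INTEGER NOT NULL
#   #> ALTER TABLE Extract.tblGen1Links ALTER COLUMN [C0000100] INTEGER NOT NULL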


# Make the subject id the primary key.
# DBI::dbGetQuery(channel_odbc, ds_extract$sql_not_null[i])
DBI::dbGetQuery(channel_odbc, sql_not_null)
DBI::dbGetQuery(channel_odbc, ds_extract$sql_primary_key[i])
}

message(glue::glue("Tibble size: {format(object.size(d), units='MB')}"))
}
# Disconnect the connections/channels.
DBI::dbDisconnect(channel_odbc); rm(channel_odbc)
RODBC::odbcClose(channel_rodbc); rm(channel_rodbc)

13 changes: 4 additions & 9 deletions dal/import-97-raw.R
@@ -58,10 +58,10 @@ sql_template_primary_key <- "
"

# ---- load-data ---------------------------------------------------------------
ds_inventory <- database_inventory(study)

start_time <- Sys.time()

ds_inventory <- database_inventory(study)

ds_extract <- ds_extract %>%
dplyr::mutate(
table_name = sub("^Extract\\.(\\w+)$", "\\1", table_name_qualified),
@@ -88,11 +88,6 @@ ds_inventory <- ds_inventory %>%
table_name_qualified = glue::glue("{schema_name}.{table_name}")
)


# ---- inspect -----------------------------------------------------------------



# ---- verify-values -----------------------------------------------------------
# Sniff out problems

@@ -133,7 +128,7 @@ for( i in seq_len(nrow(ds_extract)) ) { # i <- 1L
# purrr::map_chr(d, class)
print(d, n=20)

# Write the table to teh database. Different operations, depending if the table existings already.
# Write the table to the database. Different operations, depending on whether the table already exists.
if( ds_extract$table_name_qualified[i] %in% ds_inventory$table_name_qualified ) {
#RODBC::sqlQuery(channel_odbc, ds_extract$sql_truncate[i], errors=FALSE)
# d_peek <- RODBC::sqlQuery(channel_odbc, ds_extract$sql_select[i], errors=FALSE)
Expand All @@ -147,7 +142,7 @@ for( i in seq_len(nrow(ds_extract)) ) { # i <- 1L
# peek <- DBI::dbListFields(channel_odbc, ds_extract$table_name_qualified[i])
missing_in_extract <- setdiff(peek , colnames(d))
missing_in_database <- setdiff(colnames(d), peek )
testit::assert("All columns in the database should be in the extract.", length(missing_in_extract )==0L )
testit::assert("All columns in the database should be in the extract.", length(missing_in_extract )==0L)
testit::assert("All columns in the extract should be in the database.", length(missing_in_database)==0L)

# Write to the database
