Skip to content

Commit

Permalink
adding ability to import raw data from climsoft
Browse files Browse the repository at this point in the history
  • Loading branch information
lilyclements committed Jun 25, 2024
1 parent ea6bf2d commit 2e2478f
Show file tree
Hide file tree
Showing 24 changed files with 371 additions and 26 deletions.
5 changes: 4 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ RoxygenNote: 7.2.3
Depends:
R (>= 2.10)
Imports:
DBI,
dplyr,
epicsadata,
forcats,
googleCloudStorageR,
jsonlite,
lubridate,
magrittr,
purrr,
rlang,
rpicsa,
testthat
testthat,
tidyr
Remotes:
IDEMSInternational/epicsadata,
IDEMSInternational/rpicsa
6 changes: 6 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@ export(export_r_instat_to_bucket)
export(extremes_summaries)
export(gcs_auth_file)
export(get_binary_file)
export(get_climsoft_conn)
export(get_daily_data)
export(get_definitions_id_from_metadata)
export(import_from_climsoft)
export(join_null_data)
export(monthly_temperature_summaries)
export(reformat_annual_summaries)
export(reformat_crop_success)
export(reformat_season_start)
export(reformat_temperature_summaries)
export(season_start_probabilities)
export(set_climsoft_conn)
export(setup)
export(station_metadata)
export(update_metadata_definition_id)
importFrom(DBI,dbConnect)
importFrom(RMySQL,MySQL)
importFrom(epicsadata,gcs_auth_file)
importFrom(magrittr,"%>%")
importFrom(rlang,":=")
13 changes: 9 additions & 4 deletions R/annual_rainfall_summaries.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#' @param country `character(1)` The country code of the data.
#' @param station_id `character` The id's of the stations to analyse. Either a
#' single value or a vector.
#' @param call `character(1)` The source to read the raw daily data from when it has to be fetched: either `"climsoft"` or `"googlebuckets"`.
#' @param summaries `character` The names of the summaries to produce.
#' @param override A logical argument default `FALSE` indicating whether to calculate the summaries still, even if they are stored already in the bucket.
#'
Expand All @@ -16,8 +17,10 @@
#' @examples
#' #annual_rainfall_summaries(country = "zm", station_id = "01122", summaries = "annual_rain")
#' #annual_rainfall_summaries(country = "zm", station_id = "16", summaries = c("start_rains", "end_rains", "annual_rain", "seasonal_rain")) #, "end_season"))
annual_rainfall_summaries <- function(country, station_id, summaries = c("annual_rain", "start_rains", "end_rains", "end_season", "seasonal_rain", "seasonal_length"), override = FALSE) {
annual_rainfall_summaries <- function(country, station_id, call = c("climsoft", "googlebuckets"),
summaries = c("annual_rain", "start_rains", "end_rains", "end_season", "seasonal_rain", "seasonal_length"), override = FALSE) {
list_return <- NULL
call <- match.arg(call)

# we get the definitions_id from station_id metadata.
definitions_id <- get_definitions_id_from_metadata(country, station_id)
Expand All @@ -26,7 +29,7 @@ annual_rainfall_summaries <- function(country, station_id, summaries = c("annual
get_summaries <- epicsadata::get_summaries_data(country, station_id, summary = "annual_rainfall_summaries")
summary_data <- get_summaries[[1]]
timestamp <- get_summaries[[2]]

# what if the definitions is different? Have an override option.
# if the summary data exists, and if you do not want to override it then:
if (nrow(summary_data) > 0 & override == FALSE) {
Expand Down Expand Up @@ -133,12 +136,14 @@ annual_rainfall_summaries <- function(country, station_id, summaries = c("annual
}
}
}

# Fetch daily data and preprocess
daily <- epicsadata::get_daily_data(country = country, station_id = station_id)
daily <- get_daily_data(country = country, station_id = station_id, call_from = call)

# For the variable names to be set as a certain default, set TRUE here, and run check_and_rename_variables
data_names <- epicsadata::data_definitions(names(daily), TRUE)
daily <- check_and_rename_variables(daily, data_names)
if (class(daily$date) != "Date") daily$date <- as.Date(daily$date)
if (!"year" %in% names(daily)) daily$year <- lubridate::year(daily$date)

# Check if start_rains and end_rains are required for seasonal_rain and seasonal_length
if (any(grepl("seasonal_", summaries))){
Expand Down
8 changes: 4 additions & 4 deletions R/check_and_rename_variables.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ check_and_rename_variables <- function(data, data_names) {
month = c("month_abbr", "month"),
doy = c("DOY", "doy_366", "doy"),
day = c("day"),
rain = c("rain", "rainfall", "precipitation"),
tmax = c("tmax", "max_temperature", "maximum", "max", "temperature_max"),
tmin = c("tmin","min_temperature", "minimum", "min", "temperature_min"))
rain = c("rain", "rainfall", "precipitation", "PRECIP"),
tmax = c("tmax", "max_temperature", "maximum", "max", "temperature_max", "TMPMAX"),
tmin = c("tmin","min_temperature", "minimum", "min", "temperature_min", "TMPMIN"))

# Loop through the missing variable names

Expand Down Expand Up @@ -66,4 +66,4 @@ check_and_rename_variables <- function(data, data_names) {

}
return(data)
}
}
37 changes: 37 additions & 0 deletions R/climsoft_connection_functions.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#' Set Climsoft Connection
#'
#' Opens a MySQL connection to a Climsoft database and keeps it in the
#' package environment so that other functions can reuse it later.
#'
#' @param dbname Name of the database.
#' @param user Username for database access.
#' @param password Password for database access.
#' @param host Host where the database server is located.
#' @param port Port number on which the database server is running.
#'
#' @return Invisibly, the connection object that was opened. The same object
#'   is stored as `conn` in the package environment.
#'
#' @examples
#' #set_climsoft_conn("climsoft_db", "user", "password", "localhost", "3306")
#'
#' @importFrom DBI dbConnect
#' @importFrom RMySQL MySQL
#' @export
set_climsoft_conn <- function(dbname, user, password, host, port) {
  climsoft_connection <- DBI::dbConnect(
    drv = RMySQL::MySQL(),
    dbname = dbname,
    user = user,
    password = password,
    host = host,
    port = port
  )
  # Keep the connection in the package environment for get_climsoft_conn().
  assign("conn", climsoft_connection, envir = pkg_env)
  invisible(climsoft_connection)
}

#' Get Climsoft Connection
#'
#' Retrieves the stored Climsoft database connection from the package
#' environment.
#'
#' @return The database connection object stored by `set_climsoft_conn()`, or
#'   `NULL` if no connection has been stored yet.
#'
#' @examples
#' #con <- get_climsoft_conn()
#'
#' @export
get_climsoft_conn <- function() {
  # A bare get() throws "object 'conn' not found" when nothing has been stored,
  # which makes `is.null(get_climsoft_conn())` checks in callers unreachable.
  # Return NULL instead so callers can test for a missing connection.
  if (!exists("conn", envir = pkg_env, inherits = FALSE)) {
    return(NULL)
  }
  get("conn", envir = pkg_env, inherits = FALSE)
}
9 changes: 7 additions & 2 deletions R/crop_success_probabilities.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#' @param country `character(1)` The country code of the data.
#' @param station_id `character` The id's of the stations to analyse. Either a
#' single value or a vector.
#' @param call `character(1)` The source to read the raw daily data from when it has to be fetched: either `"climsoft"` or `"googlebuckets"`.
#' @param water_requirements \code{numeric} Vector containing water requirements.
#' @param planting_dates \code{numeric} Vector containing planting dates requirements.
#' @param planting_length \code{numeric} Vector containing seasonal crop length requirements.
Expand All @@ -28,6 +29,7 @@
#' # planting_length = c(100, 150), planting_dates = c(90, 100, 110))
crop_success_probabilities <- function(country,
station_id,
call = c("climsoft", "googlebuckets"),
planting_dates = NULL,
water_requirements = NULL,
planting_length = NULL,
Expand Down Expand Up @@ -99,17 +101,20 @@ crop_success_probabilities <- function(country,
# if we are overriding, then we are overriding for our start_rains definition too, meaning we need to recalculate that
if (override){
# Fetch daily data and preprocess
daily <- epicsadata::get_daily_data(country = country, station_id = station_id)
daily <- get_daily_data(country = country, station_id = station_id, call_from = call)

# For the variable names to be set as a certain default, set TRUE here, and run check_and_rename_variables
data_names <- epicsadata::data_definitions(names(daily), TRUE)
daily <- check_and_rename_variables(daily, data_names)
if (class(daily$date) != "Date") daily$date <- as.Date(daily$date)
if (!"year" %in% names(daily)) daily$year <- lubridate::year(daily$date)

} else {
data_names <- NULL
data_names$station <- "station"
}

season_data <- annual_rainfall_summaries(country = country, station_id = station_id, summaries = c("start_rains", "seasonal_length", "seasonal_rain"), override = override) # end rains or end season?
season_data <- annual_rainfall_summaries(country = country, station_id = station_id, call = call, summaries = c("start_rains", "seasonal_length", "seasonal_rain"), override = override) # end rains or end season?
#offset <- season_data[[1]]$start_rains$s_start_doy

definitions$crops_success$planting_length <- check_and_set_parameter("planting_length", "planting_length")
Expand Down
42 changes: 42 additions & 0 deletions R/get_daily_data.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#' Get Daily Data
#'
#' Reads the daily data for one or more stations, either from a Climsoft
#' database or from the per-station `.rds` files used with the Google buckets.
#'
#' @param country `character(1)` The country code of the data. Common options are `"mz"`, `"zm"`, and `"zm_test"`. Any defined in `get_bucket_name()`.
#' @param station_id A character vector specifying the ID(s) of the station(s) for which to get the daily data.
#' @param call_from `character(1)` Where to read the data from: `"climsoft"`
#'   (the default) or `"googlebuckets"`.
#'
#' @return A data frame containing the daily data for the specified station(s) and country.
#' @export
#'
#' @examples
#' # get_daily_data(country = "zm", station_id = "16", call_from = "googlebuckets")
get_daily_data <- function(country, station_id, call_from = c("climsoft", "googlebuckets")) {
  call_from <- match.arg(call_from)
  if (length(country) > 1) stop("'country' must be of length 1")
  station_id <- as.character(station_id)

  if (call_from == "climsoft") {
    # Fail fast with a helpful message when no connection has been set. The
    # lookup is wrapped because the getter may throw rather than return NULL
    # when nothing has been stored.
    conn <- tryCatch(get_climsoft_conn(), error = function(e) NULL)
    if (is.null(conn)) stop("Set climsoft connection with set_climsoft_conn() function.")

    climsoft_info <- station_metadata(country = country, station_id = station_id)$climsoft_list
    # NOTE(review): only the first entry of climsoft_list is used, so every
    # requested station is assumed to share the same element settings - confirm.
    station_data <- import_from_climsoft(con = conn,
                                         stations = station_id,
                                         include_station_info = FALSE,
                                         elementfiltercolumn = climsoft_info[[1]]$elementfiltercolumn,
                                         elements = climsoft_info[[1]]$elements)
    return(station_data)
  }

  # Google buckets: use the local "<country>/data/<station>.rds" file when it
  # exists, otherwise ask epicsadata for the station's data.
  # lapply() keeps one slot per station even when a result is NULL, whereas
  # `dfs[[i]] <- NULL` would silently drop the slot and shift later stations.
  dfs <- lapply(station_id, function(id) {
    rds_path <- file.path(country, "data", paste0(id, ".rds"))
    if (file.exists(rds_path)) {
      readRDS(rds_path)
    } else {
      epicsadata::update_daily_data(country, id)
    }
  })
  names(dfs) <- station_id

  if (length(station_id) > 1) {
    dplyr::bind_rows(dfs)
  } else {
    dfs[[1]]
  }
}
1 change: 1 addition & 0 deletions R/global.R
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Package-private environment for state shared between functions:
# set_climsoft_conn() stores the Climsoft connection here as `conn` and
# get_climsoft_conn() reads it back. `parent = emptyenv()` stops lookups in
# this environment from falling through to any enclosing environment.
pkg_env <- new.env(parent = emptyenv())
112 changes: 112 additions & 0 deletions R/import_from_climsoft.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#' Import Data from Climsoft
#'
#' Queries a Climsoft database for observation data, filtered by stations,
#' elements and (optionally) a date range, with options to include observation
#' flags, station information and element information.
#'
#' @param con Connection object to the Climsoft database, default is the result of \code{get_climsoft_conn()}.
#' @param stationfiltercolumn Name of the column of the `station` table to filter by, default is 'stationId'.
#' @param stations Vector of station values to filter the data. If empty (the default), data for all stations is returned.
#' @param elementfiltercolumn Name of the column of the `obselement` table to filter by, default is 'elementId'.
#' @param elements Vector of element values to filter the data. At least one element is required.
#' @param include_observation_flags Boolean, if TRUE includes observation flags in the output, defaults to FALSE.
#' @param include_station_info Boolean, if TRUE includes station metadata in the output, defaults to FALSE.
#' @param start_date Start date for filtering the observations, format should be Date, defaults to NULL.
#' @param end_date End date for filtering the observations, format should be Date, defaults to NULL.
#' @param include_elements_info Boolean, if TRUE includes a table describing the selected elements in the output, defaults to FALSE.
#' @param unstack_data Boolean, if TRUE (the default) the observations are reshaped to one column per element (named by the element abbreviation); if FALSE they are returned in long format with `element` and `obsvalue` columns.
#'
#' @return If neither `include_station_info` nor `include_elements_info` is
#'   TRUE, a data frame of observations. Otherwise a named list containing
#'   'Metadata' (station details, when `include_station_info` is TRUE),
#'   'Daily data' (the observations) and 'Elements' (element details, when
#'   `include_elements_info` is TRUE).
#'
#' @examples
#' \dontrun{
#' con <- get_climsoft_conn()
#' data <- import_from_climsoft(con, stations = c("101", "102"), elements = c("1", "2"),
#'                              start_date = as.Date("2020-01-01"), end_date = as.Date("2020-01-31"))
#' }
#'
#' @export
import_from_climsoft <- function(con = get_climsoft_conn(),
                                 stationfiltercolumn = "stationId",
                                 stations = c(),
                                 elementfiltercolumn = "elementId",
                                 elements = c(),
                                 include_observation_flags = FALSE,
                                 include_station_info = FALSE,
                                 start_date = NULL,
                                 end_date = NULL,
                                 include_elements_info = FALSE,
                                 unstack_data = TRUE) {
  # NOTE(review): `unstack_data` defaults to TRUE because the original body
  # referenced it without defining it and the package's variable renaming
  # recognises element-abbreviation columns (e.g. "PRECIP") - confirm default.

  # Validate arguments before touching the database.
  if (length(elements) < 1) stop("At least one element must be supplied in 'elements'.")
  if (!is.null(start_date) && !inherits(start_date, "Date")) {
    stop("start_date must be of type Date.")
  }
  if (!is.null(end_date) && !inherits(end_date, "Date")) {
    stop("end_date must be of type Date.")
  }

  # Build a SQL "( 'a', 'b' )" list with values escaped by the DBI driver.
  # An empty input becomes "(NULL)", which is valid SQL and matches no rows.
  quote_values <- function(values) {
    if (length(values) == 0) return("(NULL)")
    # Avoid scientific notation (e.g. "1e+05") when ids are numeric.
    if (is.numeric(values)) values <- format(values, scientific = FALSE, trim = TRUE)
    quoted <- DBI::dbQuoteString(con, as.character(values))
    paste0("(", paste0(quoted, collapse = ", "), ")")
  }

  # Resolve the station ids to filter on (only when stations were passed).
  db_station_info <- NULL
  station_ids_values <- NULL
  if (length(stations) > 0) {
    passed_station_values <- quote_values(stations)
    # The station table is only needed to translate another column into
    # station ids, or when the caller asked for the station metadata.
    if (include_station_info || stationfiltercolumn != "stationId") {
      db_station_info <- DBI::dbGetQuery(con, paste0(
        "SELECT * FROM station WHERE ",
        DBI::dbQuoteIdentifier(con, stationfiltercolumn),
        " IN ", passed_station_values, ";"
      ))
    }
    if (stationfiltercolumn == "stationId") {
      station_ids_values <- passed_station_values
    } else {
      station_ids_values <- quote_values(db_station_info$stationId)
    }
  }

  # Resolve the element ids to filter on.
  if (elementfiltercolumn == "elementId") {
    element_ids_values <- quote_values(elements)
  } else {
    db_elements_ids <- DBI::dbGetQuery(con, paste0(
      "SELECT elementId FROM obselement WHERE ",
      DBI::dbQuoteIdentifier(con, elementfiltercolumn),
      " IN ", quote_values(elements), ";"
    ))
    if (nrow(db_elements_ids) == 0) {
      stop("None of the requested elements were found in column '",
           elementfiltercolumn, "' of the obselement table.")
    }
    element_ids_values <- quote_values(db_elements_ids$elementId)
  }

  db_elements_info <- NULL
  if (include_elements_info) {
    db_elements_info <- DBI::dbGetQuery(con, paste0(
      "SELECT elementId, elementName, abbreviation, description, elementtype, ",
      "upperLimit, lowerLimit, units FROM obselement WHERE elementId IN ",
      element_ids_values, ";"
    ))
  }

  flags_column_col_sql <- " "
  if (include_observation_flags) {
    flags_column_col_sql <- ", observationfinal.flag AS flag"
  }

  # Date bounds. Plain ASCII quotes are used on purpose: sQuote() emits
  # typographic quotes under the default `useFancyQuotes` option, which is
  # not valid SQL. The values are formatted dates, so no escaping is needed.
  date_bounds_filter <- ""
  if (!is.null(start_date)) {
    date_bounds_filter <- paste0(
      date_bounds_filter,
      " AND obsDatetime >= '", format(start_date, format = "%Y-%m-%d"), "'"
    )
  }
  if (!is.null(end_date)) {
    date_bounds_filter <- paste0(
      date_bounds_filter,
      " AND obsDatetime <= '", format(end_date, format = "%Y-%m-%d"), "'"
    )
  }

  # Observation query: identical for both cases apart from the station filter.
  station_filter <- ""
  if (length(stations) > 0) {
    station_filter <- paste0("observationfinal.recordedFrom IN ", station_ids_values, " AND ")
  }
  db_observation_data <- DBI::dbGetQuery(con, paste0(
    "SELECT observationfinal.recordedFrom As station, ",
    "obselement.abbreviation AS element, ",
    "observationfinal.obsDatetime AS datetime, ",
    "observationfinal.obsValue AS obsvalue",
    flags_column_col_sql,
    " FROM observationfinal INNER JOIN obselement ",
    "ON observationfinal.describedBy = obselement.elementId WHERE ",
    station_filter,
    "observationfinal.describedBy IN ", element_ids_values,
    date_bounds_filter,
    " ORDER BY observationfinal.recordedFrom, observationfinal.describedBy;"
  ))

  # When no stations were passed, station info is looked up from the stations
  # that actually appear in the observations.
  if (include_station_info && length(stations) == 0) {
    station_ids_values <- quote_values(unique(db_observation_data$station))
    db_station_info <- DBI::dbGetQuery(con, paste0(
      "SELECT * FROM station WHERE stationId IN ", station_ids_values, ";"
    ))
  }

  if (unstack_data) {
    db_observation_data <- tidyr::pivot_wider(db_observation_data,
                                              names_from = "element",
                                              values_from = "obsvalue")
  }

  if (!include_station_info && !include_elements_info) {
    return(db_observation_data)
  }

  data_list <- list()
  if (include_station_info) data_list[["Metadata"]] <- db_station_info
  data_list[["Daily data"]] <- db_observation_data
  if (include_elements_info) data_list[["Elements"]] <- db_elements_info
  data_list
}
9 changes: 6 additions & 3 deletions R/season_start_probabilities.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#' @param country `character(1)` The country code of the data.
#' @param station_id `character` The id's of the stations to analyse. Either a
#' single value or a vector.
#' @param call `character(1)` The source to read the raw daily data from when it has to be fetched: either `"climsoft"` or `"googlebuckets"`.
#' @param start_dates `numeric` A vector of start dates (in doy format) to calculate the probabilities of the season starting on or before.
#' @param override A logical argument default `FALSE` indicating whether to calculate the summaries still, even if they are stored already in the bucket.
#'
Expand All @@ -20,6 +21,7 @@
#' #season_start_probabilities(country = "zm", station_id = "16", start_dates = c(10, 20, 100))
season_start_probabilities <- function(country,
station_id,
call = c("climsoft", "googlebuckets"),
start_dates = NULL,
override = FALSE) {
list_return <- NULL
Expand Down Expand Up @@ -62,16 +64,17 @@ season_start_probabilities <- function(country,
# if we are overriding, then we are overriding for our start_rains definition too, meaning we need to recalculate that
if (override){
# Fetch daily data and preprocess
daily <- epicsadata::get_daily_data(country = country, station_id = station_id)

daily <- get_daily_data(country = country, station_id = station_id, call_from = call)
# For the variable names to be set as a certain default, set TRUE here, and run check_and_rename_variables
data_names <- epicsadata::data_definitions(names(daily), TRUE)
daily <- check_and_rename_variables(daily, data_names)
if (class(daily$date) != "Date") daily$date <- as.Date(daily$date)
if (!"year" %in% names(daily)) daily$year <- lubridate::year(daily$date)
} else {
data_names <- NULL
data_names$station <- "station"
}
season_data <- annual_rainfall_summaries(country = country, station_id = station_id, summaries = c("start_rains"), override = override)
season_data <- annual_rainfall_summaries(country = country, station_id = station_id, call = call, summaries = c("start_rains"), override = override)
if (is.null(start_dates)){
start_dates <- definitions$season_start_probabilities$specified_day
if (length(start_dates) == 0) stop("start_dates parameter missing in definitions file.")
Expand Down
Loading

0 comments on commit 2e2478f

Please sign in to comment.