diff --git a/R/convert_parquet.R b/R/convert_parquet.R
new file mode 100644
index 0000000..a6ea9e8
--- /dev/null
+++ b/R/convert_parquet.R
@@ -0,0 +1,197 @@
+library(arrow, warn.conflicts = FALSE)
+library(dplyr, warn.conflicts = FALSE)
+library(stringr)
+library(purrr)
+library(furrr)
+
+source("./schemas/schemas.R")
+
+# Spellings treated as missing values when the simulation CSV files are read.
+missing_values <- c(
+  "", "NA", "Na", "na", "NULL", "Null", "null", "NAN", "NaN", "Nan", "nan"
+)
+
+#' Extracts the batch number from a simulation directory name
+#'
+#' @description `get.batch_id` splits a string on the underscore symbol, takes
+#' the second piece, strips a trailing `"00"` from it and converts the rest to
+#' an integer.
+#'
+#' @details It is assumed there is one and only one underscore symbol in the
+#' input string; there are no checks for any other case. Only a trailing `"00"`
+#' is removed, so a suffix such as `"10000"` yields `100`. This function is
+#' designed to help with unconventional file names.
+#'
+#' @param x A string
+#'
+#' @return The piece of `x` after the underscore, without its trailing `"00"`,
+#' as an integer
+#'
+#' @examples
+#' get.batch_id("1_2")
+#' get.batch_id("2023121212_100")
+get.batch_id <- function(x) {
+  pieces <- str_split_1(x, pattern = "_")
+  as.integer(sub("00$", "", pieces[[2]]))
+}
+
+# Opens one simulation CSV file as an Arrow dataset with the shared settings.
+.open_simulation_csv <- function(csv_path, csv_schema) {
+  open_csv_dataset(
+    csv_path,
+    schema = csv_schema,
+    skip = 1,
+    na = missing_values,
+    convert_options = CsvConvertOptions$create(
+      null_values = missing_values,
+      strings_can_be_null = TRUE,
+      true_values = c("true", "TRUE", "True", "Male"),
+      false_values = c("false", "FALSE", "False", "Female")
+    )
+  )
+}
+
+# Writes data grouped by scenario, time and run as lz4-compressed parquet.
+.write_simulation_parquet <- function(data, path) {
+  data |>
+    group_by(scenario, time, run) |>
+    write_dataset(path = path, format = "parquet", compression = "lz4")
+}
+
+#' Converts simulation results from `*.csv` files to `*.parquet`.
+#'
+#' @description `convert.single_simulation` converts the results of a single
+#' simulation run (`Household.csv`, `Person.csv`, `BenefitUnit.csv`) to binary
+#' files for further use with Apache Arrow. All data is grouped by the
+#' corresponding `scenario`, `time` and `run`.
+#'
+#' @details This function is designed for efficient data processing and
+#' therefore doesn't deal with `*.csv` files that contain trailing commas &
+#' spaces. All preprocessing MUST be done in advance.
+#' https://unix.stackexchange.com/questions/220576/how-to-remove-last-comma-of-each-line-on-csv-using-linux
+#'
+#' The input files are read from `scenario_id/output/path_string/csv` and the
+#' results are written to the `Person`, `BenefitUnit` and `Household`
+#' directories relative to the working directory.
+#'
+#' @param path_string The name of a directory containing simulation results
+#' @param scenario_id The string description of a dataset. Typically that would
+#' be "baseline" or "reform", but there are no hard restrictions
+#' @param files A character vector naming the files to convert; any subset of
+#' "Household", "Person" and "BenefitUnit"
+#' @param delete_original A flag to ensure that the original files are (not)
+#' deleted, `FALSE` by default
+#' @param runs_per_batch The amount added to `run` for every batch preceding
+#' this one, `20` by default
+#'
+#' @return `NULL`, invisibly; the function is called for its side effects
+#'
+#' @seealso `convert.multiple_simulations`
+#'
+#' @examples
+#' convert.single_simulation("2023121212_100", "baseline")
+convert.single_simulation <- function(path_string,
+                                      scenario_id,
+                                      files = c("Household",
+                                                "Person",
+                                                "BenefitUnit"),
+                                      delete_original = FALSE,
+                                      runs_per_batch = 20) {
+  known_files <- c("Household", "Person", "BenefitUnit")
+  unknown_files <- setdiff(files, known_files)
+  if (length(unknown_files) > 0) {
+    stop(
+      "Unsupported entries in `files`: ",
+      paste(unknown_files, collapse = ", "),
+      call. = FALSE
+    )
+  }
+
+  batch_id <- get.batch_id(path_string)
+  csv_dir <- file.path(scenario_id, "output", path_string, "csv")
+  csv_paths <- file.path(csv_dir, paste0(known_files, ".csv"))
+  names(csv_paths) <- known_files
+
+  if ("Person" %in% files) {
+    # The labour supply labels are recoded in R, so the data is collected
+    # before being turned back into an Arrow table.
+    csv_paths[["Person"]] |>
+      .open_simulation_csv(schema.person) |>
+      mutate(
+        time = cast(time, uint16()),
+        countFemale = cast(countFemale, uint8()),
+        countMale = cast(countMale, uint8())
+      ) |>
+      rename(id_BenefitUnit = idBenefitUnit) |>
+      collect() |>
+      mutate(
+        labourSupplyWeekly = case_when(
+          labourSupplyWeekly == "ZERO" ~ "0",
+          labourSupplyWeekly == "TEN" ~ "10",
+          labourSupplyWeekly == "TWENTY" ~ "20",
+          labourSupplyWeekly == "THIRTY" ~ "30",
+          labourSupplyWeekly == "FORTY" ~ "40"
+        ),
+        labourSupplyWeekly = as.integer(labourSupplyWeekly)
+      ) |>
+      as_arrow_table(schema = schema.person.save) |>
+      mutate(
+        scenario = scenario_id,
+        run = run + (batch_id - 1) * runs_per_batch,
+        run = cast(run, uint16())
+      ) |>
+      .write_simulation_parquet("Person")
+  }
+
+  if ("BenefitUnit" %in% files) {
+    csv_paths[["BenefitUnit"]] |>
+      .open_simulation_csv(schema.benefit_unit) |>
+      mutate(
+        time = cast(time, uint16()),
+        atRiskOfPoverty = cast(atRiskOfPoverty, bool()),
+        scenario = scenario_id,
+        run = run + (batch_id - 1) * runs_per_batch,
+        run = cast(run, uint16())
+      ) |>
+      .write_simulation_parquet("BenefitUnit")
+  }
+
+  if ("Household" %in% files) {
+    csv_paths[["Household"]] |>
+      .open_simulation_csv(schema.household) |>
+      mutate(
+        time = cast(time, uint16()),
+        scenario = scenario_id,
+        run = run + (batch_id - 1) * runs_per_batch,
+        run = cast(run, uint16())
+      ) |>
+      .write_simulation_parquet("Household")
+  }
+
+  # Only the files converted above are removed.
+  if (delete_original) {
+    file.remove(unname(csv_paths[unique(files)]))
+  }
+
+  invisible(NULL)
+}
+
+#' Converts all `*.csv` files stored in a directory to `*.parquet`
+#'
+#' @param scenario_names A list of scenario names
+#' @param paths_pattern A regular expression that selects which subdirectories
+#' of each scenario's `output` directory are converted
+#'
+#' @seealso `convert.single_simulation`
+convert.multiple_simulations <- function(
+    scenario_names = c("baseline", "reform"),
+    paths_pattern = "^2023\\d+\\_([1-9]|[1234][0-9]|50)00$") {
+  for (sid in scenario_names) {
+    paths <- dir(file.path(sid, "output"))
+    paths[str_detect(paths, paths_pattern)] |>
+      walk(\(x) convert.single_simulation(x, sid), .progress = TRUE)
+  }
+}
+
+convert.multiple_simulations()
+
diff --git a/R/schemas/schemas.R b/R/schemas/schemas.R
new file mode 100644
index 0000000..87bff99
--- /dev/null
+++ b/R/schemas/schemas.R
@@ -0,0 +1,136 @@
+library(arrow, warn.conflicts = FALSE)
+
+# Column types used to read `Person.csv`.
+schema.person <- schema(
+  field("run", uint16(), nullable = FALSE),
+  field("time", float32(), nullable = FALSE),
+  field("id_Person", uint64(), nullable = FALSE),
+  field("L1_potentialHourlyEarnings", float32(), nullable = FALSE),
+  field("adultchildflag", bool(), nullable = TRUE),
+  field("countFemale", float32(), nullable = FALSE),
+  field("countMale", float32(), nullable = FALSE),
+  field("countNaN", uint8(), nullable = FALSE),
+  field("countOK", uint8(), nullable = FALSE),
+  field("countOKover32", uint8(), nullable = FALSE),
+  field("covidModuleGrossLabourIncomeBaseline_Xt5", utf8(), nullable = FALSE),
+  field("dag", uint8(), nullable = FALSE),
+  field("dcpagdf", int8(), nullable = TRUE),
+  field("dcpen", bool(), nullable = TRUE),
+  field("dcpex", bool(), nullable = TRUE),
+  field("dcpst", utf8(), nullable = FALSE),
+  field("dcpyy", uint8(), nullable = TRUE),
+  field("ded", bool(), nullable = FALSE),
+  field("deh_c3", utf8(), nullable = FALSE),
+  field("dehf_c3", utf8(), nullable = FALSE),
+  field("dehm_c3", utf8(), nullable = FALSE),
+  field("dehsp_c3", utf8(), nullable = TRUE),
+  field("der", bool(), nullable = FALSE),
+  field("dgn", bool(), nullable = FALSE),
+  field("dhe", float32(), nullable = FALSE),
+  field("dhesp", float32(), nullable = TRUE),
+  field("dhh_owned", bool(), nullable = FALSE),
+  field("dhm", float32(), nullable = FALSE),
+  field("dhm_ghq", bool(), nullable = FALSE),
+  field("dlltsd", bool(), nullable = FALSE),
+  field("education_inrange", bool(), nullable = TRUE),
+  field("flagAlignEntry", bool(), nullable = TRUE),
+  field("flagAlignExit", bool(), nullable = TRUE),
+  field("flagDies", bool(), nullable = TRUE),
+  field("flagEmigrate", bool(), nullable = TRUE),
+  field("flagImmigrate", bool(), nullable = TRUE),
+  field("hoursWorkedWeekly", float32(), nullable = TRUE),
+  field("household_status", utf8(), nullable = FALSE),
+  field("idBenefitUnit", uint64(), nullable = FALSE),
+  field("idFather", uint64(), nullable = TRUE),
+  field("idMother", uint64(), nullable = TRUE),
+  field("idPartner", uint64(), nullable = TRUE),
+  field("id_original", uint64(), nullable = FALSE),
+  field("inverseMillsRatioMaxFemale", float32(), nullable = FALSE),
+  field("inverseMillsRatioMaxMale", float32(), nullable = FALSE),
+  field("inverseMillsRatioMinFemale", float32(), nullable = FALSE),
+  field("inverseMillsRatioMinMale", float32(), nullable = FALSE),
+  field("labourSupplyWeekly", utf8(), nullable = FALSE),
+  field("les_c4", utf8(), nullable = FALSE),
+  field("les_c7_covid", utf8(), nullable = TRUE),
+  field("lesdf_c4", utf8(), nullable = TRUE),
+  field("lessp_c4", utf8(), nullable = TRUE),
+  field("partnership_samesex", bool(), nullable = TRUE),
+  field("potentialHourlyEarnings", float32(), nullable = FALSE),
+  field("sIndex", float32(), nullable = TRUE),
+  field("sIndexNormalised", float32(), nullable = TRUE),
+  field("scoreFemale", float32(), nullable = FALSE),
+  field("scoreMale", float32(), nullable = FALSE),
+  field("sedex", bool(), nullable = FALSE),
+  field("weight", float32(), nullable = FALSE),
+  field("women_fertility", bool(), nullable = TRUE),
+  field("yearlyEquivalisedConsumption", float32(), nullable = FALSE),
+  field("ynbcpdf_dv", float32(), nullable = TRUE),
+  field("yplgrs_dv", float32(), nullable = FALSE),
+  field("ypnbihs_dv", float32(), nullable = TRUE),
+  field("ypncp", float32(), nullable = FALSE),
+  field("ypnoab", float32(), nullable = FALSE),
+  field("yptciihs_dv", float32(), nullable = FALSE)
+)
+
+# Column types of the Person table when it is written to parquet:
+# `countFemale`, `countMale`, `labourSupplyWeekly` and `time` are stored as
+# unsigned integers and `idBenefitUnit` is named `id_BenefitUnit`.
+schema.person.save <- schema.person
+schema.person.save[[match("countFemale", names(schema.person))]] <- Field$create("countFemale", uint8())
+schema.person.save[[match("countMale", names(schema.person))]] <- Field$create("countMale", uint8())
+schema.person.save[[match("countNaN", names(schema.person))]] <- Field$create("countNaN", uint8())
+schema.person.save[[match("countOK", names(schema.person))]] <- Field$create("countOK", uint8())
+schema.person.save[[match("countOKover32", names(schema.person))]] <- Field$create("countOKover32", uint8())
+schema.person.save[[match("labourSupplyWeekly", names(schema.person))]] <- Field$create("labourSupplyWeekly", uint8())
+schema.person.save[[match("time", names(schema.person))]] <- Field$create("time", uint16())
+schema.person.save[[match("idBenefitUnit", names(schema.person))]] <- Field$create("id_BenefitUnit", uint64())
+
+# Column types used to read `BenefitUnit.csv`.
+schema.benefit_unit <- schema(
+  field("run", uint16(), nullable = FALSE),
+  field("time", float32(), nullable = FALSE),
+  field("id_BenefitUnit", uint64(), nullable = FALSE),
+  field("atRiskOfPoverty", uint8(), nullable = FALSE),
+  field("consumptionAnnual", float32(), nullable = TRUE),
+  field("createdByConstructor", utf8(), nullable = FALSE),
+  field("dhh_owned", bool(), nullable = FALSE),
+  field("dhhtp_c4", utf8(), nullable = FALSE),
+  field("disposableIncomeMonthly", float32(), nullable = FALSE),
+  field("equivalisedDisposableIncomeYearly", float32(), nullable = FALSE),
+  field("idFemale", uint64(), nullable = TRUE),
+  field("idHousehold", uint64(), nullable = FALSE),
+  field("idMale", uint64(), nullable = TRUE),
+  field("liquidWealth", float32(), nullable = TRUE),
+  field("n_children_0", uint8(), nullable = FALSE),
+  field("n_children_1", uint8(), nullable = FALSE),
+  field("n_children_2", uint8(), nullable = FALSE),
+  field("n_children_3", uint8(), nullable = FALSE),
+  field("n_children_4", uint8(), nullable = FALSE),
+  field("n_children_5", uint8(), nullable = FALSE),
+  field("n_children_6", uint8(), nullable = FALSE),
+  field("n_children_7", uint8(), nullable = FALSE),
+  field("n_children_8", uint8(), nullable = FALSE),
+  field("n_children_9", uint8(), nullable = FALSE),
+  field("n_children_10", uint8(), nullable = FALSE),
+  field("n_children_11", uint8(), nullable = FALSE),
+  field("n_children_12", uint8(), nullable = FALSE),
+  field("n_children_13", uint8(), nullable = FALSE),
+  field("n_children_14", uint8(), nullable = FALSE),
+  field("n_children_15", uint8(), nullable = FALSE),
+  field("n_children_16", uint8(), nullable = FALSE),
+  field("n_children_17", uint8(), nullable = FALSE),
+  field("occupancy", utf8(), nullable = FALSE),
+  field("region", utf8(), nullable = FALSE),
+  field("size", uint8(), nullable = FALSE),
+  field("weight", float32(), nullable = FALSE),
+  field("ydses_c5", utf8(), nullable = FALSE)
+)
+
+# Column types used to read `Household.csv`.
+schema.household <- schema(
+  field("run", uint16(), nullable = FALSE),
+  field("time", float32(), nullable = FALSE),
+  field("id_Household", uint64(), nullable = FALSE),
+  field("size", uint8(), nullable = FALSE),
+  field("weight", float32(), nullable = FALSE)
+)