diff --git a/.env.example b/.env.example index 85d7251..c942073 100644 --- a/.env.example +++ b/.env.example @@ -16,5 +16,3 @@ AIRTABLE_API_TOKEN="" MAIL_SERVICE="" SENDGRID_API_KEY="" # if you select the sendgrid backend - -NATS_URL="" # default "http://localhost:4222" diff --git a/Cargo.lock b/Cargo.lock index 6f3f650..776b8ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,40 +145,6 @@ dependencies = [ "zstd-safe", ] -[[package]] -name = "async-nats" -version = "0.36.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f71e5a1bab60f46b0b005f4808b8ee83ef6d577608923de938403393c9a30cf8" -dependencies = [ - "base64 0.22.1", - "bytes", - "futures", - "memchr", - "nkeys", - "nuid", - "once_cell", - "portable-atomic", - "rand", - "regex", - "ring", - "rustls-native-certs", - "rustls-pemfile", - "rustls-webpki", - "serde", - "serde_json", - "serde_nanos", - "serde_repr", - "thiserror", - "time", - "tokio", - "tokio-rustls", - "tokio-util", - "tracing", - "tryhard", - "url", -] - [[package]] name = "async-trait" version = "0.1.81" @@ -411,9 +377,6 @@ name = "bytes" version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" -dependencies = [ - "serde", -] [[package]] name = "cc" @@ -580,16 +543,6 @@ dependencies = [ "url", ] -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -694,32 +647,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "curve25519-dalek" -version = "4.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" -dependencies = [ - "cfg-if", - "cpufeatures", - "curve25519-dalek-derive", - "digest", - "fiat-crypto", - "rustc_version", - "subtle", -] - -[[package]] -name = "curve25519-dalek-derive" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "darling" version = "0.20.10" @@ -865,28 +792,6 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" -[[package]] -name = "ed25519" -version = "2.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" -dependencies = [ - "signature", -] - -[[package]] -name = "ed25519-dalek" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" -dependencies = [ - "curve25519-dalek", - "ed25519", - "sha2", - "signature", - "subtle", -] - [[package]] name = "either" version = "1.13.0" @@ -949,12 +854,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" -[[package]] -name = "fiat-crypto" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" - 
[[package]] name = "flate2" version = "1.0.30" @@ -1761,21 +1660,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "nkeys" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2de02c883c178998da8d0c9816a88ef7ef5c58314dd1585c97a4a5679f3ab337" -dependencies = [ - "data-encoding", - "ed25519", - "ed25519-dalek", - "getrandom", - "log", - "rand", - "signatory", -] - [[package]] name = "nom" version = "7.1.3" @@ -1796,15 +1680,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "nuid" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" -dependencies = [ - "rand", -] - [[package]] name = "num-bigint" version = "0.4.6" @@ -1893,12 +1768,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" -[[package]] -name = "openssl-probe" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" - [[package]] name = "overload" version = "0.1.1" @@ -2595,19 +2464,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" -dependencies = [ - "openssl-probe", - "rustls-pemfile", - "rustls-pki-types", - "schannel", - "security-framework", -] - [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -2656,21 +2512,11 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "schannel" -version = "0.1.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" -dependencies = [ - "windows-sys 0.52.0", -] - [[package]] name = "scipio" version = "0.1.0" dependencies = [ "anyhow", - "async-nats", "async-trait", "axum", "axum-extra", @@ -2727,29 +2573,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "security-framework" -version = "2.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" -dependencies = [ - "bitflags 2.6.0", - "core-foundation", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - -[[package]] -name = "security-framework-sys" -version = "2.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "semver" version = "1.0.23" @@ -2787,15 +2610,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_nanos" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" -dependencies = [ - "serde", -] - [[package]] name = "serde_path_to_error" version = "0.1.16" @@ -2806,17 +2620,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_repr" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" -dependencies = [ - "proc-macro2", 
- "quote", - "syn", -] - [[package]] name = "serde_spanned" version = "0.6.7" @@ -2914,18 +2717,6 @@ dependencies = [ "libc", ] -[[package]] -name = "signatory" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" -dependencies = [ - "pkcs8", - "rand_core", - "signature", - "zeroize", -] - [[package]] name = "signature" version = "2.2.0" @@ -3667,17 +3458,6 @@ dependencies = [ "toml", ] -[[package]] -name = "tryhard" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9f0a709784e86923586cff0d872dba54cd2d2e116b3bc57587d15737cfce9d" -dependencies = [ - "futures", - "pin-project-lite", - "tokio", -] - [[package]] name = "tungstenite" version = "0.21.0" diff --git a/Cargo.toml b/Cargo.toml index 50fc7a2..e92a0f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] anyhow = "1.0.86" -async-nats = "0.36.0" async-trait = "0.1.81" axum = { version = "0.7.5", features = [ "http2", diff --git a/migrations/20240722170323_initial.up.sql b/migrations/20240722170323_initial.up.sql index d47b375..3941e3c 100644 --- a/migrations/20240722170323_initial.up.sql +++ b/migrations/20240722170323_initial.up.sql @@ -446,10 +446,6 @@ select nc.org_name, nc.project_name, nc.impact_causes, - -- nc.agreement_and_invoice_sent, - -- nc.services_agreement_signature, - -- nc.availability_confirmed, - -- nc.invoice_paid, nc.org_website, nc.country_hq, nc.us_state_hq, @@ -503,42 +499,21 @@ group by m.id, pc.name; -create or replace function get_volunteer_id(email text, project_cycle_id uuid) - returns uuid - as $$ - select - id - from - volunteers - where - email = $1 - and project_cycle_id = $2; -$$ -language sql; - -create or replace function get_client_id(org_name text, project_cycle_id uuid) - returns uuid - as $$ - select - id - from - nonprofit_clients - where - org_name = $1 - and project_cycle_id = $2; -$$ -language sql; - -create or replace function get_mentor_id(email text, project_cycle_id uuid) - returns uuid - as $$ - select - id - from - mentors - where - email = $1 - and project_cycle_id = $2; -$$ -language sql; +create view exported_volunteer_details as +select + ev.id, + ev.created_at, + ev.updated_at, + ev.volunteer_id, + ev.workspace_email, + ev.org_unit, + j.id as job_id, + j.project_cycle_id, + j.status +from + volunteers_exported_to_workspace ev + left join jobs j on ev.job_id = j.id +group by + ev.id, + j.id; diff --git a/src/app/api/v1/data_exports/controllers.rs b/src/app/api/v1/data_exports/controllers.rs index b0f63af..f50f969 100644 --- a/src/app/api/v1/data_exports/controllers.rs +++ b/src/app/api/v1/data_exports/controllers.rs @@ -1,7 +1,5 @@ //! Controllers for the data exports API. 
-use std::sync::Arc; - use axum::extract::{Path, State}; use axum::http::StatusCode; use axum::response::Response; @@ -10,14 +8,13 @@ use chrono::Utc; use tokio::task; use uuid::Uuid; -use crate::app::api::v1::data_exports::export_users_to_workspace::{ - export_task, ExportUsersToWorkspaceTaskParams, -}; +use super::workspace::policies::{EmailPolicy, PasswordPolicy}; +use super::workspace::{export_task, ExportParams}; +use super::ExportServices; use crate::app::api::v1::data_exports::requests::ExportUsersToWorkspaceRequest; use crate::app::api::v1::data_exports::responses::ExportUsersToWorkspaceResponse; use crate::app::api_response; use crate::app::errors::AppError; -use crate::app::state::Services; use crate::services::auth::AuthData; use crate::services::storage::jobs::CreateJobBuilder; use crate::services::storage::types::{ExportDesination, JobData, JobDetails, JobType}; @@ -31,9 +28,7 @@ use crate::services::storage::ExecOptsBuilder; /// * `request`: The request data /// /// This endpoint starts a job, records it in the database, and returns immediately. The task it -/// spawns does not block, and it can be cancelled by sending a message to the NATS server by -/// publishing to the topic `pantheon.export.cancel.{job_id}` where `job_id` is the ID of the job in -/// the database. Data about the job can be retrieved by using the Jobs API. +/// spawns does not block. #[utoipa::path( post, path = "/{project_cycle_id}/workspace", @@ -47,16 +42,11 @@ use crate::services::storage::ExecOptsBuilder; ), )] pub async fn export_users_to_workspace( - State(ctx): State>, + State(services): State, Path(project_cycle_id): Path, Extension(auth): Extension, Json(request): Json, ) -> Result { - let storage_layer = ctx.storage_layer.clone(); - let workspace = ctx.workspace.clone(); - let nats = ctx.nats.clone(); - let mail = ctx.mail.clone(); - let current_time = Utc::now(); let time_only = current_time.format("%H:%M:%S").to_string(); @@ -72,28 +62,55 @@ pub async fn export_users_to_workspace( }) .build()?; - let job_id = storage_layer + let job_id = services + .storage_layer .create_job(Some(project_cycle_id), data, &mut ExecOptsBuilder::default().build()?) .await?; - let subject = format!("pantheon.export.cancel.{}", job_id); - log::info!("{job_id}"); + let email_policy = EmailPolicy::from(&request); + let password_policy = PasswordPolicy::from(&request); - let mut subscriber = nats.subscribe(subject).await?; - subscriber.unsubscribe_after(1).await?; + let already_exported = services + .storage_layer + .fetch_exported_volunteer_details_by_project_cycle( + project_cycle_id, + &mut ExecOptsBuilder::default().build()?, + ) + .await? 
+ .iter() + .map(|v| v.volunteer_id) + .collect::>(); - let principal = auth.email()?; + let volunteers = if request.skip_users_on_conflict { + log::info!("Skipping users that have already been exported"); + request + .volunteers + .into_iter() + .filter(|v| !already_exported.contains(&v.volunteer_id)) + .collect() + } else { + for v in &request.volunteers { + if already_exported.contains(&v.volunteer_id) { + log::error!("One or more users have already been exported"); + return Ok(api_response::error( + StatusCode::BAD_REQUEST, + "One or more users have already been exported", + )); + } + } + request.volunteers + }; - let params = ExportUsersToWorkspaceTaskParams { - subscriber, - request, - principal, + let params = ExportParams { job_id, - project_cycle_id, + email_policy, + password_policy, + principal: auth.email()?, + volunteers, }; task::spawn(async move { - let _ = export_task(storage_layer.clone(), workspace.clone(), mail, params).await; + let _ = export_task(&services, params).await; }); Ok(api_response::success(StatusCode::OK, ExportUsersToWorkspaceResponse { job_id })?) diff --git a/src/app/api/v1/data_exports/export_users_to_workspace.rs b/src/app/api/v1/data_exports/export_users_to_workspace.rs deleted file mode 100644 index f31bfb5..0000000 --- a/src/app/api/v1/data_exports/export_users_to_workspace.rs +++ /dev/null @@ -1,658 +0,0 @@ -//! This module is responsible for the low level details of exporting users to Google Workspace. -//! -//! It contains the task that is spawned when a user requests to export volunteers to Google -//! Workspace. - -use std::collections::HashMap; -use std::sync::Arc; - -use anyhow::Result; -use async_nats::Subscriber; -use chrono::Utc; -use futures::StreamExt; -use rand::distributions::Alphanumeric; -use rand::Rng; -use tokio::time::Duration; -use tokio::{task, time}; -use uuid::Uuid; - -use crate::app::api::v1::data_exports::requests::ExportUsersToWorkspaceRequest; -use crate::services::mail::{MailService, OnboardingEmailParamsBuilder}; -use crate::services::storage::entities::VolunteerDetails; -use crate::services::storage::jobs::{CreateJobBuilder, UpdateJobStatus}; -use crate::services::storage::types::{JobData, JobDetails, JobStatus, JobType}; -use crate::services::storage::volunteers::InsertVolunteerExportedToWorkspace; -use crate::services::storage::{ExecOpts, ExecOptsBuilder, StorageService}; -use crate::services::workspace::entities::{ - CreateWorkspaceUser, CreateWorkspaceUserBuilder, NameBuilder, -}; -use crate::services::workspace::WorkspaceService; - -pub struct ExportUsersToWorkspaceTaskParams { - pub principal: String, - pub request: ExportUsersToWorkspaceRequest, - pub subscriber: Subscriber, - pub job_id: Uuid, - pub project_cycle_id: Uuid, -} - -/// Builds an email address for a user in Google Workspace. -/// -/// * `use_first_and_last_names`: Whether to use the first and last names of the user as part of -/// their email handle -/// * `add_unique_numeric_suffix`: Whether to add a unique (two digit) numeric suffix to the email -/// handle -/// * `separator`: The separator to use between the first and last names (if `use_first_and_last_names` -/// is set to `true`) -/// * `first_name`: The first name of the user -/// * `last_name`: The last name of the user -/// -/// If there are spaces in the generated email, they will be removed. For example, if a volunteer -/// lists their first name as `Minh Uyen` and their last name as `Hoang`, the email will -/// concatenate `Minh` and `Uyen` into `minhuyen`. 
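To make the rules above concrete, here are illustrative input/output pairs for the function defined below. The numeric suffix is drawn randomly from 10..100, so 42 is just one possible draw, and note that the trailing alphanumeric filter also strips any non-alphanumeric separator such as "." or "-":

// Illustrative expectations for build_email (callers pass pre-lowercased names):
//
//   build_email(true, false, None, "minh uyen", "hoang")
//       -> "minhuyenhoang@developforgood.org"   // space removed by the filter
//   build_email(true, true, Some(".".to_owned()), "jane", "doe")
//       -> "janedoe42@developforgood.org"       // "." filtered out; 42 is one random suffix
//   build_email(false, false, None, "jane", "doe")
//       -> "jane@developforgood.org"            // last name unused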
-fn build_email( - use_first_and_last_names: bool, - add_unique_numeric_suffix: bool, - separator: Option, - first_name: &str, - last_name: &str, -) -> String { - let mut base = if use_first_and_last_names { - format!("{}{}{}", first_name, separator.unwrap_or("".to_string()), last_name) - } else { - first_name.to_owned() - }; - - if add_unique_numeric_suffix { - let mut rng = rand::thread_rng(); - let suffix = rng.gen_range(10..100); - - base.push_str(&suffix.to_string()); - } - - let mut cleaned = base.chars().filter(|c| c.is_alphanumeric()).collect::(); - - cleaned.push_str("@developforgood.org"); - cleaned -} - -/// Generates a random password between 8 and 64 characters for a user in Google Workspace. -/// -/// * `len`: The length of the password to generate (must be between 8 and 64 characters). If it -/// isn't, the function will default to 8 characters. -fn generate_password(len: u8) -> String { - if !(8..=64).contains(&len) { - log::warn!( - "Password length must be between 8 and 64 characters. Defaulting to 8 characters." - ); - } - match len { - // minimum, and default, is 8. max is 64 - 0..=7 | 65.. => rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(8) - .map(char::from) - .collect::(), - 8..=64 => rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(len as usize) - .map(char::from) - .collect::(), - } -} - -/// The result of processing volunteers for export to Google Workspace. -/// -/// * `processed_volunteers`: Data to create users in Google Workspace -/// * `pantheon_user_data`: Data to insert into the pantheon database -/// * `id_map`: A map of the generated email addresses to the Pantheon IDs of the volunteers -struct ProcessVolunteersResult { - processed_volunteers: Vec, - pantheon_user_data: Vec, - id_map: HashMap, -} - -/// Processes volunteers for export to Google Workspace. -/// -/// * `params`: Data about the export request -/// * `volunteers`: Data to process -fn process_volunteers( - params: &ExportUsersToWorkspaceTaskParams, - volunteers: Vec, -) -> Result { - // Maps the _generated_ @developforgood.org email address of a volunteer to their Pantheon ID. - let mut id_map = HashMap::::with_capacity(volunteers.len()); - - // Pantheon export data refers to the data that will be stored in the pantheon database. We - // will insert records for each exported volunteer. - let mut pantheon_export_data = - Vec::::with_capacity(volunteers.len()); - - let mut processed = Vec::with_capacity(volunteers.len()); - - for v in volunteers.iter() { - let primary_email = build_email( - params.request.use_first_and_last_name, - params.request.add_unique_numeric_suffix, - params.request.separator.clone(), - &v.first_name.to_lowercase(), - &v.last_name.to_lowercase(), - ); - - let password = generate_password(params.request.generated_password_length); - - // Associate the generated email address with the volunteer's ID in Pantheon. 
- id_map.insert(primary_email.clone(), v.volunteer_id); - - let workspace_user = CreateWorkspaceUserBuilder::default() - .primary_email(primary_email.clone()) - .name( - NameBuilder::default() - .given_name(v.first_name.clone()) - .family_name(v.last_name.clone()) - .build() - .unwrap(), - ) - .password(password) - .change_password_at_next_login(params.request.change_password_at_next_login) - .recovery_email(v.email.clone()) - // .recovery_phone(v.phone.clone()) - .build()?; - - pantheon_export_data.push(InsertVolunteerExportedToWorkspace { - volunteer_id: v.volunteer_id, - job_id: params.job_id, - workspace_email: primary_email, - org_unit: "/Programs/PantheonUsers".to_owned(), - }); - - processed.push(workspace_user); - } - - Ok(ProcessVolunteersResult { - processed_volunteers: processed, - pantheon_user_data: pantheon_export_data, - id_map, - }) -} - -/// Exports users to Google Workspace. -/// -/// * `storage_layer`: Handle to the storage layer -/// * `workspace`: Handle to the workspace client -/// * `mail`: Handle to the email client -/// * `principal`: The principal that is making the request -/// * `result`: The result of processing the volunteers -/// * `successfully_exported_users`: A list to keep track of users that are successfully exported -/// in this function -/// -/// At its core, this function creates users in Google Workspace and sends them an onboarding -/// email. If this was successful, it will mark the users as exported in the pantheon database. -async fn export_users_to_workspace( - storage_layer: Arc, - workspace: Arc, - mail: Arc, - principal: &str, - result: ProcessVolunteersResult, - successfully_exported_users: &mut Vec<(Uuid, String)>, -) -> Result<()> { - // NOTE: On the UI, there is an undo button that appears on the toast that pops up after the - // user clicks the "Export" button. This button will send a signal to cancel the export job and - // we want to have 7 seconds of leeway to kind of 'wait' so that we don't do any work that's - // just going to be undone. If they change their minds later, then we will undo the work that - // we've done, but this is a small optimization to prevent unnecessary work. - - log::info!("waiting for potential immediate cancellation"); - time::sleep(Duration::from_secs(7)).await; - - let ProcessVolunteersResult { processed_volunteers, pantheon_user_data, id_map } = result; - - for volunteer in processed_volunteers { - let email = volunteer.recovery_email.clone(); - let primary_email = volunteer.primary_email.clone(); - - // Try to create the user in Google Workspace - workspace.create_user(principal, volunteer.clone()).await?; - if let Some(id) = id_map.get(&primary_email) { - successfully_exported_users.push((*id, primary_email.clone())); - } - - let now = Utc::now().timestamp() as u64; - - let email_params = OnboardingEmailParamsBuilder::default() - .first_name(volunteer.name.given_name) - .last_name(volunteer.name.family_name) - // .email(email.clone()) - .email("anish@developforgood.org") - .workspace_email(primary_email.clone()) - .temporary_password(volunteer.password) - .send_at(now + 60) - .build()?; - - mail.send_onboarding_email(email_params).await?; - - log::info!("exported user: {} -> {}\nSLEEPING FOR 15 SECONDS", email, primary_email); - - // If you're trying to test the undo functionality, you can uncomment the sleep since it - // makes it easier to cancel before the job finishes, since you're probably using this in - // tandem with NoopWorkspaceClient. 
- // time::sleep(Duration::from_secs(15)).await; - } - - // Mark the volunteers as exported to Google Workspace (this only happens if every single - // volunteer was exported successfully) - - if !pantheon_user_data.is_empty() { - storage_layer - .batch_insert_volunteers_exported_to_workspace( - pantheon_user_data, - &mut ExecOptsBuilder::default().build()?, - ) - .await?; - } - - Ok(()) -} - -/// Undoes a partial export of users to Google Workspace. -/// -/// * `storage_layer`: Handle to the storage layer -/// * `workspace`: Handle to the workspace client -/// * `principal`: The principal that is making the request -/// * `successfully_exported_users`: The list of users that were successfully exported -/// -/// This function will greedily attempt to delete every user in `successfully_exported_users` from -/// both Google Workspace and the pantheon database. If a user cannot be deleted from Google -/// Workspace, they will not be removed from the pantheon database either, and an error will be -/// logged. -async fn undo_partial_export( - storage_layer: Arc, - workspace: Arc, - principal: &str, - successfully_exported_users: Vec<(Uuid, String)>, -) -> Result> { - // This is where the users are removed from workspace (and pantheon's database) - - // Track the successfully deleted users (in workspace) - let mut successfully_deleted_users = - Vec::<(Uuid, String)>::with_capacity(successfully_exported_users.len()); - - // Track the failed deletions (in workspace) - let mut failed_deletions = Vec::<(Uuid, String)>::new(); - - // Delete the users from Google Workspace - for (id, workspace_email) in &successfully_exported_users { - // NOTE: We don't propagate this error because we want to attempt to delete every possible - // user requested. - log::info!("SLEEPING FOR 5 SECONDS"); - time::sleep(Duration::from_secs(5)).await; - log::info!("deleting user: {workspace_email}"); - match workspace.delete_user(principal, workspace_email).await { - Ok(_) => successfully_deleted_users.push((*id, workspace_email.to_owned())), - Err(e) => { - log::error!("error deleting user: {e}"); - failed_deletions.push((*id, workspace_email.to_owned())); - } - } - } - - // Remove the users from the pantheon database - storage_layer - .batch_remove_volunteers_exported_to_workspace( - successfully_deleted_users.iter().map(|(id, _)| *id).collect(), - &mut ExecOptsBuilder::default().build()?, - ) - .await?; - - Ok(failed_deletions) -} - -/// Parameters for the undo export task. -/// -/// * `job_id`: The ID of the job that is being undone -/// * `project_cycle_id`: The ID of the project cycle that the job is associated with -/// * `principal`: The principal that is making the request -/// * `successfully_exported_users`: The users we are attempting to delete -struct UndoExportTaskParams { - job_id: Uuid, - project_cycle_id: Uuid, - principal: String, - successfully_exported_users: Vec<(Uuid, String)>, -} - -/// The post-cancellation undo export task. -/// -/// * `storage_layer`: Handle to the storage layer -/// * `workspace`: Handle to the workspace client -/// * `undo_job_id`: The ID of the job that is undoing the export -/// * `params`: The parameters for the undo task -async fn post_cancellation_undo_partial_export_task( - storage_layer: Arc, - workspace: Arc, - undo_job_id: Uuid, - params: UndoExportTaskParams, -) -> Result<()> { - // NOTE: undo_partial_export_task will attempt to delete all users possible. Failed - // deletes will be returned as a Vec where each Uuid represents a volunteer - // in Pantheon. 
It will be empty if all deletions were successful. - let principal = params.principal.clone(); - let res = undo_partial_export( - storage_layer.clone(), - workspace.clone(), - &principal, - params.successfully_exported_users, - ) - .await; - - let (status, error) = if res.is_ok() { - (JobStatus::Complete, None) - } else { - (JobStatus::Error, Some(format!("failed to undo partial export: {res:?}"))) - }; - - storage_layer - .update_job_status( - undo_job_id, - UpdateJobStatus { status, error }, - &mut ExecOpts { tx: None }, - ) - .await?; - - Ok(()) -} - -/// Handles the post-cancellation of an export job. -/// -/// This starts an async job to handle the cancellation to undo the export -/// -/// * `storage_layer`: Handle to the storage layer -/// * `workspace`: Handle to the workspace client -/// * `params`: The parameters for the post-cancellation task -async fn handle_post_cancellation_undo_export( - storage_layer: Arc, - workspace: Arc, - params: UndoExportTaskParams, -) -> Result<()> { - // Create a job to undo the partial export - let current_time = Utc::now(); - let time_only = current_time.format("%H:%M:%S").to_string(); - let undo_job_id = storage_layer - .create_job( - Some(params.project_cycle_id), - CreateJobBuilder::default() - .label(format!("Undo Partial Export @ {time_only}")) - .description(Some("Undo partial export of users to Google Workspace".to_owned())) - .data(JobDetails { - job_type: JobType::UndoWorkspaceExport, - error: None, - data: JobData::UndoWorkspaceExport { - volunteers: params.successfully_exported_users.clone(), - }, - }) - .build()?, - &mut ExecOptsBuilder::default().build()?, - ) - .await?; - - // Mark the current export job as canceled - storage_layer - .update_job_status( - params.job_id, - UpdateJobStatus { status: JobStatus::Cancelled, error: None }, - &mut ExecOpts { tx: None }, - ) - .await?; - - log::info!( - "successfully resolved export job (job status: cancelled). undo job id: {undo_job_id}" - ); - - // Spawn a nonblocking task to handle the undo - task::spawn(async move { - let _ = post_cancellation_undo_partial_export_task( - storage_layer.clone(), - workspace.clone(), - undo_job_id, - params, - ) - .await; - }); - Ok(()) -} - -/// Parameters for the post-cancellation handler. -/// -/// * `job_id`: The ID of the job that was canceled -/// * `project_cycle_id`: The ID of the project cycle that the job is associated with -/// * `principal`: The principal that is making the request -struct HandlePostCancellationParams { - job_id: Uuid, - project_cycle_id: Uuid, - principal: String, -} - -/// Wraps the handling of the post-cancellation of an export job. -/// -/// * `storage_layer`: Handle to the storage layer -/// * `workspace`: Handle to the workspace client -/// * `params`: The parameters for the post-cancellation handler -/// * `successfully_exported_users`: The users that were successfully exported -async fn handle_post_cancellation( - storage_layer: Arc, - workspace: Arc, - params: HandlePostCancellationParams, - successfully_exported_users: Vec<(Uuid, String)>, -) -> Result<()> { - let undo_params = UndoExportTaskParams { - job_id: params.job_id, - project_cycle_id: params.project_cycle_id, - principal: params.principal.clone(), - successfully_exported_users: successfully_exported_users.clone(), - }; - handle_post_cancellation_undo_export(storage_layer, workspace, undo_params).await?; - Ok(()) -} - -/// Parameters for the post-export handler. 
-/// -/// * `export_result`: The result of the export -/// * `job_id`: The ID of the job that handled the export -/// * `project_cycle_id`: The ID of the project cycle that the job is associated with -/// * `principal`: The principal that is making the request -/// * `successfully_exported_users`: The users that were successfully exported -struct HandlePostExportParams { - export_result: Result<()>, - job_id: Uuid, - project_cycle_id: Uuid, - principal: String, - successfully_exported_users: Vec<(Uuid, String)>, -} - -/// The task that handles the undo of an export job if the export job was not a complete success. -/// (if all users were not exported successfully). -/// -/// * `storage_layer`: Handle to the storage layer -/// * `workspace`: Handle to the workspace client -/// * `principal`: The principal that is making the request -/// * `undo_job_id`: The ID of the job that is undoing the export -/// * `successfully_exported_users`: The users that were successfully exported -async fn post_export_undo_task( - storage_layer: Arc, - workspace: Arc, - principal: &str, - undo_job_id: Uuid, - successfully_exported_users: Vec<(Uuid, String)>, -) -> Result<()> { - let failed_deletions = undo_partial_export( - storage_layer.clone(), - workspace.clone(), - principal, - successfully_exported_users, - ) - .await?; - - let (status, error) = if failed_deletions.is_empty() { - (JobStatus::Complete, None) - } else { - log::error!("failed to delete {} users: {failed_deletions:?}", failed_deletions.len()); - ( - JobStatus::Error, - Some(format!( - "failed to delete {} users: {failed_deletions:?}", - failed_deletions.len() - )), - ) - }; - - storage_layer - .update_job_status( - undo_job_id, - UpdateJobStatus { status, error }, - &mut ExecOpts { tx: None }, - ) - .await?; - - Ok(()) -} - -/// Handles the post-export of an export job. 
-/// -/// * `storage_layer`: Handle to the storage layer -/// * `workspace`: Handle to the workspace client -/// * `params`: Parameters for the post-export handler -async fn handle_post_export( - storage_layer: Arc, - workspace: Arc, - params: HandlePostExportParams, -) -> Result<()> { - // [F2-2]: Based on the export result, update the job status - let res = params.export_result; - let (status, error) = if res.is_ok() { - (JobStatus::Complete, None) - } else { - ( - JobStatus::Error, - Some(format!("unrecoverable error exporting all users to Google Workspace: {res:?}")), - ) - }; - - storage_layer - .update_job_status( - params.job_id, - UpdateJobStatus { status, error }, - &mut ExecOpts { tx: None }, - ) - .await?; - - // [F2-3]: If there was an error, create a job to document the undo process - if JobStatus::Error == status && !params.successfully_exported_users.is_empty() { - let current_time = Utc::now(); - let time_only = current_time.format("%H:%M:%S").to_string(); - let undo_job_id = storage_layer - .create_job( - Some(params.project_cycle_id), - CreateJobBuilder::default() - .label(format!("Undo Partial Export @ {time_only}")) - .description(Some( - "Undo partial export of users to Google Workspace".to_owned(), - )) - .data(JobDetails { - job_type: JobType::UndoWorkspaceExport, - error: None, - data: JobData::UndoWorkspaceExport { - volunteers: params.successfully_exported_users.clone(), - }, - }) - .build()?, - &mut ExecOptsBuilder::default().build()?, - ) - .await?; - - // [F2-4]: Spawn a nonblocking task to handle the undo - task::spawn(async move { - let _ = post_export_undo_task( - storage_layer.clone(), - workspace.clone(), - ¶ms.principal, - undo_job_id, - params.successfully_exported_users, - ) - .await; - }); - } - - Ok(()) -} - -/// The export task that is spawned when a user requests to export volunteers to Google Workspace. -/// -/// * `storage_layer`: Handle to the storage layer -/// * `workspace`: Handle to the workspace client -/// * `mail`: Handle to the mail client -/// * `params`: Parameters for the export task -pub async fn export_task( - storage_layer: Arc, - workspace: Arc, - mail: Arc, - mut params: ExportUsersToWorkspaceTaskParams, -) -> Result<()> { - let process_result = process_volunteers(¶ms, params.request.volunteers.clone())?; - - // NOTE: This maps volunteer IDs in pantheon to the generated email addresses in Google Workspace. - // The emails are NOT the same as the ones in the email field in the volunteers relation in - // pantheon's database. This needs to be here because the way we handle multiple futures in the - // tokio::select! macro necessitates that multiple handlers have this data in order to - // facilitate the undo process if one is necessary. - let mut successfully_exported_users = - Vec::<(Uuid, String)>::with_capacity(process_result.processed_volunteers.len()); - - // Race the following futures: - // - // 1 Await a cancellation signal: - // - If received, create a job to undo the partial export, and then spawn an async task to - // handle the deletion - // 2. Export all users to Google Workspace - // - If successful, mark the job as complete - // - If failed, mark the job as errored - // 3. Timeout after 10 minutes - // - tokio::select! { - // Future 1 - _ = params.subscriber.next() => { - // NOTE: At this point, we have received a signal to cancel this export job. 
- log::info!("request cancelled...attempting to undo partial export"); - let post_cancel_params = HandlePostCancellationParams { - job_id: params.job_id, - project_cycle_id: params.project_cycle_id, - principal: params.principal.clone(), - }; - log::info!("successfully_exported_users: {successfully_exported_users:?}"); - handle_post_cancellation(storage_layer.clone(), workspace.clone(), post_cancel_params, successfully_exported_users).await?; - }, - - // Future 2 - res = export_users_to_workspace(storage_layer.clone(), workspace.clone(), mail.clone(), ¶ms.principal, process_result, &mut successfully_exported_users) => { - // NOTE: At this point, we have completed out attempt to export all users to Google Workspace. - - // [F2-1]: Call a post-export hook with the right parameters - let post_export_params = HandlePostExportParams { - export_result: res, - job_id: params.job_id, - project_cycle_id: params.project_cycle_id, - principal: params.principal.clone(), - successfully_exported_users, - }; - - handle_post_export(storage_layer, workspace, post_export_params).await?; - }, - - // Future 3 - () = time::sleep(Duration::from_secs(1200)) => { - // NOTE: A request times out after 10 minutes. An export task really shouldn't take - // this long. - log::warn!("request timed out"); - } - }; - - Ok(()) -} diff --git a/src/app/api/v1/data_exports/mod.rs b/src/app/api/v1/data_exports/mod.rs index 423155d..6f24231 100644 --- a/src/app/api/v1/data_exports/mod.rs +++ b/src/app/api/v1/data_exports/mod.rs @@ -1,12 +1,13 @@ //! Data Exports API. mod controllers; -mod export_users_to_workspace; mod requests; mod responses; +mod workspace; use std::sync::Arc; +use axum::extract::FromRef; use axum::middleware::from_fn_with_state; use axum::{routing, Router}; use utoipa::OpenApi; @@ -14,6 +15,22 @@ use utoipa::OpenApi; use crate::app::api::middleware::make_rbac; use crate::app::state::Services; +struct ExportServices { + pub storage_layer: Arc, + pub workspace: Arc, + pub mail: Arc, +} + +impl FromRef> for ExportServices { + fn from_ref(ctx: &Arc) -> Self { + Self { + storage_layer: ctx.storage_layer.clone(), + workspace: ctx.workspace.clone(), + mail: ctx.mail.clone(), + } + } +} + /// Documents the API for data exports #[derive(OpenApi)] #[openapi( @@ -34,6 +51,7 @@ pub async fn build(ctx: Arc) -> Router<()> { Router::new() .route("/:project_cycle_id/workspace", export_users_to_workspace) + // .route("/x", routing::post(workspace::c)) .route_layer(from_fn_with_state(ctx.clone(), guard1)) .with_state(ctx.clone()) } diff --git a/src/app/api/v1/data_exports/workspace/mod.rs b/src/app/api/v1/data_exports/workspace/mod.rs new file mode 100644 index 0000000..a729d6a --- /dev/null +++ b/src/app/api/v1/data_exports/workspace/mod.rs @@ -0,0 +1,179 @@ +pub mod policies; + +use anyhow::Result; +use policies::{EmailPolicy, PasswordPolicy}; +use uuid::Uuid; + +use super::ExportServices; +use crate::services::mail::{OnboardingEmailParams, OnboardingEmailParamsBuilder}; +use crate::services::storage::entities::VolunteerDetails; +use crate::services::storage::volunteers::InsertVolunteerExportedToWorkspace; +use crate::services::storage::ExecOptsBuilder; +use crate::services::workspace::entities::{ + CreateWorkspaceUser, CreateWorkspaceUserBuilder, NameBuilder, +}; + +pub struct ExportParams { + pub job_id: Uuid, + pub principal: String, + pub email_policy: EmailPolicy, + pub password_policy: PasswordPolicy, + pub volunteers: Vec, +} + +struct ProcessedVolunteers { + pub export_data: Vec, + pub pantheon_data: Vec, + pub 
onboarding_email_data: Vec<OnboardingEmailParams>,
+}
+
+fn process_volunteers(params: &ExportParams) -> Result<ProcessedVolunteers> {
+    let mut pantheon_data =
+        Vec::<InsertVolunteerExportedToWorkspace>::with_capacity(params.volunteers.len());
+
+    let mut export_data = Vec::<CreateWorkspaceUser>::with_capacity(params.volunteers.len());
+
+    let mut onboarding_email_data =
+        Vec::<OnboardingEmailParams>::with_capacity(params.volunteers.len());
+
+    for v in &params.volunteers {
+        let primary_email = params.email_policy.build_email(&v.first_name, &v.last_name);
+        let temporary_password = params.password_policy.generate_password();
+
+        let workspace_user = CreateWorkspaceUserBuilder::default()
+            .primary_email(primary_email.clone())
+            .name(
+                NameBuilder::default()
+                    .given_name(v.first_name.clone())
+                    .family_name(v.last_name.clone())
+                    .build()
+                    .unwrap(),
+            )
+            .password(temporary_password)
+            .change_password_at_next_login(params.password_policy.change_password_at_next_login)
+            .recovery_email(v.email.clone())
+            // .recovery_phone(v.phone.clone())
+            .build()?;
+
+        onboarding_email_data.push(
+            OnboardingEmailParamsBuilder::default()
+                .first_name(workspace_user.name.given_name.clone())
+                .last_name(workspace_user.name.family_name.clone())
+                // .email(workspace_user.recovery_email.clone())
+                .email("anish@developforgood.org")
+                .workspace_email(workspace_user.primary_email.clone())
+                .temporary_password(workspace_user.password.clone())
+                .build()?,
+        );
+
+        export_data.push(workspace_user);
+
+        pantheon_data.push(InsertVolunteerExportedToWorkspace {
+            volunteer_id: v.volunteer_id,
+            job_id: params.job_id,
+            workspace_email: primary_email,
+            org_unit: "/Programs/PantheonUsers".to_owned(),
+        });
+    }
+
+    Ok(ProcessedVolunteers { export_data, pantheon_data, onboarding_email_data })
+}
+
+async fn export_volunteers_to_workspace(
+    services: &ExportServices,
+    principal: &str,
+    export_data: Vec<CreateWorkspaceUser>,
+) -> usize {
+    let mut successfully_exported = 0usize;
+    for user in export_data {
+        let name = format!("{} {}", &user.name.given_name, &user.name.family_name);
+        match services.workspace.create_user(principal, user).await {
+            Ok(_) => {
+                log::info!("Successfully exported user {} to workspace", name);
+                successfully_exported += 1;
+            }
+            Err(e) => {
+                log::error!("Failed to export user to workspace: {}", e);
+                break;
+            }
+        }
+    }
+
+    successfully_exported
+}
+
+async fn save_exported_volunteers(
+    services: &ExportServices,
+    save_data: Vec<InsertVolunteerExportedToWorkspace>,
+) -> Result<()> {
+    services
+        .storage_layer
+        .batch_insert_volunteers_exported_to_workspace(
+            save_data,
+            &mut ExecOptsBuilder::default().build()?,
+        )
+        .await?;
+
+    Ok(())
+}
+
+async fn send_onboarding_emails(
+    services: &ExportServices,
+    onboarding_data: Vec<OnboardingEmailParams>,
+) -> Result<()> {
+    for email in onboarding_data {
+        services.mail.send_onboarding_email(email).await?;
+    }
+    Ok(())
+}
+
+pub async fn export_task(services: &ExportServices, params: ExportParams) -> Result<()> {
+    let mut processed = process_volunteers(&params)?;
+
+    let number_of_users_to_export = processed.export_data.len();
+    let exported_count =
+        export_volunteers_to_workspace(services, &params.principal, processed.export_data).await;
+
+    if exported_count != number_of_users_to_export {
+        log::error!(
+            "Failed to export all users to workspace. Exported {} out of {}",
+            exported_count,
+            number_of_users_to_export
+        );
+        processed.pantheon_data.truncate(exported_count);
+        processed.onboarding_email_data.truncate(exported_count);
+    }
+
+    match save_exported_volunteers(services, processed.pantheon_data).await {
+        Ok(_) => match send_onboarding_emails(services, processed.onboarding_email_data).await {
+            Ok(_) => {
+                services
+                    .storage_layer
+                    .mark_job_complete(params.job_id, &mut ExecOptsBuilder::default().build()?)
+                    .await?
+            }
+            Err(e) => {
+                services
+                    .storage_layer
+                    .mark_job_errored(
+                        params.job_id,
+                        e.to_string(),
+                        &mut ExecOptsBuilder::default().build()?,
+                    )
+                    .await?
+            }
+        },
+        Err(e) => {
+            services
+                .storage_layer
+                .mark_job_errored(
+                    params.job_id,
+                    e.to_string(),
+                    &mut ExecOptsBuilder::default().build()?,
+                )
+                .await?
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/app/api/v1/data_exports/workspace/policies.rs b/src/app/api/v1/data_exports/workspace/policies.rs
new file mode 100644
index 0000000..81297c5
--- /dev/null
+++ b/src/app/api/v1/data_exports/workspace/policies.rs
@@ -0,0 +1,84 @@
+use rand::distributions::Alphanumeric;
+use rand::Rng;
+
+use crate::app::api::v1::data_exports::requests::ExportUsersToWorkspaceRequest;
+
+pub struct EmailPolicy {
+    pub add_unique_numeric_suffix: bool,
+    pub separator: Option<String>,
+    pub use_first_and_last_name: bool,
+}
+
+impl EmailPolicy {
+    pub fn build_email(&self, first_name: &str, last_name: &str) -> String {
+        let mut base = if self.use_first_and_last_name {
+            format!(
+                "{}{}{}",
+                first_name.to_lowercase(),
+                self.separator.as_ref().unwrap_or(&"".to_string()),
+                last_name.to_lowercase()
+            )
+        } else {
+            first_name.to_lowercase()
+        };
+
+        if self.add_unique_numeric_suffix {
+            let mut rng = rand::thread_rng();
+            let suffix = rng.gen_range(10..100);
+
+            base.push_str(&suffix.to_string());
+        }
+
+        let mut cleaned = base.chars().filter(|c| c.is_alphanumeric()).collect::<String>();
+
+        cleaned.push_str("@developforgood.org");
+        cleaned
+    }
+}
+
+pub struct PasswordPolicy {
+    pub change_password_at_next_login: bool,
+    pub generated_password_length: u8,
+}
+
+impl PasswordPolicy {
+    pub fn generate_password(&self) -> String {
+        if !(8..=64).contains(&self.generated_password_length) {
+            log::warn!(
+                "Password length must be between 8 and 64 characters. Defaulting to 8 characters."
+            );
+        }
+        match self.generated_password_length {
+            // minimum, and default, is 8. max is 64
+            0..=7 | 65.. => rand::thread_rng()
+                .sample_iter(&Alphanumeric)
+                .take(8)
+                .map(char::from)
+                .collect::<String>(),
+            8..=64 => rand::thread_rng()
+                .sample_iter(&Alphanumeric)
+                .take(self.generated_password_length as usize)
+                .map(char::from)
+                .collect::<String>(),
+        }
+    }
+}
+
+impl From<&ExportUsersToWorkspaceRequest> for EmailPolicy {
+    fn from(request: &ExportUsersToWorkspaceRequest) -> Self {
+        Self {
+            add_unique_numeric_suffix: request.add_unique_numeric_suffix,
+            separator: request.separator.clone(),
+            use_first_and_last_name: request.use_first_and_last_name,
+        }
+    }
+}
+
+impl From<&ExportUsersToWorkspaceRequest> for PasswordPolicy {
+    fn from(request: &ExportUsersToWorkspaceRequest) -> Self {
+        Self {
+            change_password_at_next_login: request.change_password_at_next_login,
+            generated_password_length: request.generated_password_length,
+        }
+    }
+}
diff --git a/src/app/api/v1/data_imports/airtable/intermediate.rs b/src/app/api/v1/data_imports/airtable/intermediate.rs
new file mode 100644
index 0000000..94da8d9
--- /dev/null
+++ b/src/app/api/v1/data_imports/airtable/intermediate.rs
@@ -0,0 +1,213 @@
+use derive_builder::Builder;
+use serde::{Deserialize, Serialize};
+
+use crate::services::storage::mentors::CreateMentor;
+use crate::services::storage::nonprofits::CreateNonprofit;
+use crate::services::storage::types::{
+    AgeRange, ClientSize, Ethnicity, Fli, Gender, ImpactCause, Lgbt, MentorExperienceLevel,
+    MentorYearsExperience, StudentStage, VolunteerHearAbout,
+};
+use crate::services::storage::volunteers::CreateVolunteer;
+
+#[derive(Builder, Deserialize, Serialize, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct IntermediateNonprofitData {
+    #[builder(setter(into))]
+    #[serde(rename = "FirstName")]
+    pub representative_first_name: String,
+    #[builder(setter(into))]
+    #[serde(rename = "LastName")]
+    pub representative_last_name: String,
+    #[builder(setter(into))]
+    #[serde(rename = "JobTitle")]
+    pub representative_job_title: String,
+    #[builder(setter(into))]
+    #[serde(rename = "NonprofitEmail")]
+    pub email: String,
+    #[serde(rename = "Cc")]
+    #[builder(setter(into), default = "None")]
+    pub email_cc: Option<String>,
+    #[builder(setter(into))]
+    pub phone: String,
+    #[builder(setter(into))]
+    pub org_name: String,
+    #[builder(setter(into))]
+    pub project_name: String,
+    #[builder(setter(into), default = "None")]
+    pub org_website: Option<String>,
+    #[serde(rename = "CountryHQ")]
+    #[builder(setter(into), default = "None")]
+    pub country_hq: Option<String>,
+    #[serde(rename = "StateHQ")]
+    #[builder(setter(into), default = "None")]
+    pub us_state_hq: Option<String>,
+    #[builder(setter(into))]
+    pub address: String,
+    pub size: ClientSize,
+    pub impact_causes: Option<Vec<String>>,
+}
+
+impl From<IntermediateNonprofitData> for CreateNonprofit {
+    fn from(value: IntermediateNonprofitData) -> Self {
+        CreateNonprofit {
+            representative_first_name: value.representative_first_name,
+            representative_last_name: value.representative_last_name,
+            representative_job_title: value.representative_job_title,
+            email: value.email,
+            email_cc: value.email_cc,
+            phone: value.phone,
+            org_name: value.org_name,
+            project_name: value.project_name,
+            org_website: value.org_website,
+            country_hq: value.country_hq,
+            us_state_hq: value.us_state_hq,
+            address: value.address,
+            size: value.size,
+            impact_causes: value
+                .impact_causes
+                .unwrap_or(vec!["Other".to_owned()])
+                .iter()
+                .map(|c| match c.as_str() {
+                    "reco1zHRYv8lTQDaI" => ImpactCause::Animals,
+                    "recXhhTPsuQ2PMjU4" => ImpactCause::CareerAndProfessionalDevelopment,
+                    "recvWKilRRABCcHuI" => ImpactCause::DisasterRelief,
"recYfRNFDpm2nedjM" => ImpactCause::Education, + "recOlWiJTppnQwnll" => ImpactCause::EnvironmentAndSustainability, + "recix0Y5qCXYfZGRz" => ImpactCause::FaithAndReligion, + "recKs8kboTORruStC" => ImpactCause::HealthAndMedicine, + "recEmtYMgeOlPeOVQ" => ImpactCause::GlobalRelations, + "reczSSbvdW2NoOX2p" => ImpactCause::PovertyAndHunger, + "rec5dt6EVyUeIaCR7" => ImpactCause::SeniorServices, + "recMt9349gwuRAQXf" => ImpactCause::JusticeAndEquity, + "rec8cH6YTQMeYqXUh" => ImpactCause::VeteransAndMilitaryFamilies, + _ => ImpactCause::Other, + }) + .collect(), + } + } +} + +#[derive(Debug, Builder, Serialize, Deserialize, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct IntermediateVolunteerData { + #[builder(setter(into))] + pub first_name: String, + #[builder(setter(into))] + pub last_name: String, + #[builder(setter(into))] + pub email: String, + #[builder(setter(into), default = "None")] + pub phone: Option, + #[builder(default = "Gender::PreferNotToSay")] + #[serde(rename = "Gender")] + pub volunteer_gender: Gender, + #[builder(default = "vec![Ethnicity::PreferNotToSay]")] + #[serde(rename = "Ethnicity")] + pub volunteer_ethnicity: Vec, + #[builder(default = "AgeRange::R18_24")] + #[serde(rename = "AgeRange")] + pub volunteer_age_range: AgeRange, + #[builder(default = "vec![]")] + pub university: Vec, + #[serde(rename = "LGBT")] + pub lgbt: Lgbt, + pub country: String, + #[builder(setter(into), default = "None")] + #[serde(rename = "State")] + pub us_state: Option, + #[builder(default = "vec![Fli::PreferNotToSay]")] + #[serde(rename = "FLI")] + pub fli: Vec, + pub student_stage: StudentStage, + pub majors: String, + pub minors: Option, + pub hear_about: Vec, +} + +impl From for CreateVolunteer { + fn from(value: IntermediateVolunteerData) -> Self { + CreateVolunteer { + first_name: value.first_name, + last_name: value.last_name, + email: value.email, + phone: value.phone, + volunteer_gender: value.volunteer_gender, + volunteer_ethnicity: value.volunteer_ethnicity, + volunteer_age_range: value.volunteer_age_range, + university: value.university, + lgbt: value.lgbt, + country: value.country, + us_state: value.us_state, + fli: value.fli, + student_stage: value.student_stage, + majors: value.majors.split(",").map(|m| m.trim().to_string()).collect(), + minors: value + .minors + .unwrap_or_default() + .split(",") + .map(|m| m.trim().to_string()) + .collect(), + hear_about: value.hear_about, + } + } +} + +#[derive(Builder, Serialize, Deserialize, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct IntermediateMentorData { + // pub project_cycle_id: Uuid, + #[builder(setter(into))] + pub first_name: String, + #[builder(setter(into))] + pub last_name: String, + #[builder(setter(into))] + pub email: String, + #[builder(setter(into))] + pub phone: String, + #[builder(setter(into))] + pub company: String, + #[builder(setter(into))] + pub job_title: String, + #[builder(setter(into))] + pub country: String, + #[builder(setter(into), default = "None")] + pub us_state: Option, + #[builder(setter(into))] + pub years_experience: String, + #[builder(setter(into))] + pub experience_level: MentorExperienceLevel, + pub prior_mentorship: Vec, + #[serde(rename = "PriorDFG")] + pub prior_dfg: Option>, + pub university: Option>, + pub hear_about: Option>, +} + +impl From for CreateMentor { + fn from(value: IntermediateMentorData) -> Self { + CreateMentor { + first_name: value.first_name, + last_name: value.last_name, + email: value.email, + phone: value.phone, + company: value.company, + job_title: 
value.job_title, + country: value.country, + us_state: value.us_state, + years_experience: match value.years_experience.as_str() { + "2-5" => MentorYearsExperience::R2_5, + "6-10" => MentorYearsExperience::R6_10, + "11-15" => MentorYearsExperience::R11_15, + "16-20" => MentorYearsExperience::R16_20, + _ => MentorYearsExperience::R21Plus, + }, + experience_level: value.experience_level, + prior_mentor: value.prior_mentorship.contains(&"Yes, I've been a mentor".to_owned()), + prior_mentee: value.prior_mentorship.contains(&"Yes, I've been a mentee".to_owned()), + // prior_student: !value.prior_dfg.contains(&"No".to_owned()), + prior_student: value.prior_dfg.unwrap_or_default().contains(&"Yes".to_owned()), + university: value.university.unwrap_or_default(), + hear_about: value.hear_about.unwrap_or(vec![VolunteerHearAbout::Other]), + } + } +} diff --git a/src/app/api/v1/data_imports/airtable/mod.rs b/src/app/api/v1/data_imports/airtable/mod.rs new file mode 100644 index 0000000..7dd9116 --- /dev/null +++ b/src/app/api/v1/data_imports/airtable/mod.rs @@ -0,0 +1,517 @@ +mod intermediate; + +use std::collections::HashMap; + +use anyhow::{bail, Result}; +use intermediate::{IntermediateMentorData, IntermediateNonprofitData, IntermediateVolunteerData}; +use serde_json::Value; +use uuid::Uuid; + +use super::ImportServices; +use crate::services::airtable::base_data::records::responses::ListRecordsResponse; +use crate::services::airtable::base_data::records::ListRecordsQueryBuilder; +use crate::services::airtable::entities::record::Record; +use crate::services::airtable::entities::schema::Table; +use crate::services::storage::cycles::CreateCycleBuilder; +use crate::services::storage::mentors::CreateMentor; +use crate::services::storage::nonprofits::CreateNonprofit; +use crate::services::storage::volunteers::CreateVolunteer; +use crate::services::storage::ExecOptsBuilder; + +pub struct ImportParams { + pub name: String, + pub description: String, + pub job_id: Uuid, + pub base_id: String, + pub tables: Vec, +} + +async fn fetch_nonprofits( + services: &ImportServices, + params: &ImportParams, +) -> Result>> { + let Some(nonprofits_table) = params.tables.iter().find(|t| t.name == "Nonprofits") else { + bail!("Nonprofits table not found"); + }; + + let Some(finalized_nonprofits_view) = nonprofits_table + .views + .iter() + .find(|t| t.name.starts_with("Finalized") && t.name.ends_with("Nonprofit Projects")) + else { + bail!("Finalized Nonprofits view not found: Name should start with `Finalized` and end with `Nonprofit Projects`"); + }; + + let mut query_opts = ListRecordsQueryBuilder::default() + .fields( + [ + "OrgName", + "ProjectName", + "OrgWebsite", + "FirstName", + "LastName", + "JobTitle", + "NonprofitEmail", + "Phone", + "CountryHQ", + "StateHQ", + "Address", + "Size", + "ImpactCauses", + ] + .iter() + .map(ToString::to_string) + .collect::>(), + ) + .view(finalized_nonprofits_view.name.clone()) + .build()?; + + let mut nonprofits = Vec::>::with_capacity(50); + + loop { + let ListRecordsResponse { ref mut records, offset } = services + .airtable + .list_records(¶ms.base_id, "Nonprofits", Some(query_opts.clone())) + .await?; + + nonprofits.append(records); + + if let Some(new_offset) = offset { + query_opts.offset = Some(new_offset); + } else { + break; + } + } + + Ok(nonprofits) +} + +fn process_raw_nonprofits(nonprofits: Vec>) -> Result> { + let create_nonprofit_data = nonprofits + .iter() + .filter_map(|n| { + match serde_json::from_value::(n.fields.clone()) { + Ok(data) => Some(data), + Err(e) 
=> { + log::error!("{}", e); + None + } + } + }) + .map(CreateNonprofit::from) + .collect::>(); + + log::info!("Processed {} nonprofits", create_nonprofit_data.len()); + Ok(create_nonprofit_data) +} + +async fn fetch_volunteers( + services: &ImportServices, + params: &ImportParams, +) -> Result>> { + let fields = [ + "FirstName", + "LastName", + "OrgName (from ProjectRecordID)", + "Email", + "Phone", + "Gender", + "Ethnicity", + "AgeRange", + "University", + "LGBT", + "Country", + "State", + "FLI", + "StudentStage", + "Majors", + "Minors", + "HearAbout", + ] + .iter() + .map(ToString::to_string) + .collect::>(); + + let mut query_opts = ListRecordsQueryBuilder::default() + .fields(fields) + .view("All Committed Student Volunteers - Active".to_owned()) + .build()?; + + let mut volunteers = Vec::>::with_capacity(500); + + loop { + let ListRecordsResponse { ref mut records, offset } = services + .airtable + .list_records(¶ms.base_id, "Volunteers", Some(query_opts.clone())) + .await?; + volunteers.append(records); + if let Some(new_offset) = offset { + query_opts.offset = Some(new_offset); + } else { + break; + } + } + Ok(volunteers) +} + +struct ProcessedVolunteers { + create_volunteer_data: Vec, + volunteer_nonprofit_linkage: Vec<(String, String)>, +} + +fn process_raw_volunteers(volunteers: Vec>) -> Result { + let mut volunteer_nonprofit_linkage = Vec::<(String, String)>::with_capacity(500); + + let create_volunteer_data = volunteers + .iter() + .filter_map(|v| { + match ( + &v.fields["OrgName (from ProjectRecordID)"], + serde_json::from_value::(v.fields.clone()), + ) { + (Value::Array(clients), Ok(data)) => { + for client in clients { + if let Some(org_name) = client.as_str() { + volunteer_nonprofit_linkage + .push((data.email.clone(), org_name.to_owned())); + } + } + Some(data) + } + (_, d) => { + dbg!(&d); + None + } + } + }) + .map(CreateVolunteer::from) + .collect::>(); + + log::info!("Processed {} volunteers", create_volunteer_data.len()); + Ok(ProcessedVolunteers { create_volunteer_data, volunteer_nonprofit_linkage }) +} + +async fn fetch_mentors( + services: &ImportServices, + params: &ImportParams, +) -> Result>> { + let fields = [ + "FirstName", + "LastName", + "Email", + "Phone", + "OfferLetterSignature", + "Company", + "JobTitle", + "ProjectRole", + "OrgName (from ProjectRecordID)", + "Country", + "State", + "YearsExperience", + "ExperienceLevel", + "PriorMentorship", + "PriorDFG", + "University", + "HearAbout", + ] + .iter() + .map(ToString::to_string) + .collect::>(); + + let mut query_opts = ListRecordsQueryBuilder::default() + .fields(fields) + .view("All Committed Mentor Volunteers".to_owned()) + .build()?; + + let mut mentors = Vec::>::with_capacity(50); + + loop { + let ListRecordsResponse { ref mut records, offset } = services + .airtable + .list_records(¶ms.base_id, "Volunteers", Some(query_opts.clone())) + .await?; + mentors.append(records); + if let Some(new_offset) = offset { + query_opts.offset = Some(new_offset); + } else { + break; + } + } + + Ok(mentors) +} + +struct ProcessedMentors { + create_mentor_data: Vec, + mentor_nonprofit_linkage: Vec<(String, Vec)>, +} + +fn process_raw_mentors(mentors: Vec>) -> Result { + let mut mentor_nonprofit_linkage = Vec::<(String, Vec)>::with_capacity(50); + + mentors.iter().for_each(|mentor| { + if let (Some(email), Some(Value::Array(clients)), Some(Value::Array(project_roles))) = ( + mentor.fields.get("Email").and_then(|v| v.as_str().map(String::from)), + mentor.fields.get("OrgName (from ProjectRecordID)"), + 
+            mentor.fields.get("ProjectRole"),
+        ) {
+            let nonprofit_names = clients
+                .iter()
+                .filter_map(|project| project.as_str().map(String::from))
+                .collect::<Vec<String>>();
+
+            let role_names = project_roles
+                .iter()
+                .filter_map(|role| role.as_str().map(String::from))
+                .collect::<Vec<String>>();
+
+            if role_names.contains(&"Team Mentor".to_owned()) {
+                mentor_nonprofit_linkage.push((email, nonprofit_names));
+            }
+        }
+    });
+
+    let create_mentor_data = mentors
+        .iter()
+        .filter_map(|m| match serde_json::from_value::<IntermediateMentorData>(m.fields.clone()) {
+            Ok(data) => Some(data),
+            Err(e) => {
+                log::error!("{m:?}");
+                log::error!("{}", e);
+                None
+            }
+        })
+        .map(CreateMentor::from)
+        .collect::<Vec<CreateMentor>>();
+
+    log::info!("Processed {} mentors", create_mentor_data.len());
+    Ok(ProcessedMentors { create_mentor_data, mentor_nonprofit_linkage })
+}
+
+async fn fetch_mentor_mentee_linkage(
+    services: &ImportServices,
+    params: &ImportParams,
+) -> Result<Vec<(String, String)>> {
+    let mut query_opts = ListRecordsQueryBuilder::default()
+        .fields(vec!["Email".to_owned(), "Mentee Email (from Volunteers)".to_owned()])
+        .view("All Committed Mentor Volunteers - 1:1 Mentor-Mentee Pairings".to_owned())
+        .build()?;
+    let mut mentor_mentee_linkage = Vec::<(String, String)>::with_capacity(50);
+
+    loop {
+        let ListRecordsResponse { ref mut records, offset } = services
+            .airtable
+            .list_records(&params.base_id, "Volunteers", Some(query_opts.clone()))
+            .await?;
+
+        for record in records.iter() {
+            if let (Some(email), Some(Value::Array(mentee_emails))) = (
+                record.fields.get("Email").and_then(|v| v.as_str().map(String::from)),
+                record.fields.get("Mentee Email (from Volunteers)"),
+            ) {
+                mentee_emails.iter().for_each(|mentee_email| {
+                    if let Some(mentee_email) = mentee_email.as_str() {
+                        mentor_mentee_linkage.push((email.clone(), mentee_email.to_owned()));
+                    }
+                });
+            }
+        }
+
+        if let Some(new_offset) = offset {
+            query_opts.offset = Some(new_offset);
+        } else {
+            break;
+        }
+    }
+    Ok(mentor_mentee_linkage)
+}
+
+struct ImportBaseData {
+    name: String,
+    description: String,
+    nonprofits: Vec<CreateNonprofit>,
+    volunteers: Vec<CreateVolunteer>,
+    volunteer_nonprofit_linkage: Vec<(String, String)>,
+    mentors: Vec<CreateMentor>,
+    mentor_nonprofit_linkage: Vec<(String, Vec<String>)>,
+    mentor_mentee_linkage: Vec<(String, String)>,
+}
+
+async fn collect_import_base_data(
+    services: &ImportServices,
+    params: &ImportParams,
+) -> Result<ImportBaseData> {
+    let nonprofits = fetch_nonprofits(services, params).await?;
+    let create_nonprofit_data = process_raw_nonprofits(nonprofits)?;
+
+    let volunteers = fetch_volunteers(services, params).await?;
+    let ProcessedVolunteers { create_volunteer_data, volunteer_nonprofit_linkage } =
+        process_raw_volunteers(volunteers)?;
+
+    let mentors = fetch_mentors(services, params).await?;
+    let ProcessedMentors { create_mentor_data, mentor_nonprofit_linkage } =
+        process_raw_mentors(mentors)?;
+
+    let mentor_mentee_linkage = fetch_mentor_mentee_linkage(services, params).await?;
+
+    Ok(ImportBaseData {
+        name: params.name.to_owned(),
+        description: params.description.to_owned(),
+        nonprofits: create_nonprofit_data,
+        volunteers: create_volunteer_data,
+        volunteer_nonprofit_linkage,
+        mentors: create_mentor_data,
+        mentor_nonprofit_linkage,
+        mentor_mentee_linkage,
+    })
+}
+
+async fn store_base_data(
+    services: &ImportServices,
+    data: ImportBaseData,
+    params: &ImportParams,
+) -> Result<()> {
+    let mut tx = services.storage_layer.acquire().await?;
+    let mut exec_opts = ExecOptsBuilder::default().tx(&mut tx).build()?;
+
+    let project_cycle_id = services
+        .storage_layer
+        .create_cycle(
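+            // Every write below shares the transaction carried by `exec_opts`;
+            // the cycle id created here ties each batch insert to this import.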
+            CreateCycleBuilder::default().name(data.name).description(data.description).build()?,
+            &mut exec_opts,
+        )
+        .await?;
+
+    let nonprofits = services
+        .storage_layer
+        .batch_create_nonprofits(project_cycle_id, data.nonprofits, &mut exec_opts)
+        .await?;
+
+    let volunteers = services
+        .storage_layer
+        .batch_create_volunteers(project_cycle_id, data.volunteers, &mut exec_opts)
+        .await?;
+
+    let mentors = services
+        .storage_layer
+        .batch_create_mentors(project_cycle_id, data.mentors, &mut exec_opts)
+        .await?;
+
+    let nonprofit_name_id_map = HashMap::<String, Uuid>::from_iter(nonprofits);
+    let volunteer_email_id_map = HashMap::<String, Uuid>::from_iter(volunteers);
+    let mentor_email_id_map = HashMap::<String, Uuid>::from_iter(mentors);
+
+    let volunteer_nonprofit_linkage = data
+        .volunteer_nonprofit_linkage
+        .iter()
+        .filter_map(|(email, org_name)| {
+            let volunteer_id = volunteer_email_id_map.get(email)?;
+            let nonprofit_id = nonprofit_name_id_map.get(org_name)?;
+            Some((*volunteer_id, *nonprofit_id))
+        })
+        .collect::<Vec<(Uuid, Uuid)>>();
+
+    let mentor_nonprofit_linkage = data
+        .mentor_nonprofit_linkage
+        .iter()
+        .filter_map(|(email, org_names)| {
+            let mentor_id = mentor_email_id_map.get(email)?;
+            let nonprofit_ids = org_names
+                .iter()
+                .filter_map(|org_name| nonprofit_name_id_map.get(org_name))
+                .cloned()
+                .collect::<Vec<Uuid>>();
+            Some((*mentor_id, nonprofit_ids))
+        })
+        .flat_map(|(mentor_id, nonprofit_ids)| {
+            nonprofit_ids
+                .iter()
+                .map(|nonprofit_id| (mentor_id, *nonprofit_id))
+                .collect::<Vec<(Uuid, Uuid)>>()
+        })
+        .collect::<Vec<(Uuid, Uuid)>>();
+
+    let volunteer_mentee_linkage = data
+        .mentor_mentee_linkage
+        .iter()
+        .filter_map(|(mentor_email, mentee_email)| {
+            // Pairs whose mentor or mentee did not make it into the maps above are
+            // silently skipped.
+            let mentee_id = volunteer_email_id_map.get(mentee_email)?;
+            let mentor_id = mentor_email_id_map.get(mentor_email)?;
+            Some((*mentee_id, *mentor_id))
+        })
+        .collect::<Vec<(Uuid, Uuid)>>();
+
+    if !volunteer_nonprofit_linkage.is_empty() {
+        services
+            .storage_layer
+            .batch_link_volunteers_to_nonprofits(
+                project_cycle_id,
+                volunteer_nonprofit_linkage,
+                &mut exec_opts,
+            )
+            .await?;
+    }
+
+    if !mentor_nonprofit_linkage.is_empty() {
+        services
+            .storage_layer
+            .batch_link_mentors_to_nonprofits(
+                project_cycle_id,
+                mentor_nonprofit_linkage,
+                &mut exec_opts,
+            )
+            .await?;
+    }
+
+    if !volunteer_mentee_linkage.is_empty() {
+        services
+            .storage_layer
+            .batch_link_volunteers_to_mentors(
+                project_cycle_id,
+                volunteer_mentee_linkage,
+                &mut exec_opts,
+            )
+            .await?;
+    }
+
+    // Update this job's project_cycle_id now that the project cycle exists
+    services
+        .storage_layer
+        .set_job_project_cycle(params.job_id, project_cycle_id, &mut exec_opts)
+        .await?;
+
+    tx.commit().await?;
+    Ok(())
+}
+
+pub async fn import_task(services: &ImportServices, params: &ImportParams) -> Result<()> {
+    let data = collect_import_base_data(services, params).await?;
+
+    match store_base_data(services, data, params).await {
+        Ok(_) => {
+            services
+                .storage_layer
+                .mark_job_complete(params.job_id, &mut ExecOptsBuilder::default().build()?)
+                .await?
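+            // Note: store_base_data committed its transaction before returning, so
+            // the status update above runs on fresh default exec opts.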
+        }
+        Err(e) => {
+            services
+                .storage_layer
+                .mark_job_errored(
+                    params.job_id,
+                    e.to_string(),
+                    &mut ExecOptsBuilder::default().build()?,
+                )
+                .await?;
+        }
+    };
+
+    Ok(())
+}
diff --git a/src/app/api/v1/data_imports/controllers.rs b/src/app/api/v1/data_imports/controllers.rs
index 3fc8a19..5579d5f 100644
--- a/src/app/api/v1/data_imports/controllers.rs
+++ b/src/app/api/v1/data_imports/controllers.rs
@@ -8,7 +8,8 @@ use axum::Json;
 use chrono::Utc;
 use tokio::task;
 
-use super::{import_airtable_base_task, ImportAirtableBaseTaskParams};
+use super::ImportServices;
+use crate::app::api::v1::data_imports::airtable::{import_task, ImportParams};
 use crate::app::api::v1::data_imports::requests::ImportAirtableBase;
 use crate::app::api::v1::data_imports::responses::AvailableBases;
 use crate::app::api_response;
@@ -53,13 +54,12 @@ pub async fn list_available_airtable_bases(
     )
 ]
 pub async fn import_airtable_base(
-    State(ctx): State<Arc<Services>>,
+    State(services): State<ImportServices>,
     Path(base_id): Path<String>,
     Json(payload): Json<ImportAirtableBase>,
 ) -> Result {
-    let storage_layer = &ctx.storage_layer;
-    let airtable = &ctx.airtable;
-    let nats = &ctx.nats;
+    let storage_layer = &services.storage_layer;
+    let airtable = &services.airtable;
 
     let current_time = Utc::now();
     let time_only = current_time.format("%H:%M:%S").to_string();
@@ -84,21 +84,16 @@ pub async fn import_airtable_base(
 
     log::info!("Started import job {job_id}");
 
-    let mut subscriber = nats.subscribe(format!("pantheon.import.cancel.{}", job_id)).await?;
-    subscriber.unsubscribe_after(1).await?;
+    let params = ImportParams {
+        name: payload.name,
+        description: payload.description,
+        job_id,
+        base_id: base_id.clone(),
+        tables: schema.tables,
+    };
 
     task::spawn(async move {
-        let _ = import_airtable_base_task(
-            ctx,
-            ImportAirtableBaseTaskParams {
-                name: payload.name,
-                description: payload.description,
-                base_id,
-                job_id,
-                subscriber,
-            },
-        )
-        .await;
+        let _ = import_task(&services, &params).await;
     });
 
     Ok(api_response::success(
diff --git a/src/app/api/v1/data_imports/import_airtable_base.rs b/src/app/api/v1/data_imports/import_airtable_base.rs
deleted file mode 100644
index 74f1e91..0000000
--- a/src/app/api/v1/data_imports/import_airtable_base.rs
+++ /dev/null
@@ -1,798 +0,0 @@
-// ╭─────────────────────────────────────────────────────────╮
-// │ Importing an Airtable base │
-// ╰─────────────────────────────────────────────────────────╯
-// ─────────────────────────────────────────────────────────
-// @author Anish Sinha
-// ─────────────────────────────────────────────────────────
-// This file implements importing an airtable base according
-// to the version 1 schema.
-//
-
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::Duration;
-
-use anyhow::Result;
-use async_nats::Subscriber;
-use derive_builder::Builder;
-use futures::StreamExt;
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
-use tokio::time;
-use uuid::Uuid;
-
-use crate::app::state::Services;
-use crate::services::airtable::base_data::records::responses::ListRecordsResponse;
-use crate::services::airtable::base_data::records::ListRecordsQueryBuilder;
-use crate::services::airtable::entities::record::Record;
-use crate::services::storage::cycles::CreateCycleBuilder;
-use crate::services::storage::jobs::UpdateJobStatus;
-use crate::services::storage::mentors::CreateMentor;
-use crate::services::storage::nonprofits::CreateNonprofit;
-use crate::services::storage::types::{
-    AgeRange, ClientSize, Ethnicity, Fli, Gender, ImpactCause, JobStatus, Lgbt,
-    MentorExperienceLevel, MentorYearsExperience, StudentStage, VolunteerHearAbout,
-};
-use crate::services::storage::volunteers::CreateVolunteer;
-use crate::services::storage::ExecOptsBuilder;
-
-/// This fetches nonprofits from Develop for Good's Airtable.
-///
-/// * `ctx`: Copy of app context
-/// * `base_id`: The base to fetch from
-///
-/// TODO: Refactor the specific view name to a more generic one (Finalized Sum24 Nonprofit Projects -> Finalized Nonprofit Projects)
-async fn fetch_nonprofits(ctx: Arc<Services>, base_id: &str) -> Result<Vec<Record<Value>>> {
-    let airtable = &ctx.airtable;
-
-    let mut query_opts = ListRecordsQueryBuilder::default()
-        .fields(
-            [
-                "OrgName",
-                "ProjectName",
-                "OrgWebsite",
-                "FirstName",
-                "LastName",
-                "JobTitle",
-                "NonprofitEmail",
-                "Phone",
-                "CountryHQ",
-                "StateHQ",
-                "Address",
-                "Size",
-                "ImpactCauses",
-            ]
-            .iter()
-            .map(ToString::to_string)
-            .collect::<Vec<String>>(),
-        )
-        .view("Finalized Sum24 Nonprofit Projects".to_owned())
-        .build()?;
-
-    let mut nonprofits = Vec::<Record<Value>>::with_capacity(50);
-
-    loop {
-        let ListRecordsResponse { ref mut records, offset } =
-            airtable.list_records(base_id, "Nonprofits", Some(query_opts.clone())).await?;
-
-        nonprofits.append(records);
-
-        if let Some(new_offset) = offset {
-            query_opts.offset = Some(new_offset);
-        } else {
-            break;
-        }
-    }
-
-    Ok(nonprofits)
-}
-
-#[derive(Builder, Deserialize, Serialize, Clone)]
-#[serde(rename_all = "PascalCase")]
-pub struct IntermediateNonprofitData {
-    #[builder(setter(into))]
-    #[serde(rename = "FirstName")]
-    pub representative_first_name: String,
-    #[builder(setter(into))]
-    #[serde(rename = "LastName")]
-    pub representative_last_name: String,
-    #[builder(setter(into))]
-    #[serde(rename = "JobTitle")]
-    pub representative_job_title: String,
-    #[builder(setter(into))]
-    #[serde(rename = "NonprofitEmail")]
-    pub email: String,
-    #[serde(rename = "Cc")]
-    #[builder(setter(into), default = "None")]
-    pub email_cc: Option<String>,
-    #[builder(setter(into))]
-    pub phone: String,
-    #[builder(setter(into))]
-    pub org_name: String,
-    #[builder(setter(into))]
-    pub project_name: String,
-    #[builder(setter(into), default = "None")]
-    pub org_website: Option<String>,
-    #[serde(rename = "CountryHQ")]
-    #[builder(setter(into), default = "None")]
-    pub country_hq: Option<String>,
-    #[serde(rename = "StateHQ")]
-    #[builder(setter(into), default = "None")]
-    pub us_state_hq: Option<String>,
-    #[builder(setter(into))]
-    pub address: String,
-    pub size: ClientSize,
-    pub impact_causes: Option<Vec<String>>,
-}
-
-impl From<IntermediateNonprofitData> for CreateNonprofit {
-    fn from(value: IntermediateNonprofitData) -> Self {
-        CreateNonprofit {
-            representative_first_name: value.representative_first_name,
-            representative_last_name: value.representative_last_name,
-            representative_job_title: value.representative_job_title,
-            email: value.email,
-            email_cc: value.email_cc,
-            phone: value.phone,
-            org_name: value.org_name,
-            project_name: value.project_name,
-            org_website: value.org_website,
-            country_hq: value.country_hq,
-            us_state_hq: value.us_state_hq,
-            address: value.address,
-            size: value.size,
-            impact_causes: value
-                .impact_causes
-                .unwrap_or(vec!["Other".to_owned()])
-                .iter()
-                .map(|c| match c.as_str() {
-                    "reco1zHRYv8lTQDaI" => ImpactCause::Animals,
-                    "recXhhTPsuQ2PMjU4" => ImpactCause::CareerAndProfessionalDevelopment,
-                    "recvWKilRRABCcHuI" => ImpactCause::DisasterRelief,
-                    "recYfRNFDpm2nedjM" => ImpactCause::Education,
-                    "recOlWiJTppnQwnll" => ImpactCause::EnvironmentAndSustainability,
-                    "recix0Y5qCXYfZGRz" => ImpactCause::FaithAndReligion,
-                    "recKs8kboTORruStC" => ImpactCause::HealthAndMedicine,
-                    "recEmtYMgeOlPeOVQ" => ImpactCause::GlobalRelations,
-                    "reczSSbvdW2NoOX2p" => ImpactCause::PovertyAndHunger,
-                    "rec5dt6EVyUeIaCR7" => ImpactCause::SeniorServices,
-                    "recMt9349gwuRAQXf" => ImpactCause::JusticeAndEquity,
-                    "rec8cH6YTQMeYqXUh" => ImpactCause::VeteransAndMilitaryFamilies,
-                    _ => ImpactCause::Other,
-                })
-                .collect(),
-        }
-    }
-}
-
-// reco1zHRYv8lTQDaI
-
-fn process_raw_nonprofits(nonprofits: Vec<Record<Value>>) -> Result<Vec<CreateNonprofit>> {
-    let create_nonprofit_data = nonprofits
-        .iter()
-        .filter_map(|n| {
-            match serde_json::from_value::<IntermediateNonprofitData>(n.fields.clone()) {
-                Ok(data) => Some(data),
-                Err(e) => {
-                    log::error!("{}", e);
-                    None
-                }
-            }
-        })
-        .map(CreateNonprofit::from)
-        .collect::<Vec<CreateNonprofit>>();
-
-    log::info!("Processed {} nonprofits", create_nonprofit_data.len());
-    Ok(create_nonprofit_data)
-}
-
-/// This fetches volunteers from Develop for Good's Airtable.
-///
-/// * `ctx`: Copy of app context
-/// * `base_id`: The base to fetch from
-async fn fetch_volunteers(ctx: Arc<Services>, base_id: &str) -> Result<Vec<Record<Value>>> {
-    let airtable = &ctx.airtable;
-
-    let fields = [
-        "FirstName",
-        "LastName",
-        "OrgName (from ProjectRecordID)",
-        "Email",
-        "Phone",
-        "Gender",
-        "Ethnicity",
-        "AgeRange",
-        "University",
-        "LGBT",
-        "Country",
-        "State",
-        "FLI",
-        "StudentStage",
-        "Majors",
-        "Minors",
-        "HearAbout",
-    ]
-    .iter()
-    .map(ToString::to_string)
-    .collect::<Vec<String>>();
-
-    let mut query_opts = ListRecordsQueryBuilder::default()
-        .fields(fields)
-        .view("All Committed Student Volunteers - Active".to_owned())
-        .build()?;
-
-    let mut volunteers = Vec::<Record<Value>>::with_capacity(500);
-
-    loop {
-        let ListRecordsResponse { ref mut records, offset } =
-            airtable.list_records(base_id, "Volunteers", Some(query_opts.clone())).await?;
-        volunteers.append(records);
-        if let Some(new_offset) = offset {
-            query_opts.offset = Some(new_offset);
-        } else {
-            break;
-        }
-    }
-    Ok(volunteers)
-}
-
-/// Container for holding processed volunteer data.
-///
-/// * `volunteers`: Data for creating volunteers as records in the database
-/// * `linkage`: The linkage between volunteers and nonprofits
-struct ProcessedVolunteers {
-    create_volunteer_data: Vec<CreateVolunteer>,
-    volunteer_nonprofit_linkage: Vec<(String, String)>,
-}
-
-#[derive(Debug, Builder, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "PascalCase")]
-struct IntermediateVolunteerData {
-    #[builder(setter(into))]
-    pub first_name: String,
-    #[builder(setter(into))]
-    pub last_name: String,
-    #[builder(setter(into))]
-    pub email: String,
-    #[builder(setter(into), default = "None")]
-    pub phone: Option<String>,
-    #[builder(default = "Gender::PreferNotToSay")]
-    #[serde(rename = "Gender")]
-    pub volunteer_gender: Gender,
-    #[builder(default = "vec![Ethnicity::PreferNotToSay]")]
-    #[serde(rename = "Ethnicity")]
-    pub volunteer_ethnicity: Vec<Ethnicity>,
-    #[builder(default = "AgeRange::R18_24")]
-    #[serde(rename = "AgeRange")]
-    pub volunteer_age_range: AgeRange,
-    #[builder(default = "vec![]")]
-    pub university: Vec<String>,
-    #[serde(rename = "LGBT")]
-    pub lgbt: Lgbt,
-    pub country: String,
-    #[builder(setter(into), default = "None")]
-    #[serde(rename = "State")]
-    pub us_state: Option<String>,
-    #[builder(default = "vec![Fli::PreferNotToSay]")]
-    #[serde(rename = "FLI")]
-    pub fli: Vec<Fli>,
-    pub student_stage: StudentStage,
-    pub majors: String,
-    pub minors: Option<String>,
-    pub hear_about: Vec<VolunteerHearAbout>,
-}
-
-impl From<IntermediateVolunteerData> for CreateVolunteer {
-    fn from(value: IntermediateVolunteerData) -> Self {
-        CreateVolunteer {
-            first_name: value.first_name,
-            last_name: value.last_name,
-            email: value.email,
-            phone: value.phone,
-            volunteer_gender: value.volunteer_gender,
-            volunteer_ethnicity: value.volunteer_ethnicity,
-            volunteer_age_range: value.volunteer_age_range,
-            university: value.university,
-            lgbt: value.lgbt,
-            country: value.country,
-            us_state: value.us_state,
-            fli: value.fli,
-            student_stage: value.student_stage,
-            majors: value.majors.split(",").map(|m| m.trim().to_string()).collect(),
-            minors: value
-                .minors
-                .unwrap_or_default()
-                .split(",")
-                .map(|m| m.trim().to_string())
-                .collect(),
-            hear_about: value.hear_about,
-        }
-    }
-}
-
-/// Processes the raw volunteer data from Airtable (received as records) and retrieves information
-/// to create volunteers in the database as well as linkage information between volunteers and
-/// nonprofits
-///
-/// * `volunteers`: The raw volunteer data
-fn process_raw_volunteers(volunteers: Vec<Record<Value>>) -> Result<ProcessedVolunteers> {
-    let mut volunteer_nonprofit_linkage = Vec::<(String, String)>::with_capacity(500);
-
-    let create_volunteer_data = volunteers
-        .iter()
-        .filter_map(|v| {
-            match (
-                &v.fields["OrgName (from ProjectRecordID)"],
-                serde_json::from_value::<IntermediateVolunteerData>(v.fields.clone()),
-            ) {
-                (Value::Array(clients), Ok(data)) => {
-                    for client in clients {
-                        if let Some(org_name) = client.as_str() {
-                            volunteer_nonprofit_linkage
-                                .push((data.email.clone(), org_name.to_owned()));
-                        }
-                    }
-                    Some(data)
-                }
-                (_, d) => {
-                    dbg!(&d);
-                    None
-                }
-            }
-        })
-        .map(CreateVolunteer::from)
-        .collect::<Vec<CreateVolunteer>>();
-
-    log::info!("Processed {} mentors", create_volunteer_data.len());
-    Ok(ProcessedVolunteers { create_volunteer_data, volunteer_nonprofit_linkage })
-}
-
-/// This fetches mentors from Develop for Good's Airtable.
-///
-/// * `ctx`: Copy of the app context
-/// * `base_id`: The base to fetch from
-async fn fetch_mentors(ctx: Arc<Services>, base_id: &str) -> Result<Vec<Record<Value>>> {
-    let airtable = &ctx.airtable;
-
-    let fields = [
-        "FirstName",
-        "LastName",
-        "Email",
-        "Phone",
-        "OfferLetterSignature",
-        "Company",
-        "JobTitle",
-        "ProjectRole",
-        "OrgName (from ProjectRecordID)",
-        "Country",
-        "State",
-        "YearsExperience",
-        "ExperienceLevel",
-        "PriorMentorship",
-        "PriorDFG",
-        "University",
-        "HearAbout",
-    ]
-    .iter()
-    .map(ToString::to_string)
-    .collect::<Vec<String>>();
-
-    let mut query_opts = ListRecordsQueryBuilder::default()
-        .fields(fields)
-        .view("All Committed Mentor Volunteers".to_owned())
-        .build()?;
-
-    let mut mentors = Vec::<Record<Value>>::with_capacity(50);
-
-    loop {
-        let ListRecordsResponse { ref mut records, offset } =
-            airtable.list_records(base_id, "Volunteers", Some(query_opts.clone())).await?;
-        mentors.append(records);
-        if let Some(new_offset) = offset {
-            query_opts.offset = Some(new_offset);
-        } else {
-            break;
-        }
-    }
-
-    Ok(mentors)
-}
-
-/// Container for holding processed mentor data.
-///
-/// * `mentors`: Data for creating mentors as records in the database
-/// * `linkage`: The linkage between mentors and nonprofits
-struct ProcessedMentors {
-    create_mentor_data: Vec<CreateMentor>,
-    mentor_nonprofit_linkage: Vec<(String, Vec<String>)>,
-}
-
-#[derive(Builder, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "PascalCase")]
-struct IntermediateMentorData {
-    // pub project_cycle_id: Uuid,
-    #[builder(setter(into))]
-    pub first_name: String,
-    #[builder(setter(into))]
-    pub last_name: String,
-    #[builder(setter(into))]
-    pub email: String,
-    #[builder(setter(into))]
-    pub phone: String,
-    #[builder(setter(into))]
-    pub company: String,
-    #[builder(setter(into))]
-    pub job_title: String,
-    #[builder(setter(into))]
-    pub country: String,
-    #[builder(setter(into), default = "None")]
-    pub us_state: Option<String>,
-    #[builder(setter(into))]
-    pub years_experience: String,
-    #[builder(setter(into))]
-    pub experience_level: MentorExperienceLevel,
-    pub prior_mentorship: Vec<String>,
-    #[serde(rename = "PriorDFG")]
-    pub prior_dfg: Option<Vec<String>>,
-    pub university: Option<Vec<String>>,
-    pub hear_about: Option<Vec<VolunteerHearAbout>>,
-}
-
-// impl TryFrom<IntermediateMentorData> for CreateMentor {
-//     type Error = anyhow::Error;
-//
-//     fn try_from(value: IntermediateMentorData) -> std::result::Result<Self, Self::Error> {
-//
-//     }
-// }
-
-impl From<IntermediateMentorData> for CreateMentor {
-    fn from(value: IntermediateMentorData) -> Self {
-        CreateMentor {
-            first_name: value.first_name,
-            last_name: value.last_name,
-            email: value.email,
-            phone: value.phone,
-            company: value.company,
-            job_title: value.job_title,
-            country: value.country,
-            us_state: value.us_state,
-            years_experience: match value.years_experience.as_str() {
-                "2-5" => MentorYearsExperience::R2_5,
-                "6-10" => MentorYearsExperience::R6_10,
-                "11-15" => MentorYearsExperience::R11_15,
-                "16-20" => MentorYearsExperience::R16_20,
-                _ => MentorYearsExperience::R21Plus,
-            },
-            experience_level: value.experience_level,
-            prior_mentor: value.prior_mentorship.contains(&"Yes, I've been a mentor".to_owned()),
-            prior_mentee: value.prior_mentorship.contains(&"Yes, I've been a mentee".to_owned()),
-            // prior_student: !value.prior_dfg.contains(&"No".to_owned()),
-            prior_student: value.prior_dfg.unwrap_or_default().contains(&"Yes".to_owned()),
-            university: value.university.unwrap_or_default(),
-            hear_about: value.hear_about.unwrap_or(vec![VolunteerHearAbout::Other]),
-        }
-    }
-}
-
-/// Processes the raw mentor data from Airtable (received as records) and retrieves information
-/// to create mentors in the database as well as linkage information between mentors and
-/// nonprofits
-///
-/// * `mentors`: The raw mentor data
-fn process_raw_mentors(mentors: Vec<Record<Value>>) -> Result<ProcessedMentors> {
-    let mut mentor_nonprofit_linkage = Vec::<(String, Vec<String>)>::with_capacity(50);
-
-    mentors.iter().for_each(|mentor| {
-        if let (Some(email), Some(Value::Array(clients)), Some(Value::Array(project_roles))) = (
-            mentor.fields.get("Email").and_then(|v| v.as_str().map(String::from)),
-            mentor.fields.get("OrgName (from ProjectRecordID)"),
-            mentor.fields.get("ProjectRole"),
-        ) {
-            let nonprofit_names = clients
-                .iter()
-                .filter_map(|project| project.as_str().map(String::from))
-                .collect::<Vec<String>>();
-
-            let role_names = project_roles
-                .iter()
-                .filter_map(|role| role.as_str().map(String::from))
-                .collect::<Vec<String>>();
-
-            if role_names.contains(&"Team Mentor".to_owned()) {
-                mentor_nonprofit_linkage.push((email, nonprofit_names));
-            }
-        }
-    });
-
-    let create_mentor_data = mentors
-        .iter()
-        .filter_map(|m| match serde_json::from_value::<IntermediateMentorData>(m.fields.clone()) {
-            Ok(data) => Some(data),
-            Err(e) => {
-                log::error!("{m:?}");
-                log::error!("{}", e);
-                None
-            }
-        })
-        .map(CreateMentor::from)
-        .collect::<Vec<CreateMentor>>();
-
-    log::info!("Processed {} mentors", create_mentor_data.len());
-    Ok(ProcessedMentors { create_mentor_data, mentor_nonprofit_linkage })
-}
-
-async fn fetch_mentor_mentee_linkage(
-    ctx: Arc<Services>,
-    base_id: &str,
-) -> Result<Vec<(String, String)>> {
-    let mut query_opts = ListRecordsQueryBuilder::default()
-        .fields(vec!["Email".to_owned(), "Mentee Email (from Volunteers)".to_owned()])
-        .view("All Committed Mentor Volunteers - 1:1 Mentor-Mentee Pairings".to_owned())
-        .build()?;
-    let mut mentor_mentee_linkage = Vec::<(String, String)>::with_capacity(50);
-
-    loop {
-        let ListRecordsResponse { ref mut records, offset } =
-            ctx.airtable.list_records(base_id, "Volunteers", Some(query_opts.clone())).await?;
-
-        for record in records.iter() {
-            if let (Some(email), Some(Value::Array(mentee_emails))) = (
-                record.fields.get("Email").and_then(|v| v.as_str().map(String::from)),
-                record.fields.get("Mentee Email (from Volunteers)"),
-            ) {
-                mentee_emails.iter().for_each(|mentee_email| {
-                    if let Some(mentee_email) = mentee_email.as_str() {
-                        mentor_mentee_linkage.push((email.clone(), mentee_email.to_owned()));
-                    }
-                });
-            }
-        }
-
-        if let Some(new_offset) = offset {
-            query_opts.offset = Some(new_offset);
-        } else {
-            break;
-        }
-    }
-    Ok(mentor_mentee_linkage)
-}
-
-struct ImportBaseData {
-    name: String,
-    description: String,
-    nonprofits: Vec<CreateNonprofit>,
-    volunteers: Vec<CreateVolunteer>,
-    volunteer_nonprofit_linkage: Vec<(String, String)>,
-    mentors: Vec<CreateMentor>,
-    mentor_nonprofit_linkage: Vec<(String, Vec<String>)>,
-    mentor_mentee_linkage: Vec<(String, String)>,
-}
-
-async fn collect_import_base_data(
-    ctx: Arc<Services>,
-    base_id: &str,
-    name: &str,
-    description: &str,
-) -> Result<ImportBaseData> {
-    let nonprofits = fetch_nonprofits(ctx.clone(), base_id).await?;
-    let create_nonprofit_data = process_raw_nonprofits(nonprofits)?;
-
-    let volunteers = fetch_volunteers(ctx.clone(), base_id).await?;
-    let ProcessedVolunteers { create_volunteer_data, volunteer_nonprofit_linkage } =
-        process_raw_volunteers(volunteers)?;
-
-    let mentors = fetch_mentors(ctx.clone(), base_id).await?;
-    let ProcessedMentors { create_mentor_data, mentor_nonprofit_linkage } =
-        process_raw_mentors(mentors)?;
-
-    let mentor_mentee_linkage = fetch_mentor_mentee_linkage(ctx.clone(), base_id).await?;
-
-    Ok(ImportBaseData {
-        name: name.to_owned(),
-        description: description.to_owned(),
-        nonprofits: create_nonprofit_data,
-        volunteers: create_volunteer_data,
-        volunteer_nonprofit_linkage,
-        mentors: create_mentor_data,
-        mentor_nonprofit_linkage,
-        mentor_mentee_linkage,
-    })
-}
-
-async fn store_base_data(ctx: Arc<Services>, data: ImportBaseData, job_id: Uuid) -> Result<()> {
-    let storage_layer = &ctx.storage_layer;
-    let mut tx = storage_layer.acquire().await?;
-    let mut exec_opts = ExecOptsBuilder::default().tx(&mut tx).build()?;
-
-    let project_cycle_id = storage_layer
-        .create_cycle(
-            CreateCycleBuilder::default().name(data.name).description(data.description).build()?,
-            &mut exec_opts,
-        )
-        .await?;
-
-    let nonprofits = storage_layer
-        .batch_create_nonprofits(project_cycle_id, data.nonprofits, &mut exec_opts)
-        .await?;
-
-    let volunteers = storage_layer
-        .batch_create_volunteers(project_cycle_id, data.volunteers, &mut exec_opts)
-        .await?;
-
-    let mentors =
-        storage_layer.batch_create_mentors(project_cycle_id, data.mentors, &mut exec_opts).await?;
-
-    let nonprofit_name_id_map = HashMap::<String, Uuid>::from_iter(nonprofits);
-    let volunteer_email_id_map = HashMap::<String, Uuid>::from_iter(volunteers);
-    let mentor_email_id_map = HashMap::<String, Uuid>::from_iter(mentors);
-
-    let volunteer_nonprofit_linkage = data
-        .volunteer_nonprofit_linkage
-        .iter()
-        .filter_map(|(email, org_name)| {
-            let volunteer_id = volunteer_email_id_map.get(email)?;
-            let nonprofit_id = nonprofit_name_id_map.get(org_name)?;
-            Some((*volunteer_id, *nonprofit_id))
-        })
-        .collect::<Vec<(Uuid, Uuid)>>();
-
-    let mentor_nonprofit_linkage = data
-        .mentor_nonprofit_linkage
-        .iter()
-        .filter_map(|(email, org_names)| {
-            let mentor_id = mentor_email_id_map.get(email)?;
-            let nonprofit_ids = org_names
-                .iter()
-                .filter_map(|org_name| nonprofit_name_id_map.get(org_name))
-                .cloned()
-                .collect::<Vec<Uuid>>();
-            Some((*mentor_id, nonprofit_ids))
-        })
-        .flat_map(|(mentor_id, nonprofit_ids)| {
-            nonprofit_ids
-                .iter()
-                .map(|nonprofit_id| (mentor_id, *nonprofit_id))
-                .collect::<Vec<(Uuid, Uuid)>>()
-        })
-        .collect::<Vec<(Uuid, Uuid)>>();
-
-    let volunteer_mentee_linkage = data
-        .mentor_mentee_linkage
-        .iter()
-        .filter_map(|(mentor_email, mentee_email)| {
-            let mentor_id = mentor_email_id_map.get(mentor_email);
-            let mentee_id = volunteer_email_id_map.get(mentee_email);
-            if mentor_id.is_none() || mentee_id.is_none() {
-                // log::info!(
-                //     "mentor_email: {:?}, mentee_email: {:?}",
-                //     mentor_email,
-                //     mentee_email
-                // );
-            }
-            Some((*(mentee_id?), *(mentor_id?)))
-        })
-        .collect::<Vec<(Uuid, Uuid)>>();
-
-    if !volunteer_nonprofit_linkage.is_empty() {
-        storage_layer
-            .batch_link_volunteers_to_nonprofits(
-                project_cycle_id,
-                volunteer_nonprofit_linkage,
-                &mut exec_opts,
-            )
-            .await?;
-    }
-
-    if !mentor_nonprofit_linkage.is_empty() {
-        storage_layer
-            .batch_link_mentors_to_nonprofits(
-                project_cycle_id,
-                mentor_nonprofit_linkage,
-                &mut exec_opts,
-            )
-            .await?;
-    }
-
-    if !volunteer_mentee_linkage.is_empty() {
-        storage_layer
-            .batch_link_volunteers_to_mentors(
-                project_cycle_id,
-                volunteer_mentee_linkage,
-                &mut exec_opts,
-            )
-            .await?;
-    }
-
-    // Update this job's project_cycle_id now that the project cycle exists
-    storage_layer.set_job_project_cycle(job_id, project_cycle_id, &mut exec_opts).await?;
-
-    // let update_data = UpdateJobStatus { status: JobStatus::Complete, error: None };
-    // storage_layer.update_job_status(job_id, update_data, &mut exec_opts).await?;
-
-    tx.commit().await?;
-    Ok(())
-}
-
-struct RunTaskParams {
-    pub name: String,
-    pub description: String,
-    pub job_id: Uuid,
-    pub base_id: String,
-}
-
-async fn run_task(ctx: Arc<Services>, params: RunTaskParams) -> Result<()> {
-    // NOTE: Uncomment the following two lines to give yourself enough time to cancel the task
-    // log::info!("Sleeping for 100 seconds");
-    // time::sleep(Duration::from_secs(100)).await;
-
-    let data =
-        collect_import_base_data(ctx.clone(), &params.base_id, &params.name, &params.description)
-            .await?;
-
-    store_base_data(ctx.clone(), data, params.job_id).await?;
-    Ok(())
-}
-
-pub(super) struct ImportTaskParams {
-    pub name: String,
-    pub description: String,
-    pub job_id: Uuid,
-    pub base_id: String,
-    pub subscriber: Subscriber,
-}
-
-impl From<ImportTaskParams> for RunTaskParams {
-    fn from(value: ImportTaskParams) -> Self {
-        RunTaskParams {
-            name: value.name,
-            description: value.description,
-            job_id: value.job_id,
-            base_id: value.base_id,
-        }
-    }
-}
-
-/// This function runs a full import of an Airtable base asynchronously. It is cancellable at any
-/// point before the data is fetched from airtable or at any point before
-/// [run_task_with_cancellation] acquires a write lock on the cancellable tasks map. It will also
-/// time out if the task takes more than two minutes to complete.
-///
-/// * `ctx`: A copy of the app context
-/// * `params`: The parameters for the import task [ImportTaskParams]
-pub async fn import_task(ctx: Arc<Services>, mut params: ImportTaskParams) -> Result<()> {
-    let storage_layer = &ctx.storage_layer;
-    let job_id = params.job_id;
-
-    let run_params = RunTaskParams {
-        name: params.name.clone(),
-        description: params.description.clone(),
-        job_id,
-        base_id: params.base_id.clone(),
-    };
-
-    tokio::select! {
-        _ = params.subscriber.next() => {
-            storage_layer.cancel_job(job_id, &mut ExecOptsBuilder::default().build()?).await?;
-        }
-        res = run_task(ctx.clone(), run_params) => {
-            match res {
-                Ok(_) => {
-                    let data = UpdateJobStatus { status: JobStatus::Complete, error: None };
-                    storage_layer.update_job_status(job_id, data, &mut ExecOptsBuilder::default().build()?).await?;
-                },
-                Err(e) => {
-                    let data = UpdateJobStatus { status: JobStatus::Error, error: Some(e.to_string()) };
-                    storage_layer.update_job_status(job_id, data, &mut ExecOptsBuilder::default().build()?).await?;
-                },
-            };
-        }
-        () = time::sleep(Duration::from_secs(600)) => {
-            // NOTE: A request times out after 10 minutes. An export task really shouldn't take
-            // this long.
-            log::warn!("request timed out");
-        }
-    };
-
-    Ok(())
-}
diff --git a/src/app/api/v1/data_imports/mod.rs b/src/app/api/v1/data_imports/mod.rs
index 517b9a9..7c00d5a 100644
--- a/src/app/api/v1/data_imports/mod.rs
+++ b/src/app/api/v1/data_imports/mod.rs
@@ -1,15 +1,13 @@
+mod airtable;
 mod controllers;
-mod import_airtable_base;
 mod requests;
 mod responses;
 
 use std::sync::Arc;
 
+use axum::extract::FromRef;
 use axum::middleware::from_fn_with_state;
 use axum::{routing, Router};
-use import_airtable_base::{
-    import_task as import_airtable_base_task, ImportTaskParams as ImportAirtableBaseTaskParams,
-};
 use requests::ImportAirtableBase;
 use utoipa::OpenApi;
 
@@ -23,6 +21,17 @@ use crate::app::state::Services;
 )]
 pub struct DataImportsApi;
 
+pub struct ImportServices {
+    pub storage_layer: Arc,
+    pub airtable: Arc,
+}
+
+impl FromRef<Arc<Services>> for ImportServices {
+    fn from_ref(ctx: &Arc<Services>) -> Self {
+        Self { storage_layer: ctx.storage_layer.clone(), airtable: ctx.airtable.clone() }
+    }
+}
+
 pub async fn build(ctx: Arc<Services>) -> Router<()> {
     let guard1 = make_rbac(vec!["read:available-bases".to_owned()]).await;
 
diff --git a/src/app/api/v1/jobs/controllers.rs b/src/app/api/v1/jobs/controllers.rs
index 62fb65e..e7a368d 100644
--- a/src/app/api/v1/jobs/controllers.rs
+++ b/src/app/api/v1/jobs/controllers.rs
@@ -1,10 +1,7 @@
 use std::sync::Arc;
 
-use axum::extract::{Path, State};
-use axum::http::StatusCode;
-use axum::response::{IntoResponse, Response};
+use axum::extract::State;
 use axum::Json;
-use uuid::Uuid;
 
 use crate::app::api::v1::jobs::responses::{Job, JobsResponse};
 use crate::app::errors::AppError;
@@ -48,27 +45,3 @@ pub async fn fetch_jobs(State(ctx): State<Arc<Services>>) -> Result
-
-pub async fn cancel_job(
-    State(ctx): State<Arc<Services>>,
-    Path(job_id): Path<Uuid>,
-) -> Result {
-    let nats = &ctx.nats;
-    let subject = format!("pantheon.export.cancel.{job_id}");
-    nats.publish(subject, "".into()).await?;
-    log::info!("Published message to cancel job: {job_id}");
-    Ok((StatusCode::OK).into_response())
-}
diff --git a/src/app/api/v1/jobs/mod.rs b/src/app/api/v1/jobs/mod.rs
index 282798e..d76a923 100644
--- a/src/app/api/v1/jobs/mod.rs
+++ b/src/app/api/v1/jobs/mod.rs
@@ -14,7 +14,6 @@ use crate::app::state::Services;
 #[openapi(
     paths(
         controllers::fetch_jobs,
-        controllers::cancel_job,
     ),
     security(("http" = ["JWT"]))
 )]
@@ -24,11 +23,9 @@ pub async fn build(ctx: Arc<Services>) -> Router<()> {
     let guard1 = make_rbac(vec!["read:jobs".to_owned()]).await;
 
     let fetch_jobs = routing::get(controllers::fetch_jobs);
-    let cancel_job = routing::post(controllers::cancel_job);
 
     Router::new()
         .route("/", fetch_jobs)
-        .route("/cancel/:job_id", cancel_job)
        .route_layer(from_fn_with_state(ctx.clone(), guard1))
        .with_state(ctx.clone())
 }
diff --git a/src/app/state/mod.rs b/src/app/state/mod.rs
index 2f85291..ff08505 100644
--- a/src/app/state/mod.rs
+++ b/src/app/state/mod.rs
@@ -1,6 +1,5 @@
 use std::sync::Arc;
 
-use async_nats::client::Client;
 use derive_builder::Builder;
 use sqlx::{Database, Postgres};
 
@@ -17,7 +16,6 @@ pub struct Services {
     pub storage_layer: Arc>,
     pub airtable: Arc,
     pub workspace: Arc,
-    pub nats: Client,
     pub mail: Arc,
 }
 
@@ -30,7 +28,6 @@ impl Services {
             storage: {},
             workspace: {},
             mail: {}
-            nats: [default nats service]
             }}",
            self.authenticator.get_id(),
            self.airtable.get_id(),
diff --git a/src/cli.rs b/src/cli.rs
index db8f15b..8c567a4 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -1,4 +1,4 @@
-//! This module defines the command line interface to Pantheon.
+//! This module defines the command line interface to Scipio.
 
 use std::sync::Arc;
 
@@ -19,6 +19,13 @@
 use crate::services::workspace::noop::NoopWorkspaceClient;
 use crate::services::workspace::service_account::ServiceAccountWorkspaceClient;
 use crate::services::workspace::WorkspaceService;
 
+#[derive(ValueEnum, Serialize, Debug, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub enum LaunchMode {
+    Development,
+    Production,
+}
+
 #[derive(ValueEnum, Serialize, Debug, Clone)]
 #[serde(rename_all = "kebab-case")]
 pub enum AuthServiceImpl {
@@ -72,7 +79,6 @@ pub enum WorkspaceServiceImpl {
 ///
 /// * `sendgrid_api_key`: The Sendgrid API key
 ///
-/// * `nats_url`: The URL of the NATS server to connect to
 #[derive(Parser, Debug)]
 pub struct Args {
     #[arg(long, env, default_value = "http://localhost")]
@@ -80,6 +86,9 @@ pub struct Args {
     #[arg(long, env, default_value = "8888")]
     pub port: String,
 
+    #[arg(long, env, value_enum, default_value_t = LaunchMode::Production)]
+    pub launch_mode: LaunchMode,
+
     #[arg(long, env, value_enum, default_value_t = AuthServiceImpl::Auth0)]
     pub auth_service: AuthServiceImpl,
     #[arg(long, env)]
@@ -98,8 +107,6 @@ pub struct Args {
     pub workspace_private_key: String,
     #[arg(long, env, default_value = "https://oauth2.googleapis.com/token")]
     pub workspace_token_url: String,
-    #[arg(long, env)]
-    pub workspace_service_account: String,
 
     #[arg(long, env)]
     pub airtable_api_token: String,
@@ -111,9 +118,6 @@ pub struct Args {
     pub mail_service: MailServiceImpl,
     #[arg(long, env)]
     pub sendgrid_api_key: Option<String>,
-
-    #[arg(long, env, default_value = "nats://localhost:4222")]
-    pub nats_url: String,
 }
 
 impl Args {
@@ -168,10 +172,6 @@ impl Args {
         Ok(Arc::new(PgBackend::new(&self.database_url).await?))
     }
 
-    async fn init_nats_client(&self) -> Result<Client> {
-        Ok(async_nats::connect(&self.nats_url).await?)
-    }
-
     pub async fn init_services(&self) -> Result<Arc<Services>> {
         Ok(Arc::new(
             ServicesBuilder::default()
@@ -179,7 +179,6 @@
                 .storage_layer(self.init_storage_service().await?)
                 .airtable(self.init_airtable_service()?)
                 .workspace(self.init_workspace_service()?)
-                .nats(self.init_nats_client().await?)
                 .mail(self.init_mail_service()?)
                 .build()?,
         ))
diff --git a/src/main.rs b/src/main.rs
index 71050ee..c80ca48 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -24,10 +24,6 @@
 //! - An Airtable client. This is a custom service that interacts with the Airtable API. This is
 //! unlikely to change, unless Airtable dramatically changes their API. However, this service is
 //! also swappable as long as it implements `AirtableClient`.
-//! - A NATS client. This is used for pub/sub messaging between services. Currently, NATS is a hard
-//! dependency and there is no underlying abstraction for this service. That's because I don't
-//! know nearly enough about pub/sub messaging to make a good abstraction. If you do, please feel
-//! free to do it yourself.
 //!
 // ╭───────────────────────────────────────────────────────────────────────────────────────────────────╮
diff --git a/src/services/airtable/base_data/bases/responses.rs b/src/services/airtable/base_data/bases/responses.rs
index eb9edab..4b12c55 100644
--- a/src/services/airtable/base_data/bases/responses.rs
+++ b/src/services/airtable/base_data/bases/responses.rs
@@ -109,6 +109,11 @@ impl V1SchemaValidator for SchemaResponse {
                     .contains(required_view)
             });
 
+            let n_has_required_dynamic_view = n.views.iter().any(|v| {
+                v.name.starts_with("Finalized") && v.name.ends_with("Nonprofit Projects")
+            });
+
             let n_has_required_fields =
                 REQUIRED_NONPROFIT_TABLE_FIELDS.iter().all(|required_field| {
                     n.fields
@@ -130,6 +135,7 @@
                     && n_has_required_fields
                     && v_has_required_views
                     && n_has_required_views
+                    && n_has_required_dynamic_view
             }
             (_, _) => false,
         }
diff --git a/src/services/mod.rs b/src/services/mod.rs
index d71947f..c7ad085 100644
--- a/src/services/mod.rs
+++ b/src/services/mod.rs
@@ -1,7 +1,6 @@
 pub mod airtable;
 pub mod auth;
 pub mod mail;
-pub mod pubsub;
 pub mod storage;
 pub mod url;
 pub mod workspace;
diff --git a/src/services/pubsub/mod.rs b/src/services/pubsub/mod.rs
deleted file mode 100644
index 7faa013..0000000
--- a/src/services/pubsub/mod.rs
+++ /dev/null
@@ -1,8 +0,0 @@
-//! PubSub service module.
-//!
-//! This module should define a `PubSubClient` trait and at least one implementation. Currently,
-//! there is none so we have a hard dependency on NATS. This should be abstracted out so that we
-//! can swap services if needed.
-
-#[allow(unused)]
-pub trait PubSubClient {}
diff --git a/src/services/storage/entities.rs b/src/services/storage/entities.rs
index 56cf23c..62540c0 100644
--- a/src/services/storage/entities.rs
+++ b/src/services/storage/entities.rs
@@ -362,3 +362,17 @@ pub struct BasicStats {
     pub num_nonprofits: i64,
     pub num_mentors: i64,
 }
+
+#[derive(FromRow, Serialize, Deserialize, Debug, Clone, PartialEq)]
+#[serde(rename_all = "camelCase")]
+pub struct ExportedVolunteerDetails {
+    pub id: Uuid,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: Option<DateTime<Utc>>,
+    pub volunteer_id: Uuid,
+    pub workspace_email: String,
+    pub org_unit: String,
+    pub job_id: Uuid,
+    pub project_cycle_id: Uuid,
+    pub status: JobStatus,
+}
diff --git a/src/services/storage/jobs.rs b/src/services/storage/jobs.rs
index 65002b2..6e16264 100644
--- a/src/services/storage/jobs.rs
+++ b/src/services/storage/jobs.rs
@@ -99,6 +99,19 @@ pub trait QueryJobs {
         unimplemented!()
     }
 
+    async fn mark_job_complete(&self, id: Uuid, exec_opts: &mut ExecOpts) -> Result<()> {
+        unimplemented!()
+    }
+
+    async fn mark_job_errored(
+        &self,
+        id: Uuid,
+        error: String,
+        exec_opts: &mut ExecOpts,
+    ) -> Result<()> {
+        unimplemented!()
+    }
+
     /// Set the project cycle that a job is associated with.
     ///
     /// This may be useful if a job is started to import data for a project cycle. The job is
@@ -241,4 +254,37 @@
         }
         exec_with_tx!(self, exec_opts, exec, id)
     }
+
+    async fn mark_job_complete(&self, id: Uuid, exec_opts: &mut ExecOpts) -> Result<()> {
+        async fn exec(id: Uuid, tx: &mut Transaction<'_, Postgres>) -> Result<()> {
+            let query = include_str!("queries/jobs/update_job_status.sql");
+            sqlx::query(query)
+                .bind(id)
+                .bind(JobStatus::Complete)
+                .bind(Option::<String>::None)
+                .execute(&mut **tx)
+                .await?;
+            Ok(())
+        }
+        exec_with_tx!(self, exec_opts, exec, id)
+    }
+
+    async fn mark_job_errored(
+        &self,
+        id: Uuid,
+        error: String,
+        exec_opts: &mut ExecOpts,
+    ) -> Result<()> {
+        async fn exec(id: Uuid, error: String, tx: &mut Transaction<'_, Postgres>) -> Result<()> {
+            let query = include_str!("queries/jobs/update_job_status.sql");
+            sqlx::query(query)
+                .bind(id)
+                .bind(JobStatus::Error)
+                .bind(Some(error))
+                .execute(&mut **tx)
+                .await?;
+            Ok(())
+        }
+        exec_with_tx!(self, exec_opts, exec, id, error)
+    }
+}
diff --git a/src/services/storage/queries/volunteers/fetch_exported_volunteer_details_by_project_cycle.sql b/src/services/storage/queries/volunteers/fetch_exported_volunteer_details_by_project_cycle.sql
new file mode 100644
index 0000000..1a99669
--- /dev/null
+++ b/src/services/storage/queries/volunteers/fetch_exported_volunteer_details_by_project_cycle.sql
@@ -0,0 +1,14 @@
+select
+    id,
+    created_at,
+    updated_at,
+    volunteer_id,
+    workspace_email,
+    org_unit,
+    job_id,
+    project_cycle_id,
+    status
+from
+    exported_volunteer_details
+where
+    project_cycle_id = $1
diff --git a/src/services/storage/volunteers.rs b/src/services/storage/volunteers.rs
index a03ad0b..e57e715 100644
--- a/src/services/storage/volunteers.rs
+++ b/src/services/storage/volunteers.rs
@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
 use sqlx::{Database, Postgres, QueryBuilder, Transaction};
 use uuid::Uuid;
 
-use super::entities::VolunteerDetails;
+use super::entities::{ExportedVolunteerDetails, VolunteerDetails};
 use super::exec_with_tx;
 use super::types::{AgeRange, Ethnicity, Fli, Gender, Lgbt, StudentStage, VolunteerHearAbout};
 use crate::services::storage::{Acquire, ExecOpts, PgBackend};
@@ -77,7 +77,7 @@ pub struct EditVolunteer {
 /// * `workspace_email`: The workspace email the volunteer has been issued
 /// * `org_unit`: Which Develop for Good Organizational Unit the volunteer has been exported to
 /// (usually "/Programs/PantheonUsers")
-#[derive(Builder)]
+#[derive(Builder, Clone)]
 pub struct InsertVolunteerExportedToWorkspace {
     pub volunteer_id: Uuid,
     pub job_id: Uuid,
@@ -253,6 +253,14 @@ pub trait QueryVolunteers {
     ) -> Result<()> {
         unimplemented!()
     }
+
+    async fn fetch_exported_volunteer_details_by_project_cycle(
+        &self,
+        project_cycle_id: Uuid,
+        exec_opts: &mut ExecOpts,
+    ) -> Result<Vec<ExportedVolunteerDetails>> {
+        unimplemented!()
+    }
 }
 
 #[async_trait]
@@ -565,4 +573,26 @@ impl QueryVolunteers for PgBackend {
 
         exec_with_tx!(self, exec_opts, exec, data)
     }
+
+    async fn fetch_exported_volunteer_details_by_project_cycle(
+        &self,
+        project_cycle_id: Uuid,
+        exec_opts: &mut ExecOpts,
+    ) -> Result<Vec<ExportedVolunteerDetails>> {
+        async fn exec(
+            project_cycle_id: Uuid,
+            tx: &mut Transaction<'_, Postgres>,
+        ) -> Result<Vec<ExportedVolunteerDetails>> {
+            let query = include_str!(
+                "queries/volunteers/fetch_exported_volunteer_details_by_project_cycle.sql"
+            );
+            let volunteers = sqlx::query_as::<_, ExportedVolunteerDetails>(query)
+                .bind(project_cycle_id)
+                .fetch_all(&mut **tx)
+                .await?;
+            Ok(volunteers)
+        }
+
+        exec_with_tx!(self, exec_opts, exec, project_cycle_id)
+    }
 }
diff --git a/src/services/workspace/mod.rs b/src/services/workspace/mod.rs
index 3086ad3..eecf346 100644
--- a/src/services/workspace/mod.rs
+++ b/src/services/workspace/mod.rs
@@ -70,6 +70,7 @@ pub trait WorkspaceClient: Send + Sync {
     /// header. This is a security measure and we are delegating authentication to Auth0. Never
     /// call this function with user provided input. This is one reason why we should try to find
     /// an alternative to the service account approach.
+    #[allow(unused)]
     async fn delete_user(&self, principal: &str, email_of_user_to_delete: &str) -> Result<()> {
         unimplemented!()
     }