From afdc18b3e7d2e5896bb5dcae98eacae53e4cff88 Mon Sep 17 00:00:00 2001 From: Lu Yang Date: Mon, 28 Oct 2024 21:56:29 +0000 Subject: [PATCH] add user creation web hook --- lapdev-api/src/session.rs | 17 +++ lapdev-api/src/workspace.rs | 11 ++ lapdev-common/src/lib.rs | 5 + lapdev-conductor/src/scheduler.rs | 5 +- lapdev-conductor/src/server.rs | 117 +++++++++++++----- lapdev-db/src/api.rs | 32 +++++ lapdev-db/src/entities/organization.rs | 1 + ...231130_151650_create_organization_table.rs | 9 +- lapdev-enterprise/src/auto_start_stop.rs | 19 ++- 9 files changed, 178 insertions(+), 38 deletions(-) diff --git a/lapdev-api/src/session.rs b/lapdev-api/src/session.rs index 8f10efe..c5e17e3 100644 --- a/lapdev-api/src/session.rs +++ b/lapdev-api/src/session.rs @@ -11,6 +11,7 @@ use chrono::Utc; use hyper::StatusCode; use lapdev_common::{ console::NewSessionResponse, AuditAction, AuditResourceKind, AuthProvider, ProviderUser, + UserCreationWebhook, }; use lapdev_db::entities; use lapdev_rpc::error::ApiError; @@ -311,6 +312,22 @@ pub(crate) async fn session_authorize( .await?; txn.commit().await?; + + { + let state = state.clone(); + let id = user.id; + tokio::spawn(async move { + let webhook = state.db.get_user_creation_webhook().await?; + reqwest::Client::builder() + .build()? + .post(webhook) + .json(&UserCreationWebhook { id }) + .send() + .await?; + anyhow::Ok(()) + }); + } + user } }; diff --git a/lapdev-api/src/workspace.rs b/lapdev-api/src/workspace.rs index ca6020f..5627d36 100644 --- a/lapdev-api/src/workspace.rs +++ b/lapdev-api/src/workspace.rs @@ -127,6 +127,17 @@ pub async fn delete_workspace( if ws.user_id != user.id { return Err(ApiError::Unauthorized); } + + if ws.status == WorkspaceStatus::PrebuildBuilding.to_string() + || ws.status == WorkspaceStatus::Building.to_string() + || ws.status == WorkspaceStatus::PrebuildCopying.to_string() + || ws.status == WorkspaceStatus::New.to_string() + { + return Err(ApiError::InvalidRequest( + "Can't delete workspace when it's building".to_string(), + )); + } + state .conductor .delete_workspace(ws, info.ip, info.user_agent) diff --git a/lapdev-common/src/lib.rs b/lapdev-common/src/lib.rs index 43b1f0b..b9e0e28 100644 --- a/lapdev-common/src/lib.rs +++ b/lapdev-common/src/lib.rs @@ -818,3 +818,8 @@ pub struct GitProvider { pub scopes: Vec, pub all_scopes: Vec, } + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] +pub struct UserCreationWebhook { + pub id: Uuid, +} diff --git a/lapdev-conductor/src/scheduler.rs b/lapdev-conductor/src/scheduler.rs index eb5eb83..38ce62a 100644 --- a/lapdev-conductor/src/scheduler.rs +++ b/lapdev-conductor/src/scheduler.rs @@ -441,6 +441,9 @@ mod tests { created_by: ActiveValue::Set(Uuid::new_v4()), repo_url: ActiveValue::Set("".to_string()), repo_name: ActiveValue::Set("".to_string()), + oauth_id: ActiveValue::Set(Uuid::new_v4()), + host_id: ActiveValue::Set(Uuid::new_v4()), + osuser: ActiveValue::Set("".to_string()), organization_id: ActiveValue::Set(Uuid::new_v4()), machine_type_id: ActiveValue::Set(machine_type.id), ..Default::default() @@ -473,7 +476,7 @@ mod tests { txn.commit().await.unwrap(); assert_eq!(workspace_host.available_dedicated_cpu, 9); - assert_eq!(workspace_host.available_shared_cpu, 12); + assert_eq!(workspace_host.available_shared_cpu, 9); assert_eq!(workspace_host.available_memory, 48); assert_eq!(workspace_host.available_disk, 980); } diff --git a/lapdev-conductor/src/server.rs b/lapdev-conductor/src/server.rs index 418216f..73f0305 100644 --- a/lapdev-conductor/src/server.rs +++ b/lapdev-conductor/src/server.rs @@ -502,18 +502,41 @@ impl Conductor { tokio::time::sleep(Duration::from_secs(60)).await; } else { for org in orgs { - let workspaces = self - .enterprise - .auto_start_stop - .organization_auto_stop_workspaces(org) - .await - .unwrap_or_default(); - for workspace in workspaces { - tracing::info!( - "stop workspace {} because of auto stop timeout", - workspace.name - ); - let _ = self.stop_workspace(workspace, None, None).await; + { + let workspaces = self + .enterprise + .auto_start_stop + .organization_auto_stop_workspaces(&org) + .await + .unwrap_or_default(); + for workspace in workspaces { + tracing::info!( + "stop workspace {} because of auto stop timeout", + workspace.name + ); + let _ = self.stop_workspace(workspace, None, None).await; + } + } + + if org.running_workspace_limit > 0 { + let usage = self + .enterprise + .usage + .get_monthly_cost(org.id, None, Utc::now().into(), None) + .await + .unwrap_or(0); + if usage as i64 >= org.usage_limit { + if let Ok(workspaces) = self.db.get_org_running_workspaces(org.id).await + { + for workspace in workspaces { + tracing::info!( + "stop workspace {} because of usage limit", + workspace.name + ); + let _ = self.stop_workspace(workspace, None, None).await; + } + } + } } } } @@ -1931,6 +1954,14 @@ impl Conductor { ) .await; + entities::organization::ActiveModel { + id: ActiveValue::Set(ws.organization_id), + has_running_workspace: ActiveValue::Set(true), + ..Default::default() + } + .update(&self.db.conn) + .await?; + Ok(()) } @@ -2147,9 +2178,16 @@ impl Conductor { user_agent.clone(), ) .await?; + if let Some(usage_id) = workspace.usage_id { + self.enterprise + .usage + .end_usage(&txn, usage_id, now.into()) + .await?; + } let update_ws = entities::workspace::ActiveModel { id: ActiveValue::Set(workspace.id), status: ActiveValue::Set(WorkspaceStatus::Deleting.to_string()), + usage_id: ActiveValue::Set(None), ..Default::default() }; let ws = update_ws.update(&txn).await?; @@ -2237,13 +2275,6 @@ impl Conductor { Ok(_) => { let status = WorkspaceStatus::Deleted; let txn = self.db.conn.begin().await?; - if let Some(usage_id) = ws.usage_id { - self.enterprise - .usage - .end_usage(&txn, usage_id, now.into()) - .await?; - } - let host = self .db .get_workspace_host_with_lock(&txn, ws.host_id) @@ -2251,7 +2282,6 @@ impl Conductor { entities::workspace::ActiveModel { id: ActiveValue::Set(ws.id), status: ActiveValue::Set(status.to_string()), - usage_id: ActiveValue::Set(None), deleted_at: ActiveValue::Set(Some(now.into())), ..Default::default() } @@ -2303,6 +2333,23 @@ impl Conductor { } }; + self.update_org_has_running_workspace(ws.organization_id) + .await?; + + Ok(()) + } + + async fn update_org_has_running_workspace(&self, org_id: Uuid) -> Result<()> { + let ws = self.db.get_org_running_workspace(org_id).await?; + if ws.is_none() { + entities::organization::ActiveModel { + id: ActiveValue::Set(org_id), + has_running_workspace: ActiveValue::Set(false), + ..Default::default() + } + .update(&self.db.conn) + .await?; + } Ok(()) } @@ -2424,10 +2471,17 @@ impl Conductor { user_agent, ) .await?; + if let Some(usage_id) = workspace.usage_id { + self.enterprise + .usage + .end_usage(&txn, usage_id, now.into()) + .await?; + } let update_ws = entities::workspace::ActiveModel { id: ActiveValue::Set(workspace.id), status: ActiveValue::Set(WorkspaceStatus::Stopping.to_string()), updated_at: ActiveValue::Set(Some(now.into())), + usage_id: ActiveValue::Set(None), ..Default::default() }; let ws = update_ws.update(&txn).await?; @@ -2520,6 +2574,15 @@ impl Conductor { .update(&txn) .await?; txn.commit().await?; + + entities::organization::ActiveModel { + id: ActiveValue::Set(ws.organization_id), + has_running_workspace: ActiveValue::Set(true), + ..Default::default() + } + .update(&self.db.conn) + .await?; + status } Err(e) => { @@ -2563,23 +2626,14 @@ impl Conductor { match result { Ok(_) => { let status = WorkspaceStatus::Stopped; - let txn = self.db.conn.begin().await?; - if let Some(usage_id) = ws.usage_id { - self.enterprise - .usage - .end_usage(&txn, usage_id, now.into()) - .await?; - } entities::workspace::ActiveModel { id: ActiveValue::Set(ws.id), status: ActiveValue::Set(status.to_string()), updated_at: ActiveValue::Set(Some(now.into())), - usage_id: ActiveValue::Set(None), ..Default::default() } - .update(&txn) + .update(&self.db.conn) .await?; - txn.commit().await?; } Err(e) => { let e = if let ApiError::InternalError(e) = e { @@ -2600,6 +2654,9 @@ impl Conductor { } }; + self.update_org_has_running_workspace(ws.organization_id) + .await?; + Ok(()) } diff --git a/lapdev-db/src/api.rs b/lapdev-db/src/api.rs index 04c19d9..f286960 100644 --- a/lapdev-db/src/api.rs +++ b/lapdev-db/src/api.rs @@ -26,6 +26,7 @@ pub const LAPDEV_CLUSTER_NOT_INITIATED: &str = "lapdev-cluster-not-initiated"; const LAPDEV_API_AUTH_TOKEN_KEY: &str = "lapdev-api-auth-token-key"; const LAPDEV_DEFAULT_USAGE_LIMIT: &str = "lapdev-default-org-usage-limit"; const LAPDEV_DEFAULT_RUNNING_WORKSPACE_LIMIT: &str = "lapdev-default-org-running-workspace-limit"; +const LAPDEV_USER_CREATION_WEBHOOK: &str = "lapdev-user-creation-webhook"; #[derive(Clone)] pub struct DbApi { @@ -145,6 +146,10 @@ impl DbApi { self.get_config(LAPDEV_BASE_HOSTNAME).await } + pub async fn get_user_creation_webhook(&self) -> Result { + self.get_config(LAPDEV_USER_CREATION_WEBHOOK).await + } + pub async fn is_container_isolated(&self) -> Result { Ok(entities::config::Entity::find() .filter(entities::config::Column::Name.eq(LAPDEV_ISOLATE_CONTAINER)) @@ -223,6 +228,32 @@ impl DbApi { Ok(model) } + pub async fn get_org_running_workspace( + &self, + org_id: Uuid, + ) -> Result> { + let model = workspace::Entity::find() + .filter(entities::workspace::Column::OrganizationId.eq(org_id)) + .filter(entities::workspace::Column::Status.eq(WorkspaceStatus::Running.to_string())) + .filter(entities::workspace::Column::DeletedAt.is_null()) + .one(&self.conn) + .await?; + Ok(model) + } + + pub async fn get_org_running_workspaces( + &self, + org_id: Uuid, + ) -> Result> { + let model = workspace::Entity::find() + .filter(entities::workspace::Column::OrganizationId.eq(org_id)) + .filter(entities::workspace::Column::Status.eq(WorkspaceStatus::Running.to_string())) + .filter(entities::workspace::Column::DeletedAt.is_null()) + .all(&self.conn) + .await?; + Ok(model) + } + pub async fn validate_ssh_public_key( &self, user_id: Uuid, @@ -303,6 +334,7 @@ impl DbApi { last_auto_stop_check: ActiveValue::Set(None), usage_limit: ActiveValue::Set(default_usage_limit), running_workspace_limit: ActiveValue::Set(default_running_workspace_limit), + has_running_workspace: ActiveValue::Set(false), } .insert(txn) .await?; diff --git a/lapdev-db/src/entities/organization.rs b/lapdev-db/src/entities/organization.rs index 8a13c42..74606bf 100644 --- a/lapdev-db/src/entities/organization.rs +++ b/lapdev-db/src/entities/organization.rs @@ -16,6 +16,7 @@ pub struct Model { pub last_auto_stop_check: Option, pub usage_limit: i64, pub running_workspace_limit: i32, + pub has_running_workspace: bool, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/lapdev-db/src/migration/m20231130_151650_create_organization_table.rs b/lapdev-db/src/migration/m20231130_151650_create_organization_table.rs index 6a0d8ac..05839a5 100644 --- a/lapdev-db/src/migration/m20231130_151650_create_organization_table.rs +++ b/lapdev-db/src/migration/m20231130_151650_create_organization_table.rs @@ -42,6 +42,11 @@ impl MigrationTrait for Migration { .integer() .not_null(), ) + .col( + ColumnDef::new(Organization::HasRunningWorkspace) + .boolean() + .not_null(), + ) .to_owned(), ) .await?; @@ -49,9 +54,10 @@ impl MigrationTrait for Migration { manager .create_index( Index::create() - .name("organization_deleted_at_last_auto_stop_check_idx") + .name("organization_deleted_at_has_running_workspace_last_auto_stop_check_idx") .table(Organization::Table) .col(Organization::DeletedAt) + .col(Organization::HasRunningWorkspace) .col(Organization::LastAutoStopCheck) .to_owned(), ) @@ -74,4 +80,5 @@ enum Organization { LastAutoStopCheck, UsageLimit, RunningWorkspaceLimit, + HasRunningWorkspace, } diff --git a/lapdev-enterprise/src/auto_start_stop.rs b/lapdev-enterprise/src/auto_start_stop.rs index 45c97df..655c82e 100644 --- a/lapdev-enterprise/src/auto_start_stop.rs +++ b/lapdev-enterprise/src/auto_start_stop.rs @@ -32,6 +32,9 @@ impl AutoStartStop { .column(entities::organization::Column::Id) .from(entities::organization::Entity) .and_where(entities::organization::Column::DeletedAt.is_null()) + .and_where( + entities::organization::Column::HasRunningWorkspace.eq(true), + ) .cond_where( Condition::any() .add( @@ -59,7 +62,7 @@ impl AutoStartStop { pub async fn organization_auto_stop_workspaces( &self, - org: entities::organization::Model, + org: &entities::organization::Model, ) -> Result> { let workspaces = match (org.auto_stop, org.allow_workspace_change_auto_stop) { (None, false) => { @@ -210,7 +213,11 @@ pub mod tests { auto_stop: ActiveValue::Set(Some(3600)), allow_workspace_change_auto_start: ActiveValue::Set(true), allow_workspace_change_auto_stop: ActiveValue::Set(false), - ..Default::default() + usage_limit: ActiveValue::Set(108000), + running_workspace_limit: ActiveValue::Set(3), + has_running_workspace: ActiveValue::Set(false), + deleted_at: ActiveValue::Set(None), + last_auto_stop_check: ActiveValue::Set(None), } .insert(&db.conn) .await @@ -255,7 +262,7 @@ pub mod tests { let worksapces = enterprise .auto_start_stop - .organization_auto_stop_workspaces(org.clone()) + .organization_auto_stop_workspaces(&org) .await .unwrap(); assert!(worksapces.is_empty()); @@ -301,7 +308,7 @@ pub mod tests { let worksapces = enterprise .auto_start_stop - .organization_auto_stop_workspaces(org.clone()) + .organization_auto_stop_workspaces(&org) .await .unwrap(); assert_eq!(worksapces.len(), 1); @@ -317,7 +324,7 @@ pub mod tests { let worksapces = enterprise .auto_start_stop - .organization_auto_stop_workspaces(org.clone()) + .organization_auto_stop_workspaces(&org) .await .unwrap(); assert_eq!(worksapces.len(), 0); @@ -333,7 +340,7 @@ pub mod tests { let worksapces = enterprise .auto_start_stop - .organization_auto_stop_workspaces(org.clone()) + .organization_auto_stop_workspaces(&org) .await .unwrap(); assert_eq!(worksapces.len(), 1);