Skip to content

Commit

Permalink
clean up inactive workspace
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang2821 committed Nov 4, 2024
1 parent 0e982de commit 30a07c3
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 89 deletions.
2 changes: 1 addition & 1 deletion lapdev-api/src/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub async fn delete_workspace(

state
.conductor
.delete_workspace(ws, info.ip, info.user_agent)
.delete_workspace(&ws, info.ip, info.user_agent)
.await?;
Ok(StatusCode::NO_CONTENT.into_response())
}
Expand Down
4 changes: 3 additions & 1 deletion lapdev-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,13 @@ pub struct WorkspaceService {
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RunningWorkspace {
pub struct HostWorkspace {
pub id: Uuid,
pub ssh_port: Option<i32>,
pub ide_port: Option<i32>,
pub last_inactivity: Option<DateTime<FixedOffset>>,
pub created_at: DateTime<FixedOffset>,
pub updated_at: Option<DateTime<FixedOffset>>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
23 changes: 19 additions & 4 deletions lapdev-conductor/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use lapdev_common::{BuildTarget, PrebuildUpdateEvent, RunningWorkspace, WorkspaceUpdateEvent};
use lapdev_common::{BuildTarget, HostWorkspace, PrebuildUpdateEvent, WorkspaceUpdateEvent};
use lapdev_db::entities;
use lapdev_rpc::{error::ApiError, ConductorService};
use sea_orm::{ActiveModelTrait, ActiveValue};
Expand Down Expand Up @@ -70,24 +70,39 @@ impl ConductorService for ConductorRpc {
}
}

async fn running_workspaces(
async fn running_workspaces_on_host(
self,
_context: tarpc::context::Context,
) -> Result<Vec<RunningWorkspace>, ApiError> {
) -> Result<Vec<HostWorkspace>, ApiError> {
let workspaces = self
.conductor
.db
.get_running_workspaces_on_host(self.ws_host_id)
.await?;
let workspaces = workspaces
.into_iter()
.map(|ws| RunningWorkspace {
.map(|ws| HostWorkspace {
id: ws.id,
ssh_port: ws.ssh_port,
ide_port: ws.ide_port,
last_inactivity: ws.last_inactivity,
created_at: ws.created_at,
updated_at: ws.updated_at,
})
.collect();
Ok(workspaces)
}

async fn auto_delete_inactive_workspaces(self, _context: tarpc::context::Context) {
let conductor = self.conductor.clone();
let host_id = self.ws_host_id;
tokio::spawn(async move {
if let Err(e) = conductor
.auto_delete_inactive_workspaces_on_host(host_id)
.await
{
tracing::error!("auto delete inactive workspaces on host {host_id} error: {e:?}");
}
});
}
}
63 changes: 52 additions & 11 deletions lapdev-conductor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1169,9 +1169,9 @@ impl Conductor {
let ws = entities::workspace::ActiveModel {
id: ActiveValue::Set(workspace_id),
deleted_at: ActiveValue::Set(None),
updated_at: ActiveValue::Set(None),
name: ActiveValue::Set(name.clone()),
created_at: ActiveValue::Set(now.into()),
updated_at: ActiveValue::Set(Some(now.into())),
status: ActiveValue::Set(WorkspaceStatus::New.to_string()),
repo_url: ActiveValue::Set(repo.url.clone()),
repo_name: ActiveValue::Set(repo.name.clone()),
Expand Down Expand Up @@ -2059,15 +2059,27 @@ impl Conductor {
.do_create_workspace(&user, &ws, repo, &machine_type, ip, user_agent)
.await
{
let err = if let ApiError::InternalError(e) = e {
e
} else {
e.to_string()
};
tracing::error!("create workspace failed: {err}");
let _ = conductor
.update_workspace_status(&ws, WorkspaceStatus::Failed)
.await;
tracing::error!("create workspace failed: {e:?}");
if let Ok(ws) = conductor.db.get_workspace(ws.id).await {
if let Some(usage_id) = ws.usage_id {
let now = Utc::now();
if let Ok(txn) = conductor.db.conn.begin().await {
let _ = conductor
.enterprise
.usage
.end_usage(&txn, usage_id, now.into())
.await;
let _ = txn.commit().await;
}
}
}
let _ = entities::workspace::ActiveModel {
id: ActiveValue::Set(ws.id),
status: ActiveValue::Set(WorkspaceStatus::Failed.to_string()),
..Default::default()
}
.update(&conductor.db.conn)
.await;
}
});

Expand Down Expand Up @@ -2189,7 +2201,7 @@ impl Conductor {

pub async fn delete_workspace(
&self,
workspace: entities::workspace::Model,
workspace: &entities::workspace::Model,
ip: Option<String>,
user_agent: Option<String>,
) -> Result<(), ApiError> {
Expand Down Expand Up @@ -2235,6 +2247,7 @@ impl Conductor {
let update_ws = entities::workspace::ActiveModel {
id: ActiveValue::Set(workspace.id),
status: ActiveValue::Set(WorkspaceStatus::Deleting.to_string()),
updated_at: ActiveValue::Set(Some(now.into())),
usage_id: ActiveValue::Set(None),
..Default::default()
};
Expand Down Expand Up @@ -2332,6 +2345,7 @@ impl Conductor {
id: ActiveValue::Set(ws.id),
status: ActiveValue::Set(status.to_string()),
deleted_at: ActiveValue::Set(Some(now.into())),
updated_at: ActiveValue::Set(Some(now.into())),
..Default::default()
}
.update(&txn)
Expand Down Expand Up @@ -2375,6 +2389,7 @@ impl Conductor {
entities::workspace::ActiveModel {
id: ActiveValue::Set(ws.id),
status: ActiveValue::Set(status.to_string()),
updated_at: ActiveValue::Set(Some(now.into())),
..Default::default()
}
.update(&self.db.conn)
Expand Down Expand Up @@ -3330,6 +3345,32 @@ impl Conductor {
.await?;
Ok(())
}

pub async fn auto_delete_inactive_workspaces_on_host(
&self,
host_id: Uuid,
) -> Result<(), ApiError> {
let workspaces = self
.db
.get_inactive_workspaces_on_host(
host_id,
(Utc::now() - Duration::from_secs(14 * 24 * 60 * 60)).into(),
)
.await?;
for ws in workspaces {
if ws.compose_parent.is_none() {
tracing::info!(
"now delete ws {} due to inactivity, last updated at {:?}",
ws.name,
ws.updated_at
);
if let Err(e) = self.delete_workspace(&ws, None, None).await {
tracing::info!("delete inactive ws {} error: {e:?}", ws.name);
}
}
}
Ok(())
}
}

pub fn encode_pkcs8_pem(key: &KeyPair) -> Result<String> {
Expand Down
19 changes: 17 additions & 2 deletions lapdev-db/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{anyhow, Result};
use base64::{engine::general_purpose::STANDARD, Engine};
use chrono::Utc;
use chrono::{DateTime, FixedOffset, Utc};
use lapdev_common::{
AuthProvider, ProviderUser, UserRole, WorkspaceStatus, LAPDEV_BASE_HOSTNAME,
LAPDEV_ISOLATE_CONTAINER,
Expand Down Expand Up @@ -221,8 +221,23 @@ impl DbApi {
) -> Result<Vec<entities::workspace::Model>> {
let model = workspace::Entity::find()
.filter(entities::workspace::Column::HostId.eq(ws_host_id))
.filter(entities::workspace::Column::DeletedAt.is_null())
.filter(entities::workspace::Column::Status.eq(WorkspaceStatus::Running.to_string()))
.all(&self.conn)
.await?;
Ok(model)
}

pub async fn get_inactive_workspaces_on_host(
&self,
ws_host_id: Uuid,
last_updated_at: DateTime<FixedOffset>,
) -> Result<Vec<entities::workspace::Model>> {
let model = workspace::Entity::find()
.filter(entities::workspace::Column::HostId.eq(ws_host_id))
.filter(entities::workspace::Column::DeletedAt.is_null())
.filter(entities::workspace::Column::Status.eq(WorkspaceStatus::Stopped.to_string()))
.filter(entities::workspace::Column::UpdatedAt.lt(last_updated_at))
.all(&self.conn)
.await?;
Ok(model)
Expand All @@ -234,8 +249,8 @@ impl DbApi {
) -> Result<Option<entities::workspace::Model>> {
let model = workspace::Entity::find()
.filter(entities::workspace::Column::OrganizationId.eq(org_id))
.filter(entities::workspace::Column::Status.eq(WorkspaceStatus::Running.to_string()))
.filter(entities::workspace::Column::DeletedAt.is_null())
.filter(entities::workspace::Column::Status.eq(WorkspaceStatus::Running.to_string()))
.one(&self.conn)
.await?;
Ok(model)
Expand Down
8 changes: 5 additions & 3 deletions lapdev-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use futures::{
};
use lapdev_common::{
BuildTarget, ContainerInfo, CreateWorkspaceRequest, DeleteWorkspaceRequest, GitBranch,
PrebuildInfo, ProjectRequest, RepoBuildInfo, RepoBuildOutput, RepoBuildResult, RepoContent,
RunningWorkspace, StartWorkspaceRequest, StopWorkspaceRequest,
HostWorkspace, PrebuildInfo, ProjectRequest, RepoBuildInfo, RepoBuildOutput, RepoBuildResult,
RepoContent, StartWorkspaceRequest, StopWorkspaceRequest,
};
use serde::{Deserialize, Serialize};
use tarpc::transport::channel::UnboundedChannel;
Expand Down Expand Up @@ -111,7 +111,9 @@ pub trait ConductorService {

async fn update_build_repo_stderr(target: BuildTarget, line: String);

async fn running_workspaces() -> Result<Vec<RunningWorkspace>, ApiError>;
async fn running_workspaces_on_host() -> Result<Vec<HostWorkspace>, ApiError>;

async fn auto_delete_inactive_workspaces();
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
Loading

0 comments on commit 30a07c3

Please sign in to comment.