From fa74dd1ec9c29ab925da3e730742aa4cf6ab99b6 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Fri, 9 Feb 2024 11:28:21 -0800 Subject: [PATCH] refactor(scheduler): extract run_scheduler_now to handle scheduler error, instead of unwrapping (#1421) --- ee/tabby-webserver/src/cron/scheduler.rs | 117 +++++++++++++---------- 1 file changed, 64 insertions(+), 53 deletions(-) diff --git a/ee/tabby-webserver/src/cron/scheduler.rs b/ee/tabby-webserver/src/cron/scheduler.rs index 50c3c14aa548..c891889022cc 100644 --- a/ee/tabby-webserver/src/cron/scheduler.rs +++ b/ee/tabby-webserver/src/cron/scheduler.rs @@ -1,8 +1,9 @@ -use std::{process::Stdio, sync::Arc, time::Duration}; +use std::{process::Stdio, sync::Arc}; +use anyhow::{Context, Result}; use tokio::io::AsyncBufReadExt; use tokio_cron_scheduler::Job; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use crate::schema::{job::JobService, worker::WorkerService}; @@ -13,8 +14,7 @@ pub async fn scheduler_job( ) -> anyhow::Result { let scheduler_mutex = Arc::new(tokio::sync::Mutex::new(())); - // let job = Job::new_async("0 1/10 * * * *", move |uuid, mut scheduler| { - let job = Job::new_one_shot_async(Duration::from_secs(1), move |uuid, mut scheduler| { + let job = Job::new_async("0 1/10 * * * *", move |uuid, mut scheduler| { let worker = worker.clone(); let job = job.clone(); let scheduler_mutex = scheduler_mutex.clone(); @@ -24,55 +24,8 @@ pub async fn scheduler_job( return; }; - info!("Running scheduler job..."); - let exe = std::env::current_exe().unwrap(); - let job_id = job.create_job_run("scheduler".to_owned()).await.unwrap(); - - let mut child = tokio::process::Command::new(exe) - .arg("scheduler") - .arg("--now") - .arg("--url") - .arg(format!("localhost:{local_port}")) - .arg("--token") - .arg(worker.read_registration_token().await.unwrap()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .unwrap(); - - { - // Pipe stdout - let job = job.clone(); - let job_id = job_id.clone(); - let stdout = child.stdout.take().unwrap(); - tokio::spawn(async move { - let stdout = tokio::io::BufReader::new(stdout); - let mut stdout = stdout.lines(); - while let Ok(Some(line)) = stdout.next_line().await { - println!("{line}"); - let _ = job.update_job_stdout(&job_id, line + "\n").await; - } - }); - } - - { - // Pipe stderr - let stderr = child.stderr.take().unwrap(); - let job = job.clone(); - let job_id = job_id.clone(); - tokio::spawn(async move { - let stderr = tokio::io::BufReader::new(stderr); - let mut stdout = stderr.lines(); - while let Ok(Some(line)) = stdout.next_line().await { - eprintln!("{line}"); - let _ = job.update_job_stderr(&job_id, line + "\n").await; - } - }); - } - if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) { - let _ = job.complete_job_run(&job_id, exit_code).await; - } else { - let _ = job.complete_job_run(&job_id, -1).await; + if let Err(err) = run_scheduler_now(job, worker, local_port).await { + error!("Failed to run scheduler job, reason: `{}`", err); } if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await { @@ -86,3 +39,61 @@ pub async fn scheduler_job( Ok(job) } + +async fn run_scheduler_now( + job: Arc, + worker: Arc, + local_port: u16, +) -> Result<()> { + info!("Running scheduler job..."); + let exe = std::env::current_exe()?; + let job_id = job.create_job_run("scheduler".to_owned()).await?; + + let mut child = tokio::process::Command::new(exe) + .arg("scheduler") + .arg("--now") + .arg("--url") + .arg(format!("localhost:{local_port}")) + .arg("--token") + .arg(worker.read_registration_token().await?) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + { + // Pipe stdout + let job = job.clone(); + let job_id = job_id.clone(); + let stdout = child.stdout.take().context("Failed to acquire stdout")?; + tokio::spawn(async move { + let stdout = tokio::io::BufReader::new(stdout); + let mut stdout = stdout.lines(); + while let Ok(Some(line)) = stdout.next_line().await { + println!("{line}"); + let _ = job.update_job_stdout(&job_id, line + "\n").await; + } + }); + } + + { + // Pipe stderr + let stderr = child.stderr.take().context("Failed to acquire stderr")?; + let job = job.clone(); + let job_id = job_id.clone(); + tokio::spawn(async move { + let stderr = tokio::io::BufReader::new(stderr); + let mut stdout = stderr.lines(); + while let Ok(Some(line)) = stdout.next_line().await { + eprintln!("{line}"); + let _ = job.update_job_stderr(&job_id, line + "\n").await; + } + }); + } + if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) { + job.complete_job_run(&job_id, exit_code).await?; + } else { + job.complete_job_run(&job_id, -1).await?; + } + + Ok(()) +}