Skip to content

Commit

Permalink
refactor(scheduler): extract run_scheduler_now to handle scheduler er…
Browse files Browse the repository at this point in the history
…ror, instead of unwrapping (#1421)
  • Loading branch information
wsxiaoys authored Feb 9, 2024
1 parent ee92a9f commit fa74dd1
Showing 1 changed file with 64 additions and 53 deletions.
117 changes: 64 additions & 53 deletions ee/tabby-webserver/src/cron/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{process::Stdio, sync::Arc, time::Duration};
use std::{process::Stdio, sync::Arc};

use anyhow::{Context, Result};
use tokio::io::AsyncBufReadExt;
use tokio_cron_scheduler::Job;
use tracing::{info, warn};
use tracing::{error, info, warn};

use crate::schema::{job::JobService, worker::WorkerService};

Expand All @@ -13,8 +14,7 @@ pub async fn scheduler_job(
) -> anyhow::Result<Job> {
let scheduler_mutex = Arc::new(tokio::sync::Mutex::new(()));

// let job = Job::new_async("0 1/10 * * * *", move |uuid, mut scheduler| {
let job = Job::new_one_shot_async(Duration::from_secs(1), move |uuid, mut scheduler| {
let job = Job::new_async("0 1/10 * * * *", move |uuid, mut scheduler| {
let worker = worker.clone();
let job = job.clone();
let scheduler_mutex = scheduler_mutex.clone();
Expand All @@ -24,55 +24,8 @@ pub async fn scheduler_job(
return;
};

info!("Running scheduler job...");
let exe = std::env::current_exe().unwrap();
let job_id = job.create_job_run("scheduler".to_owned()).await.unwrap();

let mut child = tokio::process::Command::new(exe)
.arg("scheduler")
.arg("--now")
.arg("--url")
.arg(format!("localhost:{local_port}"))
.arg("--token")
.arg(worker.read_registration_token().await.unwrap())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();

{
// Pipe stdout
let job = job.clone();
let job_id = job_id.clone();
let stdout = child.stdout.take().unwrap();
tokio::spawn(async move {
let stdout = tokio::io::BufReader::new(stdout);
let mut stdout = stdout.lines();
while let Ok(Some(line)) = stdout.next_line().await {
println!("{line}");
let _ = job.update_job_stdout(&job_id, line + "\n").await;
}
});
}

{
// Pipe stderr
let stderr = child.stderr.take().unwrap();
let job = job.clone();
let job_id = job_id.clone();
tokio::spawn(async move {
let stderr = tokio::io::BufReader::new(stderr);
let mut stdout = stderr.lines();
while let Ok(Some(line)) = stdout.next_line().await {
eprintln!("{line}");
let _ = job.update_job_stderr(&job_id, line + "\n").await;
}
});
}
if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) {
let _ = job.complete_job_run(&job_id, exit_code).await;
} else {
let _ = job.complete_job_run(&job_id, -1).await;
if let Err(err) = run_scheduler_now(job, worker, local_port).await {
error!("Failed to run scheduler job, reason: `{}`", err);
}

if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
Expand All @@ -86,3 +39,61 @@ pub async fn scheduler_job(

Ok(job)
}

async fn run_scheduler_now(
job: Arc<dyn JobService>,
worker: Arc<dyn WorkerService>,
local_port: u16,
) -> Result<()> {
info!("Running scheduler job...");
let exe = std::env::current_exe()?;
let job_id = job.create_job_run("scheduler".to_owned()).await?;

let mut child = tokio::process::Command::new(exe)
.arg("scheduler")
.arg("--now")
.arg("--url")
.arg(format!("localhost:{local_port}"))
.arg("--token")
.arg(worker.read_registration_token().await?)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;

{
// Pipe stdout
let job = job.clone();
let job_id = job_id.clone();
let stdout = child.stdout.take().context("Failed to acquire stdout")?;
tokio::spawn(async move {
let stdout = tokio::io::BufReader::new(stdout);
let mut stdout = stdout.lines();
while let Ok(Some(line)) = stdout.next_line().await {
println!("{line}");
let _ = job.update_job_stdout(&job_id, line + "\n").await;
}
});
}

{
// Pipe stderr
let stderr = child.stderr.take().context("Failed to acquire stderr")?;
let job = job.clone();
let job_id = job_id.clone();
tokio::spawn(async move {
let stderr = tokio::io::BufReader::new(stderr);
let mut stdout = stderr.lines();
while let Ok(Some(line)) = stdout.next_line().await {
eprintln!("{line}");
let _ = job.update_job_stderr(&job_id, line + "\n").await;
}
});
}
if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) {
job.complete_job_run(&job_id, exit_code).await?;
} else {
job.complete_job_run(&job_id, -1).await?;
}

Ok(())
}

0 comments on commit fa74dd1

Please sign in to comment.