Skip to content

Commit

Permalink
Improve error handling for sqlx backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Johnabell committed Jun 13, 2024
1 parent 94240b3 commit d29664e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
14 changes: 5 additions & 9 deletions rexecutor-sqlx/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ use rexecutor::{
use tokio::sync::mpsc;
use tracing::instrument;

use crate::{stream::ReadyJobStream, RexecutorPgBackend};
use crate::{stream::ReadyJobStream, RexecutorPgBackend, map_err};

impl RexecutorPgBackend {
fn handle_update(result: sqlx::Result<u64>, job_id: JobId) -> Result<(), BackendError> {
match result {
Ok(0) => Err(BackendError::JobNotFound(job_id)),
Ok(1) => Ok(()),
Ok(_) => Err(BackendError::BadState),
// TODO fix this
Err(_error) => Err(BackendError::BadState),
Err(error) => Err(map_err(error)),

Check warning on line 24 in rexecutor-sqlx/src/backend.rs

View check run for this annotation

Codecov / codecov/patch

rexecutor-sqlx/src/backend.rs#L24

Added line #L24 was not covered by tests
}
}
}
Expand Down Expand Up @@ -59,8 +58,7 @@ impl Backend for RexecutorPgBackend {
} else {
self.insert_job(job).await
}
// TODO error handling
.map_err(|_| BackendError::BadState)
.map_err(map_err)
}
async fn mark_job_complete(&self, id: JobId) -> Result<(), BackendError> {
let result = self._mark_job_complete(id).await;
Expand Down Expand Up @@ -102,7 +100,7 @@ impl Backend for RexecutorPgBackend {
async fn prune_jobs(&self, spec: &PruneSpec) -> Result<(), BackendError> {
self.delete_from_spec(spec)
.await
.map_err(|_| BackendError::BadState)
.map_err(map_err)
}
async fn rerun_job(&self, id: JobId) -> Result<(), BackendError> {
let result = self.rerun(id).await;
Expand All @@ -116,7 +114,7 @@ impl Backend for RexecutorPgBackend {
async fn query<'a>(&self, query: Query<'a>) -> Result<Vec<Job>, BackendError> {
self.run_query(query)
.await
.map_err(|_| BackendError::BadState)?
.map_err(map_err)?
.into_iter()
.map(TryFrom::try_from)
.collect()
Expand Down Expand Up @@ -216,8 +214,6 @@ mod test {
backend: RexecutorPgBackend::from_pool(pool).await.unwrap()
);

// TODO: add tests for ignoring running, cancelled, complete, and discarded jobs

#[sqlx::test]
async fn load_job_mark_as_executing_for_executor_returns_none_when_db_empty(pool: PgPool) {
let backend: RexecutorPgBackend = pool.into();
Expand Down
20 changes: 16 additions & 4 deletions rexecutor-sqlx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,25 @@ struct Notification {

use types::*;

fn map_err(error: sqlx::Error) -> BackendError {
match error {
sqlx::Error::Io(err) => BackendError::Io(err),
sqlx::Error::Tls(err) => BackendError::Io(std::io::Error::other(err)),
sqlx::Error::Protocol(err) => BackendError::Io(std::io::Error::other(err)),
sqlx::Error::AnyDriverError(err) => BackendError::Io(std::io::Error::other(err)),
sqlx::Error::PoolTimedOut => BackendError::Io(std::io::Error::other(error)),
sqlx::Error::PoolClosed => BackendError::Io(std::io::Error::other(error)),
_ => BackendError::BadState,

Check warning on line 53 in rexecutor-sqlx/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

rexecutor-sqlx/src/lib.rs#L45-L53

Added lines #L45 - L53 were not covered by tests
}
}

Check warning on line 55 in rexecutor-sqlx/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

rexecutor-sqlx/src/lib.rs#L55

Added line #L55 was not covered by tests

impl RexecutorPgBackend {
/// Creates a new [`RexecutorPgBackend`] from a db connection string.
pub async fn from_db_url(db_url: &str) -> Result<Self, BackendError> {
let pool = PgPoolOptions::new()
.connect(db_url)
.await
.map_err(|_| BackendError::BadState)?;
.map_err(map_err)?;

Check warning on line 63 in rexecutor-sqlx/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

rexecutor-sqlx/src/lib.rs#L63

Added line #L63 was not covered by tests
Self::from_pool(pool).await
}
/// Create a new [`RexecutorPgBackend`] from an existing [`PgPool`].
Expand All @@ -59,11 +71,11 @@ impl RexecutorPgBackend {
};
let mut listener = PgListener::connect_with(&this.pool)
.await
.map_err(|_| BackendError::BadState)?;
.map_err(map_err)?;
listener
.listen("public.rexecutor_scheduled")
.await
.map_err(|_| BackendError::BadState)?;
.map_err(map_err)?;

tokio::spawn({
let subscribers = this.subscribers.clone();
Expand Down Expand Up @@ -97,7 +109,7 @@ impl RexecutorPgBackend {
sqlx::migrate!()
.run(&self.pool)
.await
.map_err(|_| BackendError::BadState)
.map_err(|err| BackendError::Io(std::io::Error::other(err)))

Check warning on line 112 in rexecutor-sqlx/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

rexecutor-sqlx/src/lib.rs#L112

Added line #L112 was not covered by tests
}

async fn load_job_mark_as_executing_for_executor(
Expand Down
5 changes: 4 additions & 1 deletion rexecutor/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,10 @@ pub enum BackendError {
/// No jobs was found matching the criteria provided.
#[error("Job not found: {0}")]
JobNotFound(JobId),
// TODO do we need some sort of IO error here
/// There was an error doing IO with the backend
#[error(transparent)]
Io(std::io::Error),

}

#[cfg(test)]
Expand Down

0 comments on commit d29664e

Please sign in to comment.