Skip to content

Commit

Permalink
Ensure only one clock can run. Add env setting for database checkout …
Browse files Browse the repository at this point in the history
…timeout.
  • Loading branch information
levkk committed Nov 18, 2024
1 parent 05c68b9 commit d9511b8
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 14 deletions.
9 changes: 8 additions & 1 deletion rwf/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,14 @@ impl DatabaseConfig {
}

fn default_checkout_timeout() -> usize {
5 * 1000
match var("RWF_DATABASE_CHECKOUT_TIMEOUT") {
Ok(timeout) => match timeout.parse() {
Ok(timeout) => timeout,
Err(_) => 5 * 1000,
},

Err(_) => 5 * 1000,
}
}

/// Maximum amount of time to wait for a connection
Expand Down
36 changes: 24 additions & 12 deletions rwf/src/job/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@
//!
//! This is also known as a cron.
//!
//! #### TODO
//!
//! Currently, there is no synchronization between multiple instances of
//! this clock. This cron isn't yet distributed, so only spawn one instance
//! of this clock in your app infrastructure.
use super::{Cron, Error, Job, JobHandler};
use crate::colors::MaybeColorize;
use crate::{colors::MaybeColorize, model::Pool};

use std::sync::Arc;
use time::OffsetDateTime;

use serde::Serialize;
use tokio::time::{interval, Duration};
use std::time::Instant;
use tokio::time::{sleep, Duration};
use tracing::{error, info};

static LOCK: i64 = 4334345490663;

/// A job that runs on a schedule.
pub struct ScheduledJob {
job: JobHandler,
Expand Down Expand Up @@ -75,15 +73,21 @@ impl Clock {
}

/// Run the clock. This blocks forever.
pub async fn run(&self) {
info!("Clock started");
pub async fn run(&self) -> Result<(), Error> {
info!("Clock is waiting for lock");

let mut lock = Pool::connection().await?;
lock.leak();

lock.client()
.execute(&format!("SELECT pg_advisory_lock({})", LOCK), &[])
.await?;

let mut clock = interval(Duration::from_secs(1));
info!("Clock is running");

loop {
clock.tick().await;
let start = Instant::now();
let now = OffsetDateTime::now_utc();

let jobs = self.jobs.clone();

tokio::spawn(async move {
Expand All @@ -102,6 +106,14 @@ impl Clock {
}
}
});

// Make sure we still have a lock.
lock.query_cached(&format!("SELECT pg_advisory_lock({})", LOCK), &[])
.await?;

// Clock should strive to run once a second.
let remaining = Duration::from_secs(1).saturating_sub(start.elapsed());
sleep(remaining).await;
}
}
}
7 changes: 6 additions & 1 deletion rwf/src/job/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ impl Worker {

if let Some(clock) = self.clock.clone() {
tokio::spawn(async move {
clock.run().await;
loop {
if let Err(err) = clock.run().await {
error!("Clock crashed, restarting in 1 second. Error: {:?}", err);
sleep(Duration::from_secs(1)).await;
}
}
});
}

Expand Down
37 changes: 37 additions & 0 deletions rwf/src/model/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub struct ConnectionGuard {
connection: Option<Connection>,
pool: Pool,
rollback: bool,
leaked: bool,
}

impl ConnectionGuard {
Expand All @@ -95,6 +96,7 @@ impl ConnectionGuard {
connection: Some(connection),
pool,
rollback: false,
leaked: false,
}
}

Expand All @@ -111,12 +113,30 @@ impl ConnectionGuard {
pub fn connection_mut(&mut self) -> &mut Connection {
self.connection.as_mut().unwrap()
}

/// Take this connection from the pool forever. The pool will pretend
/// like this connection never existed.
///
/// ### Note
///
/// Leaking too many connections can increase the number of open connections
/// to your database beyond acceptable limits.
pub fn leak(&mut self) {
if !self.leaked {
self.pool.leak(self.connection());
self.leaked = true;
}
}
}

impl Drop for ConnectionGuard {
/// Return the connection to the pool, automatically
/// rolling back any unfinished transaction.
fn drop(&mut self) {
if self.leaked {
return;
}

if let Some(mut connection) = self.connection.take() {
connection.used();

Expand Down Expand Up @@ -377,6 +397,14 @@ impl Pool {
self.checkin_notify.notify_one();
}

/// Take the connection from the pool forever.
///
/// The caller is responsible for closing the connection. The pool
/// will pretend like this connection never existed.
fn leak(&self, _connection: &Connection) {
self.inner.lock().expected -= 1;
}

#[allow(dead_code)]
fn maintenance(&self) {
let now = Instant::now();
Expand Down Expand Up @@ -418,10 +446,14 @@ impl Drop for Pool {

#[cfg(test)]
mod test {
use std::env;

use super::*;

#[tokio::test]
async fn test_pool() -> Result<(), Error> {
env::set_var("RWF_DATABASE_CHECKOUT_TIMEOUT", "500");

let pool = Pool::from_env();
let conn = pool.get().await?;
let row = conn.client().query("SELECT 1", &[]).await?;
Expand All @@ -437,6 +469,11 @@ mod test {
assert!(pool.get().await.is_err());

drop(conn);

let mut conn = pool.get().await?;
assert!(pool.get().await.is_err());

conn.leak();
assert!(pool.get().await.is_ok());

Ok(())
Expand Down

0 comments on commit d9511b8

Please sign in to comment.