From d9511b83feaa9b9ff03a697ee49d3796db51890b Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 18 Nov 2024 14:59:05 -0800 Subject: [PATCH] Ensure only one clock can run. Add env setting for database checkout timeout. --- rwf/src/config.rs | 9 ++++++++- rwf/src/job/clock.rs | 36 ++++++++++++++++++++++++------------ rwf/src/job/worker.rs | 7 ++++++- rwf/src/model/pool/mod.rs | 37 +++++++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 14 deletions(-) diff --git a/rwf/src/config.rs b/rwf/src/config.rs index 8ac4868b..1ecf172e 100644 --- a/rwf/src/config.rs +++ b/rwf/src/config.rs @@ -436,7 +436,14 @@ impl DatabaseConfig { } fn default_checkout_timeout() -> usize { - 5 * 1000 + match var("RWF_DATABASE_CHECKOUT_TIMEOUT") { + Ok(timeout) => match timeout.parse() { + Ok(timeout) => timeout, + Err(_) => 5 * 1000, + }, + + Err(_) => 5 * 1000, + } } /// Maximum amount of time to wait for a connection diff --git a/rwf/src/job/clock.rs b/rwf/src/job/clock.rs index dc1a366d..de9f9d33 100644 --- a/rwf/src/job/clock.rs +++ b/rwf/src/job/clock.rs @@ -2,21 +2,19 @@ //! //! This is also known as a cron. //! -//! #### TODO -//! -//! Currently, there is no synchronization between multiple instances of -//! this clock. This cron isn't yet distributed, so only spawn one instance -//! of this clock in your app infrastructure. use super::{Cron, Error, Job, JobHandler}; -use crate::colors::MaybeColorize; +use crate::{colors::MaybeColorize, model::Pool}; use std::sync::Arc; use time::OffsetDateTime; use serde::Serialize; -use tokio::time::{interval, Duration}; +use std::time::Instant; +use tokio::time::{sleep, Duration}; use tracing::{error, info}; +static LOCK: i64 = 4334345490663; + /// A job that runs on a schedule. pub struct ScheduledJob { job: JobHandler, @@ -75,15 +73,21 @@ impl Clock { } /// Run the clock. This blocks forever. - pub async fn run(&self) { - info!("Clock started"); + pub async fn run(&self) -> Result<(), Error> { + info!("Clock is waiting for lock"); + + let mut lock = Pool::connection().await?; + lock.leak(); + + lock.client() + .execute(&format!("SELECT pg_advisory_lock({})", LOCK), &[]) + .await?; - let mut clock = interval(Duration::from_secs(1)); + info!("Clock is running"); loop { - clock.tick().await; + let start = Instant::now(); let now = OffsetDateTime::now_utc(); - let jobs = self.jobs.clone(); tokio::spawn(async move { @@ -102,6 +106,14 @@ impl Clock { } } }); + + // Make sure we still have a lock. + lock.query_cached(&format!("SELECT pg_advisory_lock({})", LOCK), &[]) + .await?; + + // Clock should strive to run once a second. + let remaining = Duration::from_secs(1).saturating_sub(start.elapsed()); + sleep(remaining).await; } } } diff --git a/rwf/src/job/worker.rs b/rwf/src/job/worker.rs index 86c4d0b7..e1264ff9 100644 --- a/rwf/src/job/worker.rs +++ b/rwf/src/job/worker.rs @@ -59,7 +59,12 @@ impl Worker { if let Some(clock) = self.clock.clone() { tokio::spawn(async move { - clock.run().await; + loop { + if let Err(err) = clock.run().await { + error!("Clock crashed, restarting in 1 second. Error: {:?}", err); + sleep(Duration::from_secs(1)).await; + } + } }); } diff --git a/rwf/src/model/pool/mod.rs b/rwf/src/model/pool/mod.rs index f4202306..11d4610f 100644 --- a/rwf/src/model/pool/mod.rs +++ b/rwf/src/model/pool/mod.rs @@ -80,6 +80,7 @@ pub struct ConnectionGuard { connection: Option, pool: Pool, rollback: bool, + leaked: bool, } impl ConnectionGuard { @@ -95,6 +96,7 @@ impl ConnectionGuard { connection: Some(connection), pool, rollback: false, + leaked: false, } } @@ -111,12 +113,30 @@ impl ConnectionGuard { pub fn connection_mut(&mut self) -> &mut Connection { self.connection.as_mut().unwrap() } + + /// Take this connection from the pool forever. The pool will pretend + /// like this connection never existed. + /// + /// ### Note + /// + /// Leaking too many connections can increase the number of open connections + /// to your database beyond acceptable limits. + pub fn leak(&mut self) { + if !self.leaked { + self.pool.leak(self.connection()); + self.leaked = true; + } + } } impl Drop for ConnectionGuard { /// Return the connection to the pool, automatically /// rolling back any unfinished transaction. fn drop(&mut self) { + if self.leaked { + return; + } + if let Some(mut connection) = self.connection.take() { connection.used(); @@ -377,6 +397,14 @@ impl Pool { self.checkin_notify.notify_one(); } + /// Take the connection from the pool forever. + /// + /// The caller is responsible for closing the connection. The pool + /// will pretend like this connection never existed. + fn leak(&self, _connection: &Connection) { + self.inner.lock().expected -= 1; + } + #[allow(dead_code)] fn maintenance(&self) { let now = Instant::now(); @@ -418,10 +446,14 @@ impl Drop for Pool { #[cfg(test)] mod test { + use std::env; + use super::*; #[tokio::test] async fn test_pool() -> Result<(), Error> { + env::set_var("RWF_DATABASE_CHECKOUT_TIMEOUT", "500"); + let pool = Pool::from_env(); let conn = pool.get().await?; let row = conn.client().query("SELECT 1", &[]).await?; @@ -437,6 +469,11 @@ mod test { assert!(pool.get().await.is_err()); drop(conn); + + let mut conn = pool.get().await?; + assert!(pool.get().await.is_err()); + + conn.leak(); assert!(pool.get().await.is_ok()); Ok(())