Skip to content

Commit

Permalink
fix: pass in a reference to prevent mutation
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Nov 23, 2024
1 parent e147592 commit 3f647d0
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 15 deletions.
2 changes: 1 addition & 1 deletion examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where

type Layer = AckLayer<Self, Req, RedisMqContext, Res>;

fn poll<Svc>(mut self, _worker_id: Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(mut self, _worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let (mut tx, rx) = mpsc::channel(self.config.get_buffer_size());
let stream: RequestStream<Request<Req, RedisMqContext>> = Box::pin(rx);
let layer = AckLayer::new(self.clone());
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub trait Backend<Req, Res> {
/// Returns a poller that is ready for streaming
fn poll<Svc: Service<Req, Response = Res>>(
self,
worker: Worker<Context>,
worker: &Worker<Context>,
) -> Poller<Self::Stream, Self::Layer>;
}
/// A codec allows backends to encode and decode data
Expand Down Expand Up @@ -266,7 +266,7 @@ pub mod test_utils {
let worker_id = WorkerId::new("test-worker");
let worker = Worker::new(worker_id, crate::worker::Context::default());
let b = backend.clone();
let mut poller = b.poll::<S>(worker);
let mut poller = b.poll::<S>(&worker);
let (stop_tx, mut stop_rx) = channel::<()>(1);

let (mut res_tx, res_rx) = channel(10);
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl<T: Send + 'static + Sync, Res> Backend<Request<T, ()>, Res> for MemoryStora

type Layer = Identity;

fn poll<Svc>(self, _worker: Worker<worker::Context>) -> Poller<Self::Stream> {
fn poll<Svc>(self, _worker: &Worker<worker::Context>) -> Poller<Self::Stream> {
let stream = self.inner.map(|r| Ok(Some(r))).boxed();
Poller {
stream: BackendStream::new(stream, self.controller),
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<T, Res, Ctx> Backend<Request<T, Ctx>, Res> for RequestStream<Request<T, Ctx

type Layer = Identity;

fn poll<Svc>(self, _worker: Worker<Context>) -> Poller<Self::Stream> {
fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream> {
Poller {
stream: self,
heartbeat: Box::pin(futures::future::pending()),
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl<S, P> Worker<Ready<S, P>> {
};
let backend = self.state.backend;
let service = self.state.service;
let poller = backend.poll::<S>(worker.clone());
let poller = backend.poll::<S>(&worker);
let stream = poller.stream;
let heartbeat = poller.heartbeat.boxed();
let layer = poller.layer;
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-cron/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where

type Layer = Identity;

fn poll<Svc>(self, _worker: Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let stream = self.into_stream();
Poller::new(stream, futures::future::pending())
}
Expand Down
3 changes: 2 additions & 1 deletion packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,15 @@ where

fn poll<Svc: Service<Request<T, RedisContext>>>(
mut self,
worker: Worker<apalis_core::worker::Context>,
worker: &Worker<apalis_core::worker::Context>,
) -> Poller<Self::Stream, Self::Layer> {
let (mut tx, rx) = mpsc::channel(self.config.buffer_size);
let (ack, ack_rx) = mpsc::channel(self.config.buffer_size);
let layer = AckLayer::new(ack);
let controller = self.controller.clone();
let config = self.config.clone();
let stream: RequestStream<Request<T, RedisContext>> = Box::pin(rx);
let worker = worker.clone();
let heartbeat = async move {
let mut reenqueue_orphaned_stm =
apalis_core::interval::interval(config.poll_interval).fuse();
Expand Down
5 changes: 3 additions & 2 deletions packages/apalis-sql/src/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ where

type Layer = AckLayer<MysqlStorage<Req, C>, Req, SqlContext, Res>;

fn poll<Svc>(self, worker: Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let layer = AckLayer::new(self.clone());
let config = self.config.clone();
let controller = self.controller.clone();
Expand Down Expand Up @@ -461,6 +461,7 @@ where
apalis_core::sleep(config.keep_alive).await;
}
};
let w = worker.clone();
let reenqueue_beat = async move {
loop {
let dead_since = Utc::now()
Expand All @@ -476,7 +477,7 @@ where
)
.await
{
worker.emit(Event::Error(Box::new(
w.emit(Event::Error(Box::new(
MysqlPollError::ReenqueueOrphanedError(e),
)));
}
Expand Down
3 changes: 2 additions & 1 deletion packages/apalis-sql/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,15 @@ where

type Layer = AckLayer<PostgresStorage<T, C>, T, SqlContext, Res>;

fn poll<Svc>(mut self, worker: Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(mut self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let layer = AckLayer::new(self.clone());
let subscription = self.subscription.clone();
let config = self.config.clone();
let controller = self.controller.clone();
let (mut tx, rx) = mpsc::channel(self.config.buffer_size);
let ack_notify = self.ack_notify.clone();
let pool = self.pool.clone();
let worker = worker.clone();
let heartbeat = async move {
let mut keep_alive_stm = apalis_core::interval::interval(config.keep_alive).fuse();
let mut reenqueue_orphaned_stm =
Expand Down
8 changes: 4 additions & 4 deletions packages/apalis-sql/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ impl<T: Serialize + DeserializeOwned + Sync + Send + Unpin + 'static, Res>
type Stream = BackendStream<RequestStream<Request<T, SqlContext>>>;
type Layer = AckLayer<SqliteStorage<T>, T, SqlContext, Res>;

fn poll<Svc>(mut self, worker: Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(mut self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let layer = AckLayer::new(self.clone());
let config = self.config.clone();
let controller = self.controller.clone();
Expand All @@ -479,14 +479,14 @@ impl<T: Serialize + DeserializeOwned + Sync + Send + Unpin + 'static, Res>
let heartbeat = async move {
loop {
let now: i64 = Utc::now().timestamp();
if let Err(e) = self.keep_alive_at::<Self::Layer>(worker.id(), now).await {
worker.emit(Event::Error(Box::new(SqlitePollError::KeepAliveError(e))));
if let Err(e) = self.keep_alive_at::<Self::Layer>(w.id(), now).await {
w.emit(Event::Error(Box::new(SqlitePollError::KeepAliveError(e))));
}
apalis_core::sleep(Duration::from_secs(30)).await;
}
}
.boxed();

let w = worker.clone();
let reenqueue_beat = async move {
loop {
let dead_since = Utc::now()
Expand Down

0 comments on commit 3f647d0

Please sign in to comment.