Skip to content

Commit

Permalink
Merge pull request #2 from geofmureithi/chore/clippy_n_improvs
Browse files Browse the repository at this point in the history
Chore/clippy n improvs
  • Loading branch information
geofmureithi authored Jul 9, 2022
2 parents ac44d24 + a4ee423 commit 98641e1
Show file tree
Hide file tree
Showing 25 changed files with 191 additions and 129 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "apalis"
version = "0.3.0"
version = "0.3.1"
authors = ["Geoffrey Mureithi <[email protected]>"]
description = "Simple, extensible multithreaded background job processing for Rust"
repository = "https://github.com/geofmureithi/apalis"
Expand Down Expand Up @@ -51,20 +51,20 @@ docsrs = ["document-features"]


[dependencies.apalis-redis]
version = "0.3.0"
version = "0.3.1"
optional = true
default-features = false
path = "./packages/apalis-redis"

[dependencies.apalis-sql]
version = "0.3.0"
version = "0.3.1"
features = ["migrate"]
optional = true
default-features = false
path = "./packages/apalis-sql"

[dependencies.apalis-core]
version = "0.3.0"
version = "0.3.1"
optional = true
default-features = false
path = "./packages/apalis-core"
Expand All @@ -84,7 +84,7 @@ all-features = true
criterion = { version = "0.3", features=["async_tokio"] }
serde = "1"
tokio = { version = "1", features =["macros"] }
apalis-redis = { version = "0.3.0", path = "./packages/apalis-redis" }
apalis-redis = { version = "0.3.1", path = "./packages/apalis-redis" }

[[bench]]
name = "redis_benchmark"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ To get started, just add to Cargo.toml

```toml
[dependencies]
apalis = { version = "0.3.0", features = ["redis"] }
apalis = { version = "0.3.1", features = ["redis"] }
```

## Usage
Expand Down
2 changes: 1 addition & 1 deletion examples/actix-web/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn push_email(
let mut storage = storage.clone();
let res = storage.push(email.into_inner()).await;
match res {
Ok(()) => HttpResponse::Ok().body(format!("Email added to queue")),
Ok(()) => HttpResponse::Ok().body("Email added to queue".to_string()),
Err(e) => HttpResponse::InternalServerError().body(format!("{}", e)),
}
}
Expand Down
23 changes: 12 additions & 11 deletions examples/postgres/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ use apalis::{
use email_service::{send_email, Email};

async fn produce_jobs(storage: &PostgresStorage<Email>) {
// The programatic way
let mut storage = storage.clone();
for i in 0..10 {
storage
.push(Email {
to: format!("test{}@example.com", i),
text: "Test backround job from Apalis".to_string(),
subject: "Background email job".to_string(),
})
.await
.unwrap();
}
storage
.push(Email {
to: "[email protected]".to_string(),
text: "Test backround job from Apalis".to_string(),
subject: "Background email job".to_string(),
})
.await
.expect("Unable to push job");
// The sql way
tracing::info!("You can also add jobs via sql query, run this: \n Select apalis.push_job('apalis::Email', json_build_object('subject', 'Test Apalis', 'to', '[email protected]', 'text', 'Lorem Ipsum'));")
}

struct TracingListener;
impl WorkerListener for TracingListener {
fn on_event(&self, worker_id: &String, event: &WorkerEvent) {
fn on_event(&self, worker_id: &str, event: &WorkerEvent) {
tracing::info!(worker_id = ?worker_id, event = ?event, "Received message from worker")
}
}
Expand Down
8 changes: 4 additions & 4 deletions examples/rest-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ where
let mut storage = storage.clone();
let res = storage.push(job.into_inner()).await;
match res {
Ok(()) => HttpResponse::Ok().body(format!("Job added to queue")),
Ok(()) => HttpResponse::Ok().body("Job added to queue".to_string()),
Err(e) => HttpResponse::InternalServerError().body(format!("{}", e)),
}
}
Expand Down Expand Up @@ -185,7 +185,7 @@ impl StorageApiBuilder {
Self {
scope: self.scope.service(
Scope::new(J::NAME)
.app_data(web::Data::new(storage.clone()))
.app_data(web::Data::new(storage))
.route("", web::get().to(get_jobs::<J, S>)) // Fetch jobs in queue
.route("/workers", web::get().to(get_workers::<J, S>)) // Fetch jobs in queue
.route("/job", web::put().to(push_job::<J, S>)) // Allow add jobs via api
Expand All @@ -195,7 +195,7 @@ impl StorageApiBuilder {
}
}

fn to_scope(self) -> Scope {
fn build(self) -> Scope {
async fn fetch_queues(queues: web::Data<QueueList>) -> HttpResponse {
let mut queue_result = Vec::new();
for queue in &queues.set {
Expand Down Expand Up @@ -311,7 +311,7 @@ async fn main() -> std::io::Result<()> {
.add_storage(sqlite.clone())
.add_storage(pg.clone())
.add_storage(mysql.clone())
.to_scope(),
.build(),
),
)
})
Expand Down
3 changes: 2 additions & 1 deletion packages/apalis-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
[package]
name = "apalis-core"
version = "0.3.0"
version = "0.3.1"
authors = ["Geoffrey Mureithi <[email protected]>"]
edition = "2018"
license = "MIT"
description = "Core for Apalis: simple, extensible multithreaded background processing for Rust"
readme = "../../README.md"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ trait JobDecodable
where
Self: Sized,
{
/// Decode the given Redis value into a message
/// Decode the given value into a message
///
/// In the default implementation, the string value is decoded by assuming
/// it was encoded through the Msgpack encoding.
fn decode_job(value: &Vec<u8>) -> Result<Self, JobError>;
fn decode_job(value: &[u8]) -> Result<Self, JobError>;
}

/// Job objects that can be encoded to a string to be stored in Storage.
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/layers/prometheus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ where
.as_ref()
.ok()
.map(|res| format!("{}", res))
.unwrap_or("Error".to_string());
.unwrap_or_else(|| "Error".to_string());

let labels = [
("name", this.operation.to_string()),
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/layers/sentry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ where
event.event_id = event_id;
Some(event)
});
scope.set_tag("job_type", format!("{}", job_details.job_type));
scope.set_tag("job_type", job_details.job_type.to_string());
let mut details = std::collections::BTreeMap::new();
details.insert(String::from("job_id"), job_details.job_id.into());
details.insert(
Expand Down
12 changes: 8 additions & 4 deletions packages/apalis-core/src/layers/tracing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ impl TraceLayer {
}
}

impl Default for TraceLayer {
fn default() -> Self {
Self::new()
}
}

impl<MakeSpan, OnRequest, OnResponse, OnFailure>
TraceLayer<MakeSpan, OnRequest, OnResponse, OnFailure>
{
Expand Down Expand Up @@ -335,15 +341,13 @@ where
self.inner.call(req)
};

let future = ResponseFuture {
ResponseFuture {
inner: job,
span,
on_response: Some(self.on_response.clone()),
on_failure: Some(self.on_failure.clone()),
start,
};

future
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions packages/apalis-core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ pub trait Storage: Clone {
async fn reenqueue_active(&mut self, _job_ids: Vec<String>) -> StorageResult<()> {
Ok(())
}

/// This method is not implemented yet but its self explanatory
#[doc(hidden)]
async fn is_empty(&self) -> StorageResult<bool> {
unimplemented!()
}
}

/// Each [Worker] sends heartbeat messages to storage
Expand Down
12 changes: 6 additions & 6 deletions packages/apalis-core/src/storage/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ where
let start = Instant::now() + Duration::from_millis(17);
ctx.notify_with(HeartbeatStream::new(
pulse.clone(),
interval_at(start, duration.clone()),
interval_at(start, *duration),
));
}
}
Expand All @@ -129,7 +129,7 @@ where

fn consume(&mut self) -> JobStreamResult<Self::Job> {
self.storage
.stream(self.id.to_string(), self.config.fetch_interval.clone())
.stream(self.id.to_string(), self.config.fetch_interval)
}

fn service(&mut self) -> &mut Self::Service {
Expand All @@ -154,9 +154,9 @@ where
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(&job.inner(), &job, &WorkerError::Storage(e));
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
T::on_service_ready(&job.inner(), &job, instant.elapsed());
T::on_service_ready(job.inner(), &job, instant.elapsed());
let res = handle.call(job).await;

if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
Expand Down Expand Up @@ -205,7 +205,7 @@ where
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(&job.inner(), &job, &WorkerError::Storage(e));
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
}
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Expand All @@ -215,7 +215,7 @@ where
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(&job.inner(), &job, &WorkerError::Storage(e));
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
}

Expand Down
8 changes: 1 addition & 7 deletions packages/apalis-core/src/worker/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,10 @@ impl Clone for Broker {
}
}



impl Broker {
/// Default constructor for broker
pub fn global() -> &'static Broker {
INSTANCE.get_or_init(|| {

Broker(Arc::new(DashMap::new()))

})
INSTANCE.get_or_init(|| Broker(Arc::new(DashMap::new())))
}

fn message_entry(&'_ self, id: TypeId) -> Entry<'_, TypeId, Vec<(TypeId, BrokerRecipient)>> {
Expand Down
2 changes: 0 additions & 2 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use {deadlock::WorkerId, std::any::type_name};
/// Allows communication between [Worker] and [Actor]
#[cfg(feature = "broker")]
pub mod broker;
#[cfg(feature = "broker")]
use broker::Broker;

#[cfg(feature = "broker")]
mod deadlock;
Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-core/src/worker/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,20 @@ impl Monitor<JoinHandle<Recipient<WorkerManagement>>> {
/// Start monitor listening for Ctrl + C
pub async fn run(self) -> std::io::Result<()> {
let res = self.run_without_signals().await;

log::debug!("Listening shut down command (ctrl + c)");
tokio::signal::ctrl_c()
.await
.expect("failed to listen for event");

println!("received ctrl-c event");
log::debug!("Workers shutdown complete");
res
}
}

/// Represents behaviour for listening to workers events via [Monitor]
pub trait WorkerListener: Send {
/// Called when an event is thrown by a worker
fn on_event(&self, worker_id: &String, event: &WorkerEvent);
fn on_event(&self, worker_id: &str, event: &WorkerEvent);
}

impl<K> Monitor<K> {
Expand Down
5 changes: 3 additions & 2 deletions packages/apalis-redis/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
[package]
name = "apalis-redis"
version = "0.3.0"
version = "0.3.1"
authors = ["Geoffrey Mureithi <[email protected]>"]
edition = "2018"
readme = "../../README.md"

license = "MIT"
description = "Redis Storage for Apalis: simple and reliable background processing based on Actix Actors"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
apalis-core = { path = "../../packages/apalis-core", version = "0.3.0", default-features = false }
apalis-core = { path = "../../packages/apalis-core", version = "0.3.1", default-features = false }
redis = { version = "0.21" , features = ["tokio-comp"] }
serde = "1"
log = "0.4"
Expand Down
Loading

0 comments on commit 98641e1

Please sign in to comment.