From d7bfbd6ba45658943d5eecc2cfe7602fc8a7c00c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A1bio=20Ferreira?= Date: Thu, 28 Sep 2023 17:20:21 +0100 Subject: [PATCH] refactor: move news scrapping to own module --- Cargo.lock | 221 ++------------- Cargo.toml | 2 +- Makefile | 1 + docker-compose.yaml | 21 ++ docker.prod/Dockerfile.news-scrapper | 54 ++++ k8s/deployment.yaml | 26 ++ news-scrapper/Cargo.toml | 20 ++ news-scrapper/Dockerfile.dev | 46 ++++ news-scrapper/src/config.rs | 20 ++ news-scrapper/src/main.rs | 76 ++++++ news-scrapper/src/news_ingestor.rs | 112 ++++++++ {news => news-scrapper}/src/scrapper.rs | 67 +++-- news/Cargo.toml | 5 +- news/src/app.rs | 80 ------ news/src/handlers/feeds.rs | 6 +- news/src/handlers/news.rs | 6 +- news/src/handlers/subscriptions.rs | 6 +- news/src/lib.rs | 7 +- news/src/main.rs | 99 +++---- news/src/models/news.rs | 30 --- news/src/news_created_subscriber.rs | 65 ----- news/src/news_websocket_processor.rs | 184 +++++++++++++ users/Cargo.toml | 9 +- users/src/app.rs | 7 +- users/src/handlers/user.rs | 20 +- users/src/services/event_service.rs | 7 +- users/src/services/user_service.rs | 4 +- users/tests/handlers/user_test.rs | 12 +- .../repositories/user_repository_test.rs | 2 +- utils/Cargo.toml | 5 + utils/src/broker.rs | 6 +- utils/src/error.rs | 67 +++++ utils/src/http/websockets/mod.rs | 1 + utils/src/http/websockets/ws_sender.rs | 28 ++ utils/src/lib.rs | 6 + utils/src/news/events.rs | 1 + utils/src/news/mod.rs | 5 + {news/src => utils/src/news}/models/feed.rs | 2 +- {news/src => utils/src/news}/models/mod.rs | 0 utils/src/news/models/news.rs | 66 +++++ .../src/news}/models/subscription.rs | 4 +- .../src/news}/repositories/feed_repository.rs | 8 +- .../src/news}/repositories/mod.rs | 0 .../src/news}/repositories/news_repository.rs | 8 +- .../repositories/subscription_repository.rs | 8 +- {news/src => utils/src/news}/schema.rs | 0 utils/src/news/services/events_service.rs | 39 +++ utils/src/news/services/mod.rs | 2 + utils/src/news/services/news_service.rs | 255 ++++++++++++++++++ utils/src/pipeline/consumer.rs | 43 +++ utils/src/pipeline/data_pipeline.rs | 33 +++ utils/src/pipeline/mod.rs | 3 + utils/src/pipeline/processor.rs | 8 + 53 files changed, 1272 insertions(+), 541 deletions(-) create mode 100644 docker.prod/Dockerfile.news-scrapper create mode 100644 news-scrapper/Cargo.toml create mode 100644 news-scrapper/Dockerfile.dev create mode 100644 news-scrapper/src/config.rs create mode 100644 news-scrapper/src/main.rs create mode 100644 news-scrapper/src/news_ingestor.rs rename {news => news-scrapper}/src/scrapper.rs (75%) delete mode 100644 news/src/app.rs delete mode 100644 news/src/models/news.rs delete mode 100644 news/src/news_created_subscriber.rs create mode 100644 news/src/news_websocket_processor.rs create mode 100644 utils/src/http/websockets/ws_sender.rs create mode 100644 utils/src/news/events.rs create mode 100644 utils/src/news/mod.rs rename {news/src => utils/src/news}/models/feed.rs (90%) rename {news/src => utils/src/news}/models/mod.rs (100%) create mode 100644 utils/src/news/models/news.rs rename {news/src => utils/src/news}/models/subscription.rs (80%) rename {news/src => utils/src/news}/repositories/feed_repository.rs (93%) rename {news/src => utils/src/news}/repositories/mod.rs (100%) rename {news/src => utils/src/news}/repositories/news_repository.rs (95%) rename {news/src => utils/src/news}/repositories/subscription_repository.rs (95%) rename {news/src => utils/src/news}/schema.rs (100%) create mode 100644 
utils/src/news/services/events_service.rs create mode 100644 utils/src/news/services/mod.rs create mode 100644 utils/src/news/services/news_service.rs create mode 100644 utils/src/pipeline/consumer.rs create mode 100644 utils/src/pipeline/data_pipeline.rs create mode 100644 utils/src/pipeline/mod.rs create mode 100644 utils/src/pipeline/processor.rs diff --git a/Cargo.lock b/Cargo.lock index 07a0de3..640ae0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -661,17 +661,6 @@ dependencies = [ "syn 2.0.31", ] -[[package]] -name = "diesel_migrations" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6036b3f0120c5961381b570ee20a02432d7e2d27ea60de9578799cf9156914ac" -dependencies = [ - "diesel", - "migrations_internals", - "migrations_macros", -] - [[package]] name = "diesel_table_macro_syntax" version = "0.1.0" @@ -759,12 +748,6 @@ dependencies = [ "libc", ] -[[package]] -name = "fallible-iterator" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" - [[package]] name = "fastrand" version = "2.0.0" @@ -789,12 +772,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "finl_unicode" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" - [[package]] name = "flate2" version = "1.0.27" @@ -1015,15 +992,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" -[[package]] -name = "hmac" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" -dependencies = [ - "digest", -] - [[package]] name = "http" version = "0.2.9" @@ -1303,42 +1271,12 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -[[package]] -name = "md-5" -version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" -dependencies = [ - "digest", -] - [[package]] name = "memchr" version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" -[[package]] -name = "migrations_internals" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f23f71580015254b020e856feac3df5878c2c7a8812297edd6c0a485ac9dada" -dependencies = [ - "serde", - "toml", -] - -[[package]] -name = "migrations_macros" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cce3325ac70e67bbab5bd837a31cae01f1a6db64e0e744a33cb03a543469ef08" -dependencies = [ - "migrations_internals", - "proc-macro2", - "quote", -] - [[package]] name = "mime" version = "0.3.17" @@ -1450,22 +1388,37 @@ dependencies = [ "chrono", "diesel", "feed-rs", - "futures", "log", "mockall", "mockito", "rdkafka", - "reqwest", "rstest", "serde", "serde_json", "tokio", - "tokio-cron-scheduler", "utils", "uuid", "validator", ] +[[package]] +name = "news-scrapper" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "feed-rs", + "futures", + "log", + "mockall", + "mockito", + "reqwest", + "tokio", + "tokio-cron-scheduler", + 
"utils", + "uuid", +] + [[package]] name = "nom" version = "7.1.3" @@ -1693,24 +1646,6 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" -[[package]] -name = "phf" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" -dependencies = [ - "phf_shared", -] - -[[package]] -name = "phf_shared" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" -dependencies = [ - "siphasher", -] - [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1729,35 +1664,6 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" -[[package]] -name = "postgres-protocol" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" -dependencies = [ - "base64 0.21.3", - "byteorder", - "bytes", - "fallible-iterator", - "hmac", - "md-5", - "memchr", - "rand", - "sha2", - "stringprep", -] - -[[package]] -name = "postgres-types" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" -dependencies = [ - "bytes", - "fallible-iterator", - "postgres-protocol", -] - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2188,15 +2094,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_spanned" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96426c9936fd7a0124915f9185ea1d20aa9445cc9821142f0a73bc9207a2e186" -dependencies = [ - "serde", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2220,17 +2117,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sha2" -version = "0.10.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest", -] - [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -2305,17 +2191,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "stringprep" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb41d74e231a107a1b4ee36bd1214b11285b77768d2e3824aedafa988fd36ee6" -dependencies = [ - "finl_unicode", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "subtle" version = "2.5.0" @@ -2499,32 +2374,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-postgres" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d340244b32d920260ae7448cb72b6e238bddc3d4f7603394e7dd46ed8e48f5b8" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "fallible-iterator", - "futures-channel", - "futures-util", - "log", - "parking_lot 0.12.1", - "percent-encoding", - "phf", - "pin-project-lite", - "postgres-protocol", - "postgres-types", - "rand", - "socket2 0.5.3", - "tokio", - "tokio-util", - "whoami", -] - [[package]] name = "tokio-util" version = "0.7.8" @@ -2539,26 +2388,11 @@ dependencies = [ 
"tracing", ] -[[package]] -name = "toml" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17e963a819c331dcacd7ab957d80bc2b9a9c1e71c804826d2f283dd65306542" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit", -] - [[package]] name = "toml_datetime" version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" -dependencies = [ - "serde", -] [[package]] name = "toml_edit" @@ -2567,8 +2401,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a" dependencies = [ "indexmap 2.0.0", - "serde", - "serde_spanned", "toml_datetime", "winnow", ] @@ -2675,8 +2507,6 @@ dependencies = [ "async-trait", "chrono", "diesel", - "diesel_migrations", - "futures", "jsonwebtoken", "log", "mockall", @@ -2686,7 +2516,6 @@ dependencies = [ "serde", "serde_json", "tokio", - "tokio-postgres", "utils", "uuid", "validator", @@ -2701,15 +2530,19 @@ dependencies = [ "actix-http", "actix-web", "actix-web-actors", + "async-trait", "chrono", "diesel", "env_logger", + "feed-rs", "jsonwebtoken", "log", + "mockall", "rand", "rdkafka", "serde", "serde_json", + "tokio", "uuid", ] @@ -2868,16 +2701,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "whoami" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" -dependencies = [ - "wasm-bindgen", - "web-sys", -] - [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index c50b169..7b9c5cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] -members = ["users", "utils", "news"] +members = ["users", "utils", "news", "news-scrapper"] diff --git a/Makefile b/Makefile index 34b5ec2..c96f963 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,7 @@ test: build-prod: docker build -t ffff/rust-users-prod -f ./docker.prod/Dockerfile.users . docker build -t ffff/rust-news-prod -f ./docker.prod/Dockerfile.news . + docker build -t ffff/rust-news-scrapper-prod -f ./docker.prod/Dockerfile.news-scrapper . docker build -t ffff/rust-fe-prod -f ./docker.prod/Dockerfile.fe . docker build -t ffff/rust-users-migrations-prod -f ./docker.prod/Dockerfile.users-migrations . docker build -t ffff/rust-news-migrations-prod -f ./docker.prod/Dockerfile.news-migrations . diff --git a/docker-compose.yaml b/docker-compose.yaml index 2df887c..723c12b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -120,6 +120,27 @@ services: news-migrations: condition: service_completed_successfully + news-scrapper: + build: + context: . 
+ dockerfile: ./news-scrapper/Dockerfile.dev + volumes: + - ./news-scrapper/src:/app/news-scrapper/src + - ./utils/src:/app/utils/src + - ./logs/news-scrapper:/var/log/app + environment: + DATABASE_URL: "postgres://myuser:mypassword@postgres:5432/mydb?options=-c%20search_path%3Dnews" + LOGS_PATH: /var/log/app/stdout.log + RUST_LOG: debug + KAFKA_URL: kafka:29092 + depends_on: + postgres: + condition: service_healthy + kafka: + condition: service_healthy + news-migrations: + condition: service_completed_successfully + news-migrations: build: context: ./news diff --git a/docker.prod/Dockerfile.news-scrapper b/docker.prod/Dockerfile.news-scrapper new file mode 100644 index 0000000..9ac8c79 --- /dev/null +++ b/docker.prod/Dockerfile.news-scrapper @@ -0,0 +1,54 @@ +ARG RUST_VERSION=1.72.0 +ARG APP_NAME=news-scrapper +FROM rust:${RUST_VERSION}-slim-bullseye AS build +ARG APP_NAME +WORKDIR /app + +# dependencies to compile the Kafka client +RUN apt-get update && apt-get install -y build-essential \ + curl \ + openssl libssl-dev \ + pkg-config \ + python \ + valgrind \ + zlib1g-dev + +# dependencies to compile diesel client +RUN apt-get install -y \ + libpq-dev + +RUN echo "[workspace]\n\ + members = [\n\ + \"news-scrapper\",\n\ + \"utils\"\n\ + ]" > ./Cargo.toml + +COPY ./news-scrapper/Cargo.toml ./news-scrapper/ +COPY ./utils/Cargo.toml ./utils/ + +RUN mkdir news-scrapper/src +RUN echo "fn main() {}" > ./news-scrapper/src/main.rs + +RUN mkdir utils/src +RUN echo "fn main() {}" > ./utils/src/main.rs + +RUN cargo build --release + +COPY ./utils/src ./utils/src +COPY ./news-scrapper/src ./news-scrapper/src + +RUN cargo build --release + +RUN cp ./target/release/$APP_NAME /bin/server + + +FROM debian:bullseye-slim AS final + +RUN apt-get update && apt-get install -y \ + libpq-dev \ + openssl libssl-dev \ + ca-certificates + +COPY --from=build /bin/server /bin/ + +CMD ["/bin/server"] diff --git a/k8s/deployment.yaml b/k8s/deployment.yaml index 680fdc6..621830d 100644 --- a/k8s/deployment.yaml +++ b/k8s/deployment.yaml @@ -182,3 +182,29 @@ spec: imagePullPolicy: IfNotPresent ports: - containerPort: 80 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: news-scrapper-deployment +spec: + replicas: 1 + selector: + matchLabels: + app: news-scrapper + template: + metadata: + labels: + app: news-scrapper + spec: + containers: + - name: news-scrapper-container + image: ffff/rust-news-scrapper-prod + imagePullPolicy: IfNotPresent + env: + - name: DATABASE_URL + value: 'postgres://myuser:mypassword@postgres-service:5432/mydb?options=-c%20search_path%3Dnews' + - name: KAFKA_URL + value: 'kafka-service:9092' + - name: RUST_LOG + value: 'debug' diff --git a/news-scrapper/Cargo.toml b/news-scrapper/Cargo.toml new file mode 100644 index 0000000..3216771 --- /dev/null +++ b/news-scrapper/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "news-scrapper" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1.73" +bytes = "1.5.0" +feed-rs = "1.3.0" +futures = "0.3.28" +log = "0.4.20" +mockall = "0.11.4" +mockito = "1.2.0" +reqwest = "0.11.20" +tokio = "1.32.0" +tokio-cron-scheduler = "0.9.4" +utils = { path = "../utils", features = ["broker", "database", "news"] } +uuid = { version = "1.4.1", features = ["v4", "serde"] } diff --git a/news-scrapper/Dockerfile.dev b/news-scrapper/Dockerfile.dev new file mode 100644 index 0000000..8f2c0c3 --- /dev/null +++ b/news-scrapper/Dockerfile.dev @@ -0,0 
+1,46 @@
+ARG RUST_VERSION=1.72.0
+ARG APP_NAME=news-scrapper
+FROM rust:${RUST_VERSION}-slim-bullseye AS build
+ARG APP_NAME
+WORKDIR /app
+
+# dependencies to compile the Kafka client
+RUN apt-get update && apt-get install -y build-essential \
+    curl \
+    openssl libssl-dev \
+    pkg-config \
+    python \
+    valgrind \
+    zlib1g-dev
+
+# dependencies to compile diesel client
+RUN apt-get install -y \
+    libpq-dev
+
+RUN cargo install cargo-watch
+
+RUN echo "[workspace]\n\
+    members = [\n\
+    \"news-scrapper\",\n\
+    \"utils\"\n\
+    ]" > ./Cargo.toml
+
+COPY ./news-scrapper/Cargo.toml ./news-scrapper/
+COPY ./utils/Cargo.toml ./utils/
+
+RUN mkdir news-scrapper/src
+RUN echo "fn main() {}" > ./news-scrapper/src/main.rs
+
+RUN mkdir utils/src
+RUN echo "fn main() {}" > ./utils/src/main.rs
+
+RUN cargo build
+
+COPY ./news-scrapper/src ./news-scrapper/src
+# COPY ./news-scrapper/tests ./news-scrapper/tests
+
+COPY ./utils/src ./utils/src
+
+EXPOSE 8000
+
+CMD ["cargo","watch","-x","run"]
diff --git a/news-scrapper/src/config.rs b/news-scrapper/src/config.rs
new file mode 100644
index 0000000..0036400
--- /dev/null
+++ b/news-scrapper/src/config.rs
@@ -0,0 +1,20 @@
+#[derive(Debug, Clone)]
+pub struct Config {
+    pub database_url: String,
+    pub logs_path: String,
+    pub kafka_url: String,
+}
+
+impl Config {
+    pub fn init() -> Config {
+        let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
+        let logs_path = std::env::var("LOGS_PATH").unwrap_or_else(|_| String::from(""));
+        let kafka_url = std::env::var("KAFKA_URL").expect("KAFKA_URL must be set");
+
+        Config {
+            database_url,
+            logs_path,
+            kafka_url,
+        }
+    }
+}
diff --git a/news-scrapper/src/main.rs b/news-scrapper/src/main.rs
new file mode 100644
index 0000000..54ff408
--- /dev/null
+++ b/news-scrapper/src/main.rs
@@ -0,0 +1,76 @@
+mod config;
+mod news_ingestor;
+mod scrapper;
+use std::{error::Error, sync::Arc, thread};
+use tokio_cron_scheduler::{Job, JobScheduler};
+use utils::{
+    broker,
+    db::connect_db,
+    logger::init_logger,
+    news::{
+        repositories::{
+            feed_repository::{FeedDieselRepository, FeedRepository},
+            news_repository::{NewsDieselRepository, NewsRepository},
+            subscription_repository::{SubscriptionRepository, SubscriptionsDieselRepository},
+        },
+        services::{
+            events_service::{EventService, KafkaEventService},
+            news_service::{NewsService, Service},
+        },
+    },
+};
+
+#[tokio::main]
+async fn main() {
+    let config = config::Config::init();
+
+    init_logger(config.logs_path.clone());
+
+    let kafka_producer = broker::create_producer(config.kafka_url.clone());
+
+    let db_pool = connect_db(config.database_url.clone());
+
+    let feed_repository: Arc<dyn FeedRepository> =
+        Arc::new(FeedDieselRepository::new(Arc::new(db_pool.clone())));
+    let news_repository: Arc<dyn NewsRepository> =
+        Arc::new(NewsDieselRepository::new(Arc::new(db_pool.clone())));
+    let subscription_repository: Arc<dyn SubscriptionRepository> = Arc::new(
+        SubscriptionsDieselRepository::new(Arc::new(db_pool.clone())),
+    );
+    let events_service: Arc<dyn EventService> = Arc::new(KafkaEventService::new(kafka_producer));
+
+    let service: Arc<dyn NewsService> = Arc::new(Service::new(
+        feed_repository.clone(),
+        news_repository.clone(),
+        subscription_repository.clone(),
+        events_service.clone(),
+    ));
+
+    let feeds_scrapper = Arc::new(scrapper::RssScrapper::default());
+
+    let ingestor = news_ingestor::NewsIngestor::new(service, feeds_scrapper);
+
+    if let Err(err) = setup_cronjobs(&ingestor).await {
+        panic!("failed setup cronjobs: {}", err);
+    };
+
+    thread::park();
+}
+
+async fn setup_cronjobs(ingestor: &news_ingestor::NewsIngestor) -> Result<(), Box<dyn Error>> {
+    let ingestor = ingestor.clone();
+
+    let sched = JobScheduler::new().await?;
+
+    let scrap_news_job = Job::new_async("0 * * * * *", move |_uuid, _l| {
+        let ingestor = ingestor.clone();
+        Box::pin(async move {
+            ingestor.ingest().await;
+        })
+    })?;
+    sched.add(scrap_news_job).await?;
+
+    sched.start().await?;
+
+    Ok(())
+}
diff --git a/news-scrapper/src/news_ingestor.rs b/news-scrapper/src/news_ingestor.rs
new file mode 100644
index 0000000..db4750f
--- /dev/null
+++ b/news-scrapper/src/news_ingestor.rs
@@ -0,0 +1,112 @@
+use std::sync::Arc;
+
+use log::{debug, error};
+use tokio::{sync::mpsc, task};
+use utils::news::{models::news::News, services::news_service::NewsService};
+
+use crate::scrapper::FeedsScrapper;
+
+#[derive(Clone)]
+pub struct NewsIngestor {
+    pub news_service: Arc<dyn NewsService>,
+    pub feeds_scrapper: Arc<dyn FeedsScrapper>,
+}
+
+impl NewsIngestor {
+    pub fn new(
+        news_service: Arc<dyn NewsService>,
+        feeds_scrapper: Arc<dyn FeedsScrapper>,
+    ) -> NewsIngestor {
+        NewsIngestor {
+            news_service,
+            feeds_scrapper,
+        }
+    }
+
+    pub async fn ingest(&self) {
+        debug!("start scrapping feeds");
+        let result = self.news_service.list_feeds().await;
+
+        let feeds = match result {
+            Ok(feeds) => feeds,
+            Err(err) => {
+                log::error!("failed to list feeds: {}", err);
+                return;
+            }
+        };
+
+        const BUFFER_SIZE: usize = 10;
+        let (tx, mut rx) = mpsc::channel::<Vec<News>>(BUFFER_SIZE);
+
+        let feeds_scrapper = self.feeds_scrapper.clone();
+
+        task::spawn(async move {
+            let result = feeds_scrapper.scrap_all(feeds, tx).await;
+            if let Err(err) = result {
+                error!("failed scrapping feeds: {}", err);
+            }
+        });
+
+        while let Some(news) = rx.recv().await {
+            for news_item in news {
+                if let Err(err) = self.news_service.insert_news(&news_item).await {
+                    error!("failed inserting news: {}", err);
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use utils::{
+        error::{CommonError, DATABASE_ERROR_CODE},
+        news::{models::feed::Feed, services::news_service::MockNewsService},
+    };
+
+    use crate::scrapper::MockFeedsScrapper;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_news_ingestor_ingest_success() {
+        let mut news_service = MockNewsService::new();
+        let mut feeds_scrapper = MockFeedsScrapper::new();
+
+        // Set up the mock behaviors for the NewsService and FeedsScrapper
+        news_service.expect_list_feeds().returning(|| {
+            Ok(vec![Feed {
+                author: "coingraph".to_string(),
+                title: "Coingraph".to_string(),
+                url: "".to_string(),
+                id: uuid::Uuid::new_v4(),
+            }])
+        });
+        feeds_scrapper.expect_scrap_all().returning(|_, _| Ok(()));
+
+        let news_service = Arc::new(news_service);
+        let feeds_scrapper = Arc::new(feeds_scrapper);
+
+        let news_ingestor = NewsIngestor::new(news_service, feeds_scrapper);
+        news_ingestor.ingest().await;
+    }
+
+    #[tokio::test]
+    async fn test_news_ingestor_ingest_list_feeds_error() {
+        let mut news_service = MockNewsService::new();
+        let feeds_scrapper = Arc::new(MockFeedsScrapper::new());
+
+        // Set up the mock behavior for the NewsService when list_feeds fails
+        news_service.expect_list_feeds().returning(|| {
+            Err(CommonError {
+                message: "db is down".to_string(),
+                code: DATABASE_ERROR_CODE,
+            })
+        });
+
+        let news_service = Arc::new(news_service);
+
+        let news_ingestor = NewsIngestor::new(news_service, feeds_scrapper);
+        news_ingestor.ingest().await;
+    }
+}
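
The ingestor above is written against the `NewsService` trait from utils/src/news/services/news_service.rs, a 255-line file that the diffstat lists but that falls past the end of this excerpt. A minimal sketch of the trait shape implied by the call sites (`list_feeds`, `insert_news`, and the generated `MockNewsService`); the exact signatures, in particular the `insert_news` return type, are assumptions:

    use async_trait::async_trait;
    use mockall::automock;
    use utils::error::CommonError;
    use utils::news::models::{feed::Feed, news::News};

    // Standalone sketch; inside utils itself these imports would be crate-relative.
    #[automock]
    #[async_trait]
    pub trait NewsService: Send + Sync {
        async fn list_feeds(&self) -> Result<Vec<Feed>, CommonError>;
        // Assumed to return the inserted row, mirroring the repositories' create().
        async fn insert_news(&self, news: &News) -> Result<News, CommonError>;
    }

The concrete `Service` constructed in news-scrapper/src/main.rs above presumably wraps the feed, news, and subscription repositories plus the event service handed to `Service::new`.
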
diff --git a/news/src/scrapper.rs b/news-scrapper/src/scrapper.rs
similarity index 75%
rename from news/src/scrapper.rs
rename to news-scrapper/src/scrapper.rs
index c394512..caa66af 100644
--- a/news/src/scrapper.rs
+++ b/news-scrapper/src/scrapper.rs
@@ -1,3 +1,4 @@
+use async_trait::async_trait;
 use bytes::{Buf, Bytes};
 use feed_rs::model::Feed;
 use feed_rs::parser;
@@ -5,21 +6,28 @@ use futures::stream::FuturesUnordered;
 use futures::stream::StreamExt;
 use log::debug;
 use log::error;
+use mockall::automock;
 use reqwest::get;
 use tokio::sync::mpsc::Sender;
 use tokio::sync::Semaphore;
 use utils::error::CommonError;
 use utils::error::HttpError;
-use uuid::Uuid;
 
-use crate::models::feed::Feed as RssFeed;
-use crate::models::news::News;
+use utils::news::models::feed::Feed as RssFeed;
+use utils::news::models::news::News;
+
+#[automock]
+#[async_trait]
+pub trait FeedsScrapper: Send + Sync {
+    async fn scrap_all(&self, feeds: Vec<RssFeed>, tx: Sender<Vec<News>>) -> Result<(), String>;
+}
 
 #[derive(Clone)]
-pub struct Scrapper {}
+pub struct RssScrapper {}
 
-impl Scrapper {
-    pub async fn scrap_all(feeds: Vec<RssFeed>, tx: Sender<Vec<News>>) -> Result<(), String> {
+#[async_trait]
+impl FeedsScrapper for RssScrapper {
+    async fn scrap_all(&self, feeds: Vec<RssFeed>, tx: Sender<Vec<News>>) -> Result<(), String> {
         let max_concurrency = 5; // Define the maximum concurrency limit
 
         let semaphore = Semaphore::new(max_concurrency);
@@ -41,35 +49,10 @@ impl Scrapper {
             let mut news = Vec::new();
 
             for feed_news in feed.entries {
-                let mut author = "".to_string();
-                let mut url = "".to_string();
-                let mut title = "".to_string();
-                let mut publish_date = chrono::Utc::now().naive_local().date();
-
-                if !feed_news.authors.is_empty() {
-                    author = feed_news.authors[0].name.clone();
-                }
-
-                if let Some(source) = feed_news.source {
-                    url = source;
-                }
-
-                if let Some(news_title) = feed_news.title {
-                    title = news_title.content.to_string();
-                }
-
-                if let Some(date) = feed_news.published {
-                    publish_date = date.naive_local().date();
-                }
-
-                news.push(News {
-                    id: Uuid::new_v4(),
-                    author,
-                    url,
-                    title,
-                    feed_id: rss_feed.id,
-                    publish_date: Some(publish_date),
-                })
+                let mut news_entry: News = feed_news.into();
+                news_entry.feed_id = rss_feed.id;
+
+                news.push(news_entry)
             }
 
             if let Err(err) = tx.send(news).await {
@@ -87,6 +70,18 @@ impl Scrapper {
 
         Ok(())
     }
+}
+
+impl Default for RssScrapper {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RssScrapper {
+    pub fn new() -> Self {
+        RssScrapper {}
+    }
 
     async fn scrap_with_retry(
         rss_feed: RssFeed,
@@ -95,7 +90,7 @@ impl Scrapper {
         let mut retry_count = 0;
 
         loop {
-            let result = Scrapper::scrap(&rss_feed).await;
+            let result = RssScrapper::scrap(&rss_feed).await;
 
             if let Ok(feed) = result {
                 drop(permit); // Release the semaphore permit
diff --git a/news/Cargo.toml b/news/Cargo.toml
index be52e81..006a2ad 100644
--- a/news/Cargo.toml
+++ b/news/Cargo.toml
@@ -14,15 +14,12 @@ diesel = { version = "2.1.1", features = [
     "chrono",
 ] }
 feed-rs = "1.3.0"
-futures = "0.3.28"
-reqwest = "0.11.20"
 serde = { version = "1.0.188", features = ["derive"] }
 tokio = { version = "1.32.0", features = ["full"] }
 uuid = { version = "1.4.1", features = ["v4", "serde"] }
-utils = { path = "../utils", features = ["broker", "database"] }
+utils = { path = "../utils", features = ["broker", "database", "news"] }
 chrono = "0.4.31"
 log = "0.4.20"
-tokio-cron-scheduler = "0.9.4"
 actix-web = "4.4.0"
 mockito = "1.2.0"
 mockall = "0.11.4"
diff --git a/news/src/app.rs b/news/src/app.rs
deleted file mode 100644
index d58e6ca..0000000
--- a/news/src/app.rs
+++ /dev/null
@@ -1,80 +0,0 @@
-use std::sync::Arc;
-
-use crate::{
-    models::news::News,
-    repositories::{feed_repository::FeedRepository, news_repository::NewsRepository},
-    scrapper::Scrapper,
-};
-use log::{debug, error, info};
-use tokio::{sync::mpsc, task};
-use utils::{
-    broker::{self, KafkaProducer},
-    error::CommonError,
-};
-
-#[derive(Clone)]
-pub struct App {
-    pub feed_repo: Arc<dyn FeedRepository>,
-    pub news_repo: Arc<dyn NewsRepository>,
-    pub kafka_producer: KafkaProducer,
-}
-
-impl App {
-    pub fn new(
-        feed_repo: Arc<dyn FeedRepository>,
-        news_repo: Arc<dyn NewsRepository>,
-        kafka_producer: KafkaProducer,
-    ) -> App {
-        App {
-            feed_repo,
-            news_repo,
-            kafka_producer,
-        }
-    }
-
-    pub async fn scrap_feeds(&self) {
-        debug!("start scrapping feeds");
-        let result = self.feed_repo.list();
-
-        let feeds = result.unwrap();
-
-        const BUFFER_SIZE: usize = 10;
-        let (tx, mut rx) = mpsc::channel::<Vec<News>>(BUFFER_SIZE);
-
-        task::spawn(async move {
-            let result = Scrapper::scrap_all(feeds, tx).await;
-            if let Err(err) = result {
-                error!("failed scrapping feeds: {}", err);
-            }
-        });
-
-        while let Some(news) = rx.recv().await {
-            for news in news {
-                let db_news = self
-                    .news_repo
-                    .find_by_fields(Some(news.title.clone()), Some(news.feed_id));
-
-                if let Ok(None) = db_news {
-                    let result = self.news_repo.create(&news);
-                    match result {
-                        Ok(news) => {
-                            info!(
-                                "News with title {} of feed {} inserted!",
-                                news.title, news.feed_id
-                            );
-                            let _ = broker::send_message_to_topic(
-                                self.kafka_producer.clone(),
-                                "news_created".to_string(),
-                                serde_json::to_string(&news).unwrap(),
-                            )
-                            .await;
-                        }
-                        Err(err) => {
-                            error!("failed creating new {:?}: {}", news, CommonError::from(err));
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
diff --git a/news/src/handlers/feeds.rs b/news/src/handlers/feeds.rs
index 9ce7c4e..d567bd4 100644
--- a/news/src/handlers/feeds.rs
+++ b/news/src/handlers/feeds.rs
@@ -2,7 +2,7 @@ use actix_web::{get, web, HttpResponse};
 use log::error;
 use utils::error::CommonError;
 
-use crate::repositories::feed_repository::FeedRepository;
+use utils::news::repositories::feed_repository::FeedRepository;
 
 #[get("/feeds")]
 async fn get_feeds(feed_repo: web::Data<dyn FeedRepository>) -> HttpResponse {
@@ -29,8 +29,8 @@ mod tests {
     use utils::http::test_utils::HttpTestCase;
     use uuid::Uuid;
 
-    use crate::models::feed::Feed;
-    use crate::repositories::feed_repository::MockFeedRepository;
+    use utils::news::models::feed::Feed;
+    use utils::news::repositories::feed_repository::MockFeedRepository;
 
     struct GetFeedsTestCase {
         http_case: HttpTestCase,
diff --git a/news/src/handlers/news.rs b/news/src/handlers/news.rs
index d786880..abbf5c2 100644
--- a/news/src/handlers/news.rs
+++ b/news/src/handlers/news.rs
@@ -3,7 +3,7 @@ use actix_web::{get, web, HttpRequest, HttpResponse};
 use log::error;
 use utils::{error::CommonError, http::middlewares::jwt_auth::JwtMiddleware};
 
-use crate::repositories::news_repository::NewsRepository;
+use utils::news::repositories::news_repository::NewsRepository;
 
 #[get("/news")]
 async fn get_news(
@@ -38,8 +38,8 @@ mod tests {
     use utils::http::test_utils::HttpTestCase;
     use uuid::Uuid;
 
-    use crate::models::news::News;
-    use crate::repositories::news_repository::MockNewsRepository;
+    use utils::news::models::news::News;
+    use utils::news::repositories::news_repository::MockNewsRepository;
 
     struct GetNewsTestCase {
         pub service_result: Result<Vec<News>, DatabaseError>,
diff --git a/news/src/handlers/subscriptions.rs b/news/src/handlers/subscriptions.rs
index 44689a9..156f19b 100644
--- a/news/src/handlers/subscriptions.rs
+++ b/news/src/handlers/subscriptions.rs
@@ -7,8 +7,8 @@ use utils::{error::CommonError, http::middlewares::jwt_auth::JwtMiddleware};
 use uuid::Uuid;
 use validator::Validate;
 
-use crate::models::subscription::Subscription;
-use crate::repositories::subscription_repository::SubscriptionRepository;
+use utils::news::models::subscription::Subscription;
+use utils::news::repositories::subscription_repository::SubscriptionRepository;
 
 #[derive(Debug, Serialize, Deserialize, Validate)]
 pub struct CreateSubscriptionPayload {
@@ -116,7 +116,7 @@ mod tests {
     use utils::http::test_utils::HttpTestCase;
     use uuid::Uuid;
 
-    use crate::repositories::subscription_repository::MockSubscriptionRepository;
+    use utils::news::repositories::subscription_repository::MockSubscriptionRepository;
 
     struct GetSubscriptionsTestCase {
         http_case: HttpTestCase,
diff --git a/news/src/lib.rs b/news/src/lib.rs
index 01b4eaa..75504ea 100644
--- a/news/src/lib.rs
+++ b/news/src/lib.rs
@@ -1,8 +1,3 @@
-pub mod app;
 pub mod config;
 pub mod handlers;
-pub mod models;
-pub mod news_created_subscriber;
-pub mod repositories;
-pub mod schema;
-pub mod scrapper;
+pub mod news_websocket_processor;
diff --git a/news/src/main.rs b/news/src/main.rs
index 9f15859..62954d4 100644
--- a/news/src/main.rs
+++ b/news/src/main.rs
@@ -6,28 +6,26 @@ use actix_web::dev::{ServiceFactory, ServiceRequest, ServiceResponse};
 use actix_web::{error::Error as ActixError, web, App as ActixApp, HttpServer};
 use log::info;
 use news::handlers::subscriptions::{create_subscription, delete_subscription, get_subscriptions};
-use news::news_created_subscriber::setup_news_created_subscriber;
-use news::repositories::feed_repository::FeedDieselRepository;
-use news::repositories::news_repository::NewsDieselRepository;
-use news::repositories::subscription_repository::{
-    SubscriptionRepository, SubscriptionsDieselRepository,
-};
+use news::news_websocket_processor::NewsWebsocketProcessor;
+use rdkafka::consumer::{Consumer, StreamConsumer};
+use std::sync::Arc;
 use std::thread;
-use std::{error::Error, sync::Arc};
-use tokio_cron_scheduler::{Job, JobScheduler};
-use utils::broker::{self};
-use utils::http::services::auth_service::JwtAuthService;
+use utils::broker;
+use utils::http::services::auth_service::{AuthService, JwtAuthService};
 use utils::http::websockets::ws_handler::get_ws;
+use utils::http::websockets::ws_sender::WsSenderWrapper;
 use utils::http::websockets::ws_server::WebsocketServer;
+use utils::news::events::NEWS_CREATED_EVENT;
+use utils::news::repositories::feed_repository::{FeedDieselRepository, FeedRepository};
+use utils::news::repositories::news_repository::{NewsDieselRepository, NewsRepository};
+use utils::news::repositories::subscription_repository::{
+    SubscriptionRepository, SubscriptionsDieselRepository,
+};
+use utils::pipeline::consumer::KafkaConsumer;
+use utils::pipeline::data_pipeline::DataPipeline;
 use utils::{db::connect_db, http::utils::build_server, logger::init_logger};
 
-use news::{
-    app::App,
-    config::Config,
-    handlers::feeds::get_feeds,
-    handlers::news::get_news,
-    repositories::{feed_repository::FeedRepository, news_repository::NewsRepository},
-};
+use news::{config::Config, handlers::feeds::get_feeds, handlers::news::get_news};
 
 #[actix_web::main]
 async fn main() {
@@ -35,8 +33,6 @@ async fn main() {
 
     init_logger(config.logs_path.clone());
 
-    let kafka_producer = broker::create_producer(config.kafka_url.clone());
-
     let db_pool = connect_db(config.database_url.clone());
 
     let feed_repository: Arc<dyn FeedRepository> =
@@ -47,31 +43,14 @@ async fn main() {
         SubscriptionsDieselRepository::new(Arc::new(db_pool.clone())),
     );
 
-    let app = App::new(
-        feed_repository.clone(),
-        news_repository.clone(),
-        kafka_producer,
-    );
-
-    info!("Setting up cronjobs");
-
-    if let Err(err) = setup_cronjobs(&app).await {
-        panic!("failed setup cronjobs: {}", err);
-    };
-
     let server_port = config.server_port.clone();
 
     info!("Starting API server in port {}", server_port.clone());
 
     let ws_server = WebsocketServer::new().start();
+    let consumer = broker::create_consumer(config.kafka_url.to_string());
 
-    let config_clone = config.clone();
-    let ws_server_clone = ws_server.clone();
-    let subscription_repo_clone = subscription_repository.clone();
-
-    actix_rt::spawn(async move {
-        setup_news_created_subscriber(&config_clone, ws_server_clone, subscription_repo_clone).await
-    });
+    setup_news_created_pipeline(consumer, &ws_server, subscription_repository.clone());
 
     let server_result = HttpServer::new(move || {
         setup_http_server(
@@ -98,24 +77,6 @@ async fn main() {
     thread::park();
 }
 
-async fn setup_cronjobs(app: &App) -> Result<(), Box<dyn Error>> {
-    let app = app.clone();
-
-    let sched = JobScheduler::new().await?;
-
-    let scrap_news_job = Job::new_async("0 * * * * *", move |_uuid, _l| {
-        let app = app.clone();
-        Box::pin(async move {
-            app.scrap_feeds().await;
-        })
-    })?;
-    sched.add(scrap_news_job).await?;
-
-    sched.start().await?;
-
-    Ok(())
-}
-
 fn setup_http_server(
     config: &Config,
     feed_repo: Arc<dyn FeedRepository>,
@@ -131,17 +92,16 @@ fn setup_http_server(
         Error = ActixError,
     >,
 > {
-    let jwt_config = Arc::new(config.clone());
-    let auth_service = Arc::new(JwtAuthService::new(config.jwt_secret.clone()));
+    let auth_service: Arc<dyn AuthService> =
+        Arc::new(JwtAuthService::new(config.jwt_secret.clone()));
 
     build_server(config.cors_origin.clone())
         .app_data(web::Data::from(feed_repo.clone()))
         .app_data(web::Data::from(news_repo.clone()))
        .app_data(web::Data::from(subscription_repo.clone()))
         .app_data(web::Data::new(config.clone()))
-        .app_data(web::Data::from(jwt_config.clone()))
         .app_data(web::Data::new(ws_server.clone()))
-        .app_data(web::Data::new(auth_service))
+        .app_data(web::Data::from(auth_service.clone()))
         .service(get_news)
         .service(get_feeds)
         .service(get_subscriptions)
@@ -149,3 +109,22 @@ fn setup_http_server(
         .service(delete_subscription)
         .service(get_ws)
 }
+
+fn setup_news_created_pipeline(
+    consumer: StreamConsumer,
+    ws_server: &Addr<WebsocketServer>,
+    subscription_repo: Arc<dyn SubscriptionRepository>,
+) {
+    let ws_sender = WsSenderWrapper::new(ws_server.clone());
+
+    actix_rt::spawn(async move {
+        consumer
+            .subscribe(&[NEWS_CREATED_EVENT])
+            .expect("Error subscribing to topic");
+        let processor = NewsWebsocketProcessor::new(&ws_sender, subscription_repo);
+        let consumer = KafkaConsumer::new(consumer);
+        let pipeline = DataPipeline::new(&consumer, &processor);
+
+        pipeline.start().await;
+    });
+}
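
`setup_news_created_pipeline` above is built on the new `utils::pipeline` module (utils/src/pipeline/{processor,consumer,data_pipeline}.rs in the diffstat), whose source lies past the point where this excerpt is truncated. A plausible minimal shape, inferred from the call sites here and from the `impl Processor` in news_websocket_processor.rs later in this patch; the `Consumer` trait's method name and the consume loop are assumptions:

    use async_trait::async_trait;
    use log::error;
    use rdkafka::{consumer::StreamConsumer, Message};
    use utils::error::CommonError;

    #[async_trait]
    pub trait Processor: Send + Sync {
        // Matches the signature NewsWebsocketProcessor implements in this patch.
        async fn process(&self, payload: &str) -> Result<(), CommonError>;
    }

    #[async_trait]
    pub trait Consumer: Send + Sync {
        // Method name is an assumption; the patch only shows KafkaConsumer::new(...).
        async fn consume(&self) -> Result<String, CommonError>;
    }

    pub struct KafkaConsumer {
        consumer: StreamConsumer,
    }

    impl KafkaConsumer {
        pub fn new(consumer: StreamConsumer) -> Self {
            KafkaConsumer { consumer }
        }
    }

    #[async_trait]
    impl Consumer for KafkaConsumer {
        async fn consume(&self) -> Result<String, CommonError> {
            let message = self
                .consumer
                .recv()
                .await
                .map_err(|e| CommonError::new(&e.to_string()))?;
            match message.payload_view::<str>() {
                Some(Ok(payload)) => Ok(payload.to_string()),
                _ => Err(CommonError::new("empty or non-utf8 payload")),
            }
        }
    }

    pub struct DataPipeline<'a> {
        consumer: &'a dyn Consumer,
        processor: &'a dyn Processor,
    }

    impl<'a> DataPipeline<'a> {
        pub fn new(consumer: &'a dyn Consumer, processor: &'a dyn Processor) -> Self {
            DataPipeline { consumer, processor }
        }

        pub async fn start(&self) {
            loop {
                match self.consumer.consume().await {
                    Ok(payload) => {
                        if let Err(err) = self.processor.process(&payload).await {
                            error!("failed processing message: {}", err);
                        }
                    }
                    Err(err) => error!("failed consuming message: {}", err),
                }
            }
        }
    }

Splitting `Consumer` from `Processor` is what lets the processor be unit-tested against mockall doubles while `KafkaConsumer` stays a thin rdkafka wrapper.
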
info!("Starting API server in port {}", server_port.clone()); let ws_server = WebsocketServer::new().start(); + let consumer = broker::create_consumer(config.kafka_url.to_string()); - let config_clone = config.clone(); - let ws_server_clone = ws_server.clone(); - let subscription_repo_clone = subscription_repository.clone(); - - actix_rt::spawn(async move { - setup_news_created_subscriber(&config_clone, ws_server_clone, subscription_repo_clone).await - }); + setup_news_created_pipeline(consumer, &ws_server, subscription_repository.clone()); let server_result = HttpServer::new(move || { setup_http_server( @@ -98,24 +77,6 @@ async fn main() { thread::park(); } -async fn setup_cronjobs(app: &App) -> Result<(), Box> { - let app = app.clone(); - - let sched = JobScheduler::new().await?; - - let scrap_news_job = Job::new_async("0 * * * * *", move |_uuid, _l| { - let app = app.clone(); - Box::pin(async move { - app.scrap_feeds().await; - }) - })?; - sched.add(scrap_news_job).await?; - - sched.start().await?; - - Ok(()) -} - fn setup_http_server( config: &Config, feed_repo: Arc, @@ -131,17 +92,16 @@ fn setup_http_server( Error = ActixError, >, > { - let jwt_config = Arc::new(config.clone()); - let auth_service = Arc::new(JwtAuthService::new(config.jwt_secret.clone())); + let auth_service: Arc = + Arc::new(JwtAuthService::new(config.jwt_secret.clone())); build_server(config.cors_origin.clone()) .app_data(web::Data::from(feed_repo.clone())) .app_data(web::Data::from(news_repo.clone())) .app_data(web::Data::from(subscription_repo.clone())) .app_data(web::Data::new(config.clone())) - .app_data(web::Data::from(jwt_config.clone())) .app_data(web::Data::new(ws_server.clone())) - .app_data(web::Data::new(auth_service)) + .app_data(web::Data::from(auth_service.clone())) .service(get_news) .service(get_feeds) .service(get_subscriptions) @@ -149,3 +109,22 @@ fn setup_http_server( .service(delete_subscription) .service(get_ws) } + +fn setup_news_created_pipeline( + consumer: StreamConsumer, + ws_server: &Addr, + subscription_repo: Arc, +) { + let ws_sender = WsSenderWrapper::new(ws_server.clone()); + + actix_rt::spawn(async move { + consumer + .subscribe(&[NEWS_CREATED_EVENT]) + .expect("Error subscribing to topic"); + let processor = NewsWebsocketProcessor::new(&ws_sender, subscription_repo); + let consumer = KafkaConsumer::new(consumer); + let pipeline = DataPipeline::new(&consumer, &processor); + + pipeline.start().await; + }); +} diff --git a/news/src/models/news.rs b/news/src/models/news.rs deleted file mode 100644 index 5b21909..0000000 --- a/news/src/models/news.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::models::feed::Feed; -use crate::schema::news; -use chrono::NaiveDate; -use diesel::prelude::*; -use serde::{Deserialize, Serialize}; -use utils::serializer::serde_naive_date; - -#[derive( - Debug, - Clone, - Serialize, - Deserialize, - Selectable, - Queryable, - Insertable, - PartialEq, - Identifiable, - Associations, -)] -#[diesel(belongs_to(Feed))] -#[diesel(table_name = news)] -pub struct News { - pub id: uuid::Uuid, - pub author: String, - pub url: String, - pub title: String, - #[serde(with = "serde_naive_date")] - pub publish_date: Option, - pub feed_id: uuid::Uuid, -} diff --git a/news/src/news_created_subscriber.rs b/news/src/news_created_subscriber.rs deleted file mode 100644 index 4aac275..0000000 --- a/news/src/news_created_subscriber.rs +++ /dev/null @@ -1,65 +0,0 @@ -use crate::{ - config::Config, models::news::News, - repositories::subscription_repository::SubscriptionRepository, -}; 
-use actix::Addr;
-use log::{debug, error, info};
-use rdkafka::consumer::Consumer;
-use rdkafka::consumer::StreamConsumer;
-use rdkafka::Message;
-use std::{error::Error, sync::Arc};
-use utils::{
-    broker,
-    http::websockets::ws_server::{SessionMessage, WebsocketServer},
-};
-
-pub async fn setup_news_created_subscriber(
-    config: &Config,
-    ws: Addr<WebsocketServer>,
-    subscription_repo: Arc<dyn SubscriptionRepository>,
-) -> Result<(), Box<dyn Error>> {
-    let consumer: StreamConsumer = broker::create_consumer(config.kafka_url.clone());
-
-    consumer
-        .subscribe(&["news_created"])
-        .expect("Error subscribing to topic");
-
-    // Consume messages from the subscribed topic
-    loop {
-        match consumer.recv().await {
-            Ok(message) => {
-                let payload = message.payload_view::<str>();
-                match payload {
-                    Some(Ok(payload)) => {
-                        info!("Received message: {}", payload);
-                        let news: News =
-                            serde_json::from_str(payload).expect("Failed to convert JSON string");
-
-                        let result = subscription_repo.list_by_feed(news.feed_id);
-
-                        match result {
-                            Ok(subscriptions) => {
-                                for s in subscriptions {
-                                    debug!("sending message to socket {}", s.user_id);
-                                    ws.do_send(SessionMessage {
-                                        id: s.user_id.to_string(),
-                                        message: payload.to_string(),
-                                    });
-                                }
-                            }
-                            Err(err) => {
-                                error!(
-                                    "failed getting subscriptions from database: {}",
-                                    err.message
-                                );
-                            }
-                        }
-                    }
-                    Some(Err(_)) => error!("Error deserializing message payload"),
-                    None => error!("Empty message payload"),
-                }
-            }
-            Err(_) => error!("Error deserializing message payload"),
-        }
-    }
-}
diff --git a/news/src/news_websocket_processor.rs b/news/src/news_websocket_processor.rs
new file mode 100644
index 0000000..ccce548
--- /dev/null
+++ b/news/src/news_websocket_processor.rs
@@ -0,0 +1,184 @@
+use async_trait::async_trait;
+use log::{debug, info};
+use std::sync::Arc;
+use utils::{
+    error::{CommonError, DatabaseError, SerializationError},
+    http::websockets::{ws_sender::WebsocketServerSender, ws_server::SessionMessage},
+    news::{models::news::News, repositories::subscription_repository::SubscriptionRepository},
+    pipeline::processor::Processor,
+};
+
+pub struct NewsWebsocketProcessor<'a> {
+    websocket_server: &'a dyn WebsocketServerSender,
+    subscription_repo: Arc<dyn SubscriptionRepository>,
+}
+
+impl<'a> NewsWebsocketProcessor<'a> {
+    pub fn new(
+        websocket_server: &'a dyn WebsocketServerSender,
+        subscription_repo: Arc<dyn SubscriptionRepository>,
+    ) -> Self {
+        NewsWebsocketProcessor {
+            websocket_server,
+            subscription_repo,
+        }
+    }
+}
+
+#[async_trait]
+impl<'a> Processor for NewsWebsocketProcessor<'a> {
+    async fn process(&self, payload: &str) -> Result<(), CommonError> {
+        info!("Received message: {}", payload);
+        let news: News = serde_json::from_str(payload).map_err(|err| {
+            SerializationError::new(
+                format!("Failed to convert JSON string: {}", err.to_string()).as_str(),
+            )
+        })?;
+
+        match self.subscription_repo.list_by_feed(news.feed_id) {
+            Ok(subscriptions) => {
+                for s in subscriptions {
+                    debug!("sending message to socket {}", s.user_id);
+                    self.websocket_server
+                        .do_send(SessionMessage {
+                            id: s.user_id.to_string(),
+                            message: payload.to_string(),
+                        })
+                        .await?;
+                }
+
+                Ok(())
+            }
+            Err(err) => Err(DatabaseError::new(
+                format!(
+                    "failed getting subscriptions from database: {}",
+                    err.message
+                )
+                .as_str(),
+            )
+            .into()),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use mockall::predicate::*;
+    use std::str::FromStr;
+    use utils::{
+        http::websockets::ws_sender::MockWebsocketServerSender,
+        news::models::subscription::Subscription,
+        news::repositories::subscription_repository::MockSubscriptionRepository,
+    };
+
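+    // The three tests below share one pattern: mockall doubles for the
+    // websocket sender and the subscription repository, a raw JSON payload
+    // handed to process(), and assertions on the Result (the expected
+    // CommonError code is visible in the Display output, e.g. ", Code: 6").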
#[tokio::test] + async fn test_process_success() { + // Create mock objects + let mut mock_websocket_server = MockWebsocketServerSender::new(); + let mut mock_subscription_repo = MockSubscriptionRepository::new(); + + // Define expected inputs and outputs + let payload = r#"{"id":"f130494f-711e-4bb3-940a-3d50bb65e521","author":"author1","url":"","title":"news test","feed_id":"63a0ae94-1ad8-45fd-acc6-9c68f58e28af","publish_date":"2023-05-13"}"#; + let subscriptions = vec![ + Subscription { + feed_id: uuid::Uuid::from_str("63a0ae94-1ad8-45fd-acc6-9c68f58e28af").unwrap(), + user_id: uuid::Uuid::from_str("9454decf-b36d-436e-96e1-f31a9a2f3d68").unwrap(), + }, + Subscription { + feed_id: uuid::Uuid::from_str("63a0ae94-1ad8-45fd-acc6-9c68f58e28af").unwrap(), + user_id: uuid::Uuid::from_str("9537e337-241e-4d3c-8776-b43fc1050010").unwrap(), + }, + ]; + + // Set up expectations + mock_websocket_server + .expect_do_send() + .times(2) + .withf(move |message| { + message.id == "9454decf-b36d-436e-96e1-f31a9a2f3d68" + || message.id == "9537e337-241e-4d3c-8776-b43fc1050010" + }) + .return_const(Ok(())); + mock_subscription_repo + .expect_list_by_feed() + .with(eq(uuid::Uuid::from_str( + "63a0ae94-1ad8-45fd-acc6-9c68f58e28af", + ) + .unwrap())) + .return_const(Ok(subscriptions)); + + // Create the processor instance + let processor = + NewsWebsocketProcessor::new(&mock_websocket_server, Arc::new(mock_subscription_repo)); + + // Perform the test + let result = processor.process(payload).await; + + // Check the result + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_process_failed_deserialization() { + // Create mock objects + let mock_websocket_server = MockWebsocketServerSender::new(); + let mock_subscription_repo = MockSubscriptionRepository::new(); + + // Define expected input + let payload = "invalid json"; + + // Create the processor instance + let processor = + NewsWebsocketProcessor::new(&mock_websocket_server, Arc::new(mock_subscription_repo)); + + // Perform the test + let result = processor.process(payload).await; + + // Check the result + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Error: Failed to convert JSON string: expected value at line 1 column 1, Code: 6" + ); + } + + #[tokio::test] + async fn test_process_failed_database() { + // Create mock objects + let mut mock_websocket_server = MockWebsocketServerSender::new(); + let mut mock_subscription_repo = MockSubscriptionRepository::new(); + + // Define expected inputs and outputs + let payload = r#"{"id":"f130494f-711e-4bb3-940a-3d50bb65e521","author":"author1","url":"","title":"news test","feed_id":"63a0ae94-1ad8-45fd-acc6-9c68f58e28af","publish_date":"2023-05-13"}"#; + + let error_message = "Failed to fetch subscriptions"; + + // Set up expectations + mock_websocket_server.expect_do_send().times(0); // No calls to do_send() expected + mock_subscription_repo + .expect_list_by_feed() + .with(eq(uuid::Uuid::from_str( + "63a0ae94-1ad8-45fd-acc6-9c68f58e28af", + ) + .unwrap())) + .return_const(Err(DatabaseError::new(error_message))); + + // Create the processor instance + let processor = + NewsWebsocketProcessor::new(&mock_websocket_server, Arc::new(mock_subscription_repo)); + + // Perform the test + let result = processor.process(payload).await; + + // Check the result + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + format!( + "Error: failed getting subscriptions from database: {}, Code: 1", + error_message + ), + ); + } +} diff --git a/users/Cargo.toml 
b/users/Cargo.toml
index 00b88d0..240f6dd 100644
--- a/users/Cargo.toml
+++ b/users/Cargo.toml
@@ -8,14 +8,12 @@ edition = "2021"
 
 [dependencies]
 actix-web = "4.4"
-tokio-postgres = "0.7.6"
 tokio = { version = "1.24.2", features = ["full"] }
 actix-web-actors = "4.2.0"
 actix = "0.13.1"
 log = "0.4.20"
-diesel = { version = "2.1.1", features = ["postgres","r2d2","uuid"] }
-diesel_migrations = "2.1.0"
-uuid = { version="1.4.1", features=["v4","serde"] }
+diesel = { version = "2.1.1", features = ["postgres", "r2d2", "uuid"] }
+uuid = { version = "1.4.1", features = ["v4", "serde"] }
 serde = "1.0.188"
 serde_json = "1.0.105"
 actix-rt = "2.9.0"
@@ -25,9 +23,8 @@ chrono = "0.4.28"
 rand = "0.8.5"
 jsonwebtoken = "8.3.0"
 argon2 = "0.5.2"
-utils={ path = "../utils", features = ["broker","database"] }
+utils = { path = "../utils", features = ["broker", "database"] }
 validator = { version = "0.16.1", features = ["derive"] }
-futures = "0.3.28"
 mockall = "0.11.4"
 rstest = "0.18.2"
 actix-threadpool = "0.3.3"
diff --git a/users/src/app.rs b/users/src/app.rs
index 92888b5..1b7599b 100644
--- a/users/src/app.rs
+++ b/users/src/app.rs
@@ -4,7 +4,7 @@ use actix::Addr;
 use actix_web::body::MessageBody;
 use actix_web::dev::{ServiceFactory, ServiceRequest, ServiceResponse};
 use actix_web::{error::Error, web, App};
-use utils::http::services::auth_service::JwtAuthService;
+use utils::http::services::auth_service::{AuthService, JwtAuthService};
 use utils::http::websockets::ws_handler::get_ws;
 use utils::http::websockets::ws_server::WebsocketServer;
 use utils::{broker, db, http::utils::build_server};
@@ -42,7 +42,8 @@ pub fn setup_app(
     let events_service = Arc::new(KafkaEventService::new(kafka_producer.clone()));
     let user_service: Arc<dyn UserService> =
         Arc::new(UserServiceImpl::new(user_repo, events_service));
-    let auth_service = Arc::new(JwtAuthService::new(config.jwt_secret.clone()));
+    let auth_service: Arc<dyn AuthService> =
+        Arc::new(JwtAuthService::new(config.jwt_secret.clone()));
 
     let jwt_config = Arc::new(config.clone());
 
@@ -52,7 +53,7 @@ pub fn setup_app(
         .app_data(web::Data::new(ws_server.clone()))
         .app_data(web::Data::new(config.clone()))
         .app_data(web::Data::from(jwt_config.clone()))
-        .app_data(web::Data::new(auth_service))
+        .app_data(web::Data::from(auth_service.clone()))
         .service(get_index)
         .service(get_health)
         .service(get_ws)
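
The `web::Data::new(auth_service)` to `web::Data::from(auth_service.clone())` change above (and its twin in news/src/main.rs) is about extractor types: `Data::new` wraps whatever it is given, so passing an `Arc<dyn AuthService>` yields `Data<Arc<dyn AuthService>>`, while `Data::from` converts the existing `Arc<dyn AuthService>` into `Data<dyn AuthService>`, the type handlers actually extract. A standalone illustration (the `Greeter` trait is hypothetical, not part of this patch):

    use actix_web::web;
    use std::sync::Arc;

    trait Greeter: Send + Sync {
        fn hello(&self) -> String;
    }

    struct EnglishGreeter;
    impl Greeter for EnglishGreeter {
        fn hello(&self) -> String {
            "hello".to_string()
        }
    }

    fn main() {
        let service: Arc<dyn Greeter> = Arc::new(EnglishGreeter);

        // From<Arc<T>> unwraps into Data<dyn Greeter>, matching a
        // web::Data<dyn Greeter> handler parameter.
        let shared: web::Data<dyn Greeter> = web::Data::from(service.clone());
        assert_eq!(shared.hello(), "hello");

        // Data::new would wrap the Arc itself, producing a different,
        // surprising extractor type.
        let wrapped: web::Data<Arc<dyn Greeter>> = web::Data::new(service);
        assert_eq!(wrapped.hello(), "hello");
    }
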
diff --git a/users/src/handlers/user.rs b/users/src/handlers/user.rs
index 1a3063c..556c8a0 100644
--- a/users/src/handlers/user.rs
+++ b/users/src/handlers/user.rs
@@ -171,7 +171,7 @@ mod tests {
 
         let user_service: Arc<dyn UserService> = Arc::new(user_service);
 
-        let mut app = test::init_service(
+        let app = test::init_service(
             App::new()
                 .app_data(web::Data::from(user_service.clone()))
                 .service(get_users),
         )
         .await;
 
         let req = test::TestRequest::get().uri("/users").to_request();
-        let resp = test::call_service(&mut app, req).await;
+        let resp = test::call_service(&app, req).await;
 
         assert_eq!(resp.status(), case.expected_status);
 
@@ -228,7 +228,7 @@ mod tests {
         let user_service: Arc<dyn UserService> = Arc::new(user_service);
 
-        let mut app = test::init_service(
+        let app = test::init_service(
             App::new()
                 .app_data(web::Data::from(user_service.clone()))
                 .service(get_user_by_id),
         )
         .await;
 
         let req = test::TestRequest::get()
             .uri(format!("/users/{}", case.id).as_str())
             .to_request();
-        let resp = test::call_service(&mut app, req).await;
+        let resp = test::call_service(&app, req).await;
 
         assert_eq!(resp.status(), case.expected_status);
 
@@ -306,7 +306,7 @@ mod tests {
         let user_service: Arc<dyn UserService> = Arc::new(user_service);
 
-        let mut app = test::init_service(
+        let app = test::init_service(
             App::new()
                 .app_data(web::Data::from(user_service.clone()))
                 .service(create_user),
@@ -318,7 +318,7 @@ mod tests {
             .insert_header(("content-type", "application/json"))
             .set_payload(serde_json::to_vec(&case.payload).unwrap())
             .to_request();
-        let resp = test::call_service(&mut app, req).await;
+        let resp = test::call_service(&app, req).await;
 
         assert_eq!(resp.status(), case.expected_status);
 
@@ -385,7 +385,7 @@ mod tests {
         let user_service: Arc<dyn UserService> = Arc::new(user_service);
 
-        let mut app = test::init_service(
+        let app = test::init_service(
             App::new()
                 .app_data(web::Data::from(user_service.clone()))
                 .service(update_user),
@@ -397,7 +397,7 @@ mod tests {
             .insert_header(("content-type", "application/json"))
             .set_payload(serde_json::to_vec(&case.payload).unwrap())
             .to_request();
-        let resp = test::call_service(&mut app, req).await;
+        let resp = test::call_service(&app, req).await;
 
         assert_eq!(resp.status(), case.expected_status);
 
@@ -440,7 +440,7 @@ mod tests {
         let user_service: Arc<dyn UserService> = Arc::new(user_service);
 
-        let mut app = test::init_service(
+        let app = test::init_service(
             App::new()
                 .app_data(web::Data::from(user_service.clone()))
                 .service(delete_user),
@@ -450,7 +450,7 @@ mod tests {
         let req = test::TestRequest::delete()
             .uri(format!("/users/{}", case.id).as_str())
             .to_request();
-        let resp = test::call_service(&mut app, req).await;
+        let resp = test::call_service(&app, req).await;
 
         assert_eq!(resp.status(), case.expected_status);
 
diff --git a/users/src/services/event_service.rs b/users/src/services/event_service.rs
index ff23034..245f8d3 100644
--- a/users/src/services/event_service.rs
+++ b/users/src/services/event_service.rs
@@ -26,11 +26,8 @@ impl EventService for KafkaEventService {
     async fn user_created(&self, user: User) -> Result<(), BrokerError> {
         let json_string = serde_json::to_string(&user).unwrap();
 
-        let delivery_status = broker::send_message_to_topic(
-            self.producer.clone(),
-            String::from("user_created"),
-            json_string,
-        );
+        let delivery_status =
+            broker::send_message_to_topic(self.producer.clone(), "user_created", json_string);
 
         match delivery_status.await {
             Err((err, _)) => {
diff --git a/users/src/services/user_service.rs b/users/src/services/user_service.rs
index 1d84435..6ba6701 100644
--- a/users/src/services/user_service.rs
+++ b/users/src/services/user_service.rs
@@ -397,7 +397,7 @@ mod tests {
         let service_result = case.service_result.clone();
         repo_mock
             .expect_update()
-            .with(eq(case.id.clone()), eq(case.update.clone()))
+            .with(eq(case.id), eq(case.update.clone()))
             .returning(move |_, _| service_result.clone());
 
         // Create the UserServiceImpl with the mocks
@@ -436,7 +436,7 @@ mod tests {
         let service_result = case.service_result.clone();
         repo_mock
             .expect_delete()
-            .with(eq(case.id.clone()))
+            .with(eq(case.id))
             .returning(move |_| service_result.clone());
 
         // Create the UserServiceImpl with the mocks
diff --git a/users/tests/handlers/user_test.rs b/users/tests/handlers/user_test.rs
index 892ad31..760e2eb 100644
--- a/users/tests/handlers/user_test.rs
+++ b/users/tests/handlers/user_test.rs
@@ -24,7 +24,7 @@ async fn user_create() {
 
     let app = setup_app(&config, ws_server.clone());
 
-    let mut app_server = test::init_service(app).await;
+    let app_server = test::init_service(app).await;
 
     let req = test::TestRequest::post()
         .uri("/users")
@@ -37,7 +37,7 @@ async fn user_create() {
             .unwrap(),
         )
         .to_request();
-    let resp = test::call_service(&mut app_server, req).await;
+    let resp =
test::call_service(&app_server, req).await; assert_eq!(resp.status(), StatusCode::OK); @@ -61,7 +61,7 @@ async fn user_create() { .unwrap(), ) .to_request(); - let resp = test::call_service(&mut app_server, req).await; + let resp = test::call_service(&app_server, req).await; assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); let body = test::read_body(resp).await; @@ -69,7 +69,7 @@ async fn user_create() { // get list of users let req = test::TestRequest::get().uri("/users").to_request(); - let resp = test::call_service(&mut app_server, req).await; + let resp = test::call_service(&app_server, req).await; assert_eq!(resp.status(), StatusCode::OK); let body = test::read_body(resp).await; @@ -80,7 +80,7 @@ async fn user_create() { let req = test::TestRequest::delete() .uri(format!("/users/{}", user.id).as_str()) .to_request(); - let resp = test::call_service(&mut app_server, req).await; + let resp = test::call_service(&app_server, req).await; assert_eq!(resp.status(), StatusCode::OK); let body = test::read_body(resp).await; @@ -88,7 +88,7 @@ async fn user_create() { // get empty list of users let req = test::TestRequest::get().uri("/users").to_request(); - let resp = test::call_service(&mut app_server, req).await; + let resp = test::call_service(&app_server, req).await; assert_eq!(resp.status(), StatusCode::OK); let body = test::read_body(resp).await; diff --git a/users/tests/repositories/user_repository_test.rs b/users/tests/repositories/user_repository_test.rs index 60515c0..eaa72eb 100644 --- a/users/tests/repositories/user_repository_test.rs +++ b/users/tests/repositories/user_repository_test.rs @@ -43,7 +43,7 @@ async fn user_repo() { // user_id does not exist let result = user_repo.get_by_id(Uuid::new_v4()).await; - assert_eq!(result.is_err(), true); + assert!(result.is_err()); if let Err(err) = result { assert_eq!(err.message, String::from("NotFound")); } diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 2a68283..d8a1873 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -11,6 +11,7 @@ actix-cors = "0.6.4" actix-http = "3.4.0" actix-web = "4.4.0" actix-web-actors = "4.2.0" +async-trait = "0.1.73" chrono = "0.4.29" diesel = { version = "2.1.1", features = [ "postgres", @@ -18,15 +19,19 @@ diesel = { version = "2.1.1", features = [ "uuid", ], optional = true } env_logger = "0.10.0" +feed-rs = { version = "1.3.0", optional = true } jsonwebtoken = "8.3.0" log = "0.4.20" +mockall = "0.11.4" rand = "0.8.5" rdkafka = { version = "0.34.0", optional = true } serde = "1.0.188" serde_json = "1.0.107" +tokio = "1.32.0" uuid = { version = "1.4.1", features = ["v4", "serde"] } [features] default = [] database = ["dep:diesel"] broker = ["dep:rdkafka"] +news = ["dep:feed-rs"] diff --git a/utils/src/broker.rs b/utils/src/broker.rs index 8d81096..a3973b4 100644 --- a/utils/src/broker.rs +++ b/utils/src/broker.rs @@ -23,14 +23,14 @@ pub fn create_producer(kafka_url: String) -> KafkaProducer { pub async fn send_message_to_topic( producer: KafkaProducer, - topic: String, + topic: &str, message: String, ) -> Result<(i32, i64), (KafkaError, OwnedMessage)> { producer .send( - FutureRecord::to(topic.as_str()) + FutureRecord::to(topic) .payload(&message) - .key(topic.as_str()) + .key(topic) .headers(OwnedHeaders::new()), Duration::from_secs(120), ) diff --git a/utils/src/error.rs b/utils/src/error.rs index 5ba3e78..20c94fa 100644 --- a/utils/src/error.rs +++ b/utils/src/error.rs @@ -1,9 +1,12 @@ +use actix::MailboxError; use serde::Serialize; pub const DATABASE_ERROR_CODE: u32 = 1; pub 
const BROKER_ERROR_CODE: u32 = 2;
 pub const HTTP_ERROR_CODE: u32 = 3;
 pub const AUTH_TOKEN_ENCODING_CODE: u32 = 4;
+pub const WS_ERROR_CODE: u32 = 5;
+pub const SERIALIZATION_ERROR_CODE: u32 = 6;
 
 #[derive(Debug, Serialize, Clone, PartialEq)]
 pub struct CommonError {
@@ -11,6 +14,15 @@ pub struct CommonError {
     pub code: u32,
 }
 
+impl CommonError {
+    pub fn new(message: &str) -> Self {
+        CommonError {
+            message: message.to_string(),
+            code: 0,
+        }
+    }
+}
+
 impl std::fmt::Display for CommonError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "Error: {}, Code: {}", self.message, self.code)
@@ -22,6 +34,14 @@ pub struct DatabaseError {
     pub message: String,
 }
 
+impl DatabaseError {
+    pub fn new(message: &str) -> Self {
+        DatabaseError {
+            message: message.to_string(),
+        }
+    }
+}
+
 impl From<DatabaseError> for CommonError {
     fn from(val: DatabaseError) -> Self {
         Self {
@@ -36,6 +56,14 @@ pub struct BrokerError {
     pub message: String,
 }
 
+impl BrokerError {
+    pub fn new(message: &str) -> Self {
+        BrokerError {
+            message: message.to_string(),
+        }
+    }
+}
+
 impl From<BrokerError> for CommonError {
     fn from(val: BrokerError) -> Self {
         CommonError {
@@ -50,6 +78,14 @@ pub struct HttpError {
     pub message: String,
 }
 
+impl HttpError {
+    pub fn new(message: &str) -> Self {
+        HttpError {
+            message: message.to_string(),
+        }
+    }
+}
+
 impl From<HttpError> for CommonError {
     fn from(val: HttpError) -> Self {
         CommonError {
@@ -58,3 +94,34 @@ impl From<HttpError> for CommonError {
         }
     }
 }
+
+impl From<MailboxError> for CommonError {
+    fn from(val: MailboxError) -> Self {
+        CommonError {
+            message: val.to_string(),
+            code: WS_ERROR_CODE,
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct SerializationError {
+    pub message: String,
+}
+
+impl SerializationError {
+    pub fn new(message: &str) -> Self {
+        SerializationError {
+            message: message.to_string(),
+        }
+    }
+}
+
+impl From<SerializationError> for CommonError {
+    fn from(val: SerializationError) -> Self {
+        CommonError {
+            message: val.message,
+            code: SERIALIZATION_ERROR_CODE,
+        }
+    }
+}
diff --git a/utils/src/http/websockets/mod.rs b/utils/src/http/websockets/mod.rs
index d932d5d..134e168 100644
--- a/utils/src/http/websockets/mod.rs
+++ b/utils/src/http/websockets/mod.rs
@@ -1,3 +1,4 @@
 pub mod ws_handler;
+pub mod ws_sender;
 pub mod ws_server;
 pub mod ws_session;
diff --git a/utils/src/http/websockets/ws_sender.rs b/utils/src/http/websockets/ws_sender.rs
new file mode 100644
index 0000000..e5989ca
--- /dev/null
+++ b/utils/src/http/websockets/ws_sender.rs
@@ -0,0 +1,28 @@
+use actix::{Addr, MailboxError};
+use async_trait::async_trait;
+use mockall::automock;
+
+use super::ws_server::{SessionMessage, WebsocketServer};
+
+#[automock]
+#[async_trait]
+pub trait WebsocketServerSender: Sync + Send {
+    async fn do_send(&self, m: SessionMessage) -> Result<(), MailboxError>;
+}
+
+pub struct WsSenderWrapper {
+    websocket_server: Addr<WebsocketServer>,
+}
+
+impl WsSenderWrapper {
+    pub fn new(websocket_server: Addr<WebsocketServer>) -> Self {
+        WsSenderWrapper { websocket_server }
+    }
+}
+
+#[async_trait]
+impl WebsocketServerSender for WsSenderWrapper {
+    async fn do_send(&self, message: SessionMessage) -> Result<(), MailboxError> {
+        self.websocket_server.send(message).await
+    }
+}
diff --git a/utils/src/lib.rs b/utils/src/lib.rs
index ef27b56..e05954c 100644
--- a/utils/src/lib.rs
+++ b/utils/src/lib.rs
@@ -7,3 +7,9 @@ pub mod error;
 pub mod http;
 pub mod logger;
 pub mod serializer;
+
+#[cfg(feature = "news")]
+pub mod news;
+
+#[cfg(feature = "broker")]
+pub mod pipeline;
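
The constructors and `From` conversions added above are what let `NewsWebsocketProcessor::process` (earlier in this patch) propagate every failure as a `CommonError` with `?`. A small standalone illustration; the `demo` function is hypothetical and serde_json is assumed available:

    use utils::error::{CommonError, DatabaseError, SerializationError, SERIALIZATION_ERROR_CODE};

    // Both error types convert into CommonError via the new From impls,
    // so `?` can mix them inside one function.
    fn demo(payload: &str) -> Result<(), CommonError> {
        serde_json::from_str::<serde_json::Value>(payload)
            .map_err(|e| SerializationError::new(&e.to_string()))?; // code 6
        Err(DatabaseError::new("db is down").into()) // code 1
    }

    fn main() {
        let err = demo("not json").unwrap_err();
        assert_eq!(err.code, SERIALIZATION_ERROR_CODE);
        println!("{}", err); // prints "Error: ..., Code: 6"
    }
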
index 0000000..300805c
--- /dev/null
+++ b/utils/src/news/events.rs
@@ -0,0 +1 @@
+pub const NEWS_CREATED_EVENT: &str = "news_created";
diff --git a/utils/src/news/mod.rs b/utils/src/news/mod.rs
new file mode 100644
index 0000000..1f4ffbc
--- /dev/null
+++ b/utils/src/news/mod.rs
@@ -0,0 +1,5 @@
+pub mod events;
+pub mod models;
+pub mod repositories;
+pub mod schema;
+pub mod services;
diff --git a/news/src/models/feed.rs b/utils/src/news/models/feed.rs
similarity index 90%
rename from news/src/models/feed.rs
rename to utils/src/news/models/feed.rs
index e40074d..b99d84b 100644
--- a/news/src/models/feed.rs
+++ b/utils/src/news/models/feed.rs
@@ -1,4 +1,4 @@
-use crate::schema::feeds;
+use crate::news::schema::feeds;
 use diesel::prelude::*;
 use serde::{Deserialize, Serialize};

diff --git a/news/src/models/mod.rs b/utils/src/news/models/mod.rs
similarity index 100%
rename from news/src/models/mod.rs
rename to utils/src/news/models/mod.rs
diff --git a/utils/src/news/models/news.rs b/utils/src/news/models/news.rs
new file mode 100644
index 0000000..08907c5
--- /dev/null
+++ b/utils/src/news/models/news.rs
@@ -0,0 +1,66 @@
+use crate::news::models::feed::Feed;
+use crate::news::schema::news;
+use crate::serializer::serde_naive_date;
+use chrono::NaiveDate;
+use diesel::prelude::*;
+use feed_rs::model::Entry;
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+#[derive(
+    Debug,
+    Clone,
+    Serialize,
+    Deserialize,
+    Selectable,
+    Queryable,
+    Insertable,
+    PartialEq,
+    Identifiable,
+    Associations,
+)]
+#[diesel(belongs_to(Feed))]
+#[diesel(table_name = news)]
+pub struct News {
+    pub id: uuid::Uuid,
+    pub author: String,
+    pub url: String,
+    pub title: String,
+    #[serde(with = "serde_naive_date")]
+    pub publish_date: Option<NaiveDate>,
+    pub feed_id: uuid::Uuid,
+}
+
+impl From<Entry> for News {
+    fn from(feed_news: Entry) -> Self {
+        let mut author = "".to_string();
+        let mut url = "".to_string();
+        let mut title = "".to_string();
+        let mut publish_date = chrono::Utc::now().naive_local().date();
+
+        if !feed_news.authors.is_empty() {
+            author = feed_news.authors[0].name.clone();
+        }
+
+        if let Some(source) = feed_news.source {
+            url = source;
+        }
+
+        if let Some(news_title) = feed_news.title {
+            title = news_title.content.to_string();
+        }
+
+        if let Some(date) = feed_news.published {
+            publish_date = date.naive_local().date();
+        }
+
+        News {
+            id: Uuid::new_v4(),
+            author,
+            url,
+            title,
+            feed_id: Uuid::new_v4(),
+            publish_date: Some(publish_date),
+        }
+    }
+}
diff --git a/news/src/models/subscription.rs b/utils/src/news/models/subscription.rs
similarity index 80%
rename from news/src/models/subscription.rs
rename to utils/src/news/models/subscription.rs
index 35b13f5..6345e68 100644
--- a/news/src/models/subscription.rs
+++ b/utils/src/news/models/subscription.rs
@@ -1,5 +1,5 @@
-use crate::models::feed::Feed;
-use crate::schema::subscriptions;
+use crate::news::models::feed::Feed;
+use crate::news::schema::subscriptions;
 use diesel::prelude::*;
 use serde::{Deserialize, Serialize};
diff --git a/news/src/repositories/feed_repository.rs b/utils/src/news/repositories/feed_repository.rs
similarity index 93%
rename from news/src/repositories/feed_repository.rs
rename to utils/src/news/repositories/feed_repository.rs
index de745e9..f7fffe4 100644
--- a/news/src/repositories/feed_repository.rs
+++ b/utils/src/news/repositories/feed_repository.rs
@@ -1,13 +1,13 @@
 use std::sync::Arc;

+use crate::db::PgPool;
+use crate::error::DatabaseError;
 use diesel::prelude::*;
 use mockall::automock;
-use utils::db::PgPool;
-use utils::error::DatabaseError;
 use uuid::Uuid;

-use crate::models::feed::Feed;
-use crate::schema::feeds;
+use crate::news::models::feed::Feed;
+use crate::news::schema::feeds;

 #[automock]
 pub trait FeedRepository: Send + Sync {
diff --git a/news/src/repositories/mod.rs b/utils/src/news/repositories/mod.rs
similarity index 100%
rename from news/src/repositories/mod.rs
rename to utils/src/news/repositories/mod.rs
diff --git a/news/src/repositories/news_repository.rs b/utils/src/news/repositories/news_repository.rs
similarity index 95%
rename from news/src/repositories/news_repository.rs
rename to utils/src/news/repositories/news_repository.rs
index 45a22ea..16f6700 100644
--- a/news/src/repositories/news_repository.rs
+++ b/utils/src/news/repositories/news_repository.rs
@@ -1,13 +1,13 @@
 use std::sync::Arc;

+use crate::db::PgPool;
+use crate::error::DatabaseError;
 use diesel::prelude::*;
 use mockall::automock;
-use utils::db::PgPool;
-use utils::error::DatabaseError;
 use uuid::Uuid;

-use crate::models::news::News;
-use crate::schema::{feeds, news, subscriptions};
+use crate::news::models::news::News;
+use crate::news::schema::{feeds, news, subscriptions};

 #[automock]
 pub trait NewsRepository: Send + Sync {
diff --git a/news/src/repositories/subscription_repository.rs b/utils/src/news/repositories/subscription_repository.rs
similarity index 95%
rename from news/src/repositories/subscription_repository.rs
rename to utils/src/news/repositories/subscription_repository.rs
index 1544965..f4fa0a2 100644
--- a/news/src/repositories/subscription_repository.rs
+++ b/utils/src/news/repositories/subscription_repository.rs
@@ -1,13 +1,13 @@
 use std::sync::Arc;

+use crate::db::PgPool;
+use crate::error::DatabaseError;
 use diesel::prelude::*;
 use mockall::automock;
-use utils::db::PgPool;
-use utils::error::DatabaseError;
 use uuid::Uuid;

-use crate::models::subscription::Subscription;
-use crate::schema::subscriptions;
+use crate::news::models::subscription::Subscription;
+use crate::news::schema::subscriptions;

 #[automock]
 pub trait SubscriptionRepository: Send + Sync {
diff --git a/news/src/schema.rs b/utils/src/news/schema.rs
similarity index 100%
rename from news/src/schema.rs
rename to utils/src/news/schema.rs
diff --git a/utils/src/news/services/events_service.rs b/utils/src/news/services/events_service.rs
new file mode 100644
index 0000000..6115e45
--- /dev/null
+++ b/utils/src/news/services/events_service.rs
@@ -0,0 +1,39 @@
+use crate::{broker, error::BrokerError, news::events::NEWS_CREATED_EVENT};
+use async_trait::async_trait;
+use mockall::automock;
+use rdkafka::producer::FutureProducer;
+
+use crate::news::models::news::News;
+
+#[automock]
+#[async_trait]
+pub trait EventService: Send + Sync {
+    async fn news_created(&self, news: &News) -> Result<(), BrokerError>;
+}
+
+pub struct KafkaEventService {
+    producer: FutureProducer,
+}
+
+impl KafkaEventService {
+    pub fn new(producer: FutureProducer) -> Self {
+        KafkaEventService { producer }
+    }
+}
+
+#[async_trait]
+impl EventService for KafkaEventService {
+    async fn news_created(&self, news: &News) -> Result<(), BrokerError> {
+        let json_string = serde_json::to_string(news).map_err(|err| BrokerError {
+            message: err.to_string(),
+        })?;
+
+        broker::send_message_to_topic(self.producer.clone(), NEWS_CREATED_EVENT, json_string)
+            .await
+            .map_err(|err| BrokerError {
+                message: err.0.to_string(),
+            })?;
+
+        Ok(())
+    }
+}
diff --git a/utils/src/news/services/mod.rs b/utils/src/news/services/mod.rs
new file mode 100644
index 0000000..d08d9fd
--- /dev/null
+++ b/utils/src/news/services/mod.rs
@@ -0,0 +1,2 @@
+pub mod events_service;
+pub mod news_service;
diff --git a/utils/src/news/services/news_service.rs b/utils/src/news/services/news_service.rs
new file mode 100644
index 0000000..495ba78
--- /dev/null
+++ b/utils/src/news/services/news_service.rs
@@ -0,0 +1,255 @@
+use crate::error::CommonError;
+use async_trait::async_trait;
+use log::{info, warn};
+use mockall::automock;
+use std::sync::Arc;
+
+use crate::news::{
+    models::{feed::Feed, news::News},
+    repositories::{
+        feed_repository::FeedRepository, news_repository::NewsRepository,
+        subscription_repository::SubscriptionRepository,
+    },
+};
+
+use super::events_service::EventService;
+
+#[automock]
+#[async_trait]
+pub trait NewsService: Send + Sync {
+    async fn list_feeds(&self) -> Result<Vec<Feed>, CommonError>;
+    async fn insert_news(&self, news: &News) -> Result<News, CommonError>;
+}
+
+pub struct Service {
+    pub feed_repo: Arc<dyn FeedRepository>,
+    pub news_repo: Arc<dyn NewsRepository>,
+    pub subscriptions_repo: Arc<dyn SubscriptionRepository>,
+    pub events_service: Arc<dyn EventService>,
+}
+
+impl Service {
+    pub fn new(
+        feed_repo: Arc<dyn FeedRepository>,
+        news_repo: Arc<dyn NewsRepository>,
+        subscriptions_repo: Arc<dyn SubscriptionRepository>,
+        events_service: Arc<dyn EventService>,
+    ) -> Self {
+        Service {
+            feed_repo,
+            news_repo,
+            subscriptions_repo,
+            events_service,
+        }
+    }
+}
+
+#[async_trait]
+impl NewsService for Service {
+    async fn insert_news(&self, news: &News) -> Result<News, CommonError> {
+        let db_news = self
+            .news_repo
+            .find_by_fields(Some(news.title.clone()), Some(news.feed_id))?;
+
+        return match db_news {
+            None => {
+                let news = self.news_repo.create(news)?;
+                info!(
+                    "News with title {} of feed {} inserted!",
+                    news.title, news.feed_id
+                );
+                self.events_service.news_created(&news).await?;
+                Ok(news)
+            }
+            Some(news) => {
+                warn!("news already exists: {:?}", news);
+                Ok(news)
+            }
+        };
+    }
+
+    async fn list_feeds(&self) -> Result<Vec<Feed>, CommonError> {
+        self.feed_repo.list().map_err(|err| err.into())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{
+        error::DATABASE_ERROR_CODE,
+        news::{
+            models::news::News,
+            repositories::{
+                feed_repository::MockFeedRepository, news_repository::MockNewsRepository,
+                subscription_repository::MockSubscriptionRepository,
+            },
+            services::events_service::MockEventService,
+        },
+    };
+
+    use super::*;
+    use crate::error::DatabaseError;
+    use mockall::predicate::*;
+
+    #[tokio::test]
+    async fn test_insert_news_success() {
+        // Arrange
+        let mut news_repo = MockNewsRepository::new();
+        let mut events_service = MockEventService::new();
+        let feeds_repo = MockFeedRepository::new();
+        let subscriptions_repo = MockSubscriptionRepository::new();
+
+        let news = News {
+            title: "Test News".to_string(),
+            feed_id: uuid::Uuid::new_v4(),
+            id: uuid::Uuid::new_v4(),
+            author: "author 1".to_string(),
+            url: "".to_string(),
+            publish_date: None,
+        };
+
+        let inserted_news = news.clone();
+        news_repo
+            .expect_find_by_fields()
+            .with(eq(Some(news.title.clone())), eq(Some(news.feed_id)))
+            .times(1)
+            .returning(move |_, _| Ok(None));
+
+        news_repo
+            .expect_create()
+            .with(eq(news.clone()))
+            .times(1)
+            .return_once(move |_| Ok(inserted_news.clone()));
+
+        let inserted_news = news.clone();
+        events_service
+            .expect_news_created()
+            .with(eq(inserted_news.clone()))
+            .times(1)
+            .returning(|_| Ok(()));
+
+        let service = Service::new(
+            Arc::new(feeds_repo),
+            Arc::new(news_repo),
+            Arc::new(subscriptions_repo),
+            Arc::new(events_service),
+        );
+
+        // Act
+        let result = service.insert_news(&news).await;
+
+        // Assert
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap().title, "Test News");
+    }
+
+    #[tokio::test]
+    async fn test_insert_news_already_exists() {
+        // Arrange
+        let mut news_repo = MockNewsRepository::new();
+        let events_service = MockEventService::new();
+        let feeds_repo = MockFeedRepository::new();
+        let subscriptions_repo = MockSubscriptionRepository::new();
+
+        let news = News {
+            title: "Test News".to_string(),
+            feed_id: uuid::Uuid::new_v4(),
+            id: uuid::Uuid::new_v4(),
+            author: "author 1".to_string(),
+            url: "".to_string(),
+            publish_date: None,
+        };
+        let news_cloned = news.clone();
+        news_repo
+            .expect_find_by_fields()
+            .with(eq(Some(news.title.clone())), eq(Some(news.feed_id)))
+            .times(1)
+            .returning(move |_, _| Ok(Some(news_cloned.clone())));
+
+        let service = Service::new(
+            Arc::new(feeds_repo),
+            Arc::new(news_repo),
+            Arc::new(subscriptions_repo),
+            Arc::new(events_service),
+        );
+
+        // Act
+        let result = service.insert_news(&news).await;
+
+        // Assert
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_list_feeds_success() {
+        // Arrange
+        let mut feed_repo = MockFeedRepository::new();
+        let expected_feeds = vec![
+            Feed {
+                id: uuid::Uuid::new_v4(),
+                title: "Feed 1".to_string(),
+                author: "author1".to_string(),
+                url: "".to_string(),
+            },
+            Feed {
+                id: uuid::Uuid::new_v4(),
+                title: "Feed 2".to_string(),
+                author: "author1".to_string(),
+                url: "".to_string(),
+            },
+        ];
+        let expected_feeds_cloned = expected_feeds.clone();
+        feed_repo
+            .expect_list()
+            .times(1)
+            .return_once(move || Ok(expected_feeds_cloned.clone()));
+
+        let service = Service::new(
+            Arc::new(feed_repo),
+            Arc::new(MockNewsRepository::new()),
+            Arc::new(MockSubscriptionRepository::new()),
+            Arc::new(MockEventService::new()),
+        );
+
+        // Act
+        let result = service.list_feeds().await;
+
+        // Assert
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap(), expected_feeds);
+    }
+
+    #[tokio::test]
+    async fn test_list_feeds_error() {
+        // Arrange
+        let mut feed_repo = MockFeedRepository::new();
+        let expected_error = DatabaseError {
+            message: "Failed to list feeds".to_string(),
+        };
+
+        feed_repo
+            .expect_list()
+            .times(1)
+            .return_once(move || Err(expected_error));
+
+        let service = Service::new(
+            Arc::new(feed_repo),
+            Arc::new(MockNewsRepository::new()),
+            Arc::new(MockSubscriptionRepository::new()),
+            Arc::new(MockEventService::new()),
+        );
+
+        // Act
+        let result = service.list_feeds().await;
+
+        // Assert
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err(),
+            CommonError {
+                message: "Failed to list feeds".to_string(),
+                code: DATABASE_ERROR_CODE
+            }
+        );
+    }
+}
diff --git a/utils/src/pipeline/consumer.rs b/utils/src/pipeline/consumer.rs
new file mode 100644
index 0000000..16c449f
--- /dev/null
+++ b/utils/src/pipeline/consumer.rs
@@ -0,0 +1,43 @@
+use async_trait::async_trait;
+use rdkafka::consumer::StreamConsumer;
+use rdkafka::Message;
+
+use crate::error::{BrokerError, CommonError};
+
+#[async_trait]
+pub trait Consumer: Send + Sync {
+    async fn consume(&self) -> Result<String, CommonError>;
+}
+
+pub struct KafkaConsumer {
+    consumer: StreamConsumer,
+}
+
+impl KafkaConsumer {
+    pub fn new(consumer: StreamConsumer) -> Self {
+        KafkaConsumer { consumer }
+    }
+}
+
+#[async_trait]
+impl Consumer for KafkaConsumer {
+    async fn consume(&self) -> Result<String, CommonError> {
+        return match self.consumer.recv().await {
+            Ok(message) => match message.payload_view::<str>() {
+                Some(Ok(payload)) => Ok(payload.to_string()),
+                Some(Err(err)) => Err(BrokerError {
+                    message: format!("Error deserializing message payload: {}", err),
+                }
+                .into()),
+                None => Err(BrokerError {
+                    message: "Empty message payload".to_string(),
+                }
+                .into()),
+            },
+            Err(_) => Err(BrokerError {
+                message: "Error receiving message".to_string(),
+            }
+            .into()),
+        };
+    }
+}
diff --git a/utils/src/pipeline/data_pipeline.rs b/utils/src/pipeline/data_pipeline.rs
new file mode 100644
index 0000000..7c5fa32
--- /dev/null
+++ b/utils/src/pipeline/data_pipeline.rs
@@ -0,0 +1,33 @@
+use log::error;
+
+use super::consumer::Consumer;
+use super::processor::Processor;
+
+pub struct DataPipeline<'a> {
+    consumer: &'a dyn Consumer,
+    processor: &'a dyn Processor,
+}
+
+impl<'a> DataPipeline<'a> {
+    pub fn new(consumer: &'a dyn Consumer, processor: &'a dyn Processor) -> Self {
+        DataPipeline {
+            consumer,
+            processor,
+        }
+    }
+}
+
+impl<'a> DataPipeline<'a> {
+    pub async fn start(&self) {
+        loop {
+            match self.consumer.consume().await {
+                Ok(message) => {
+                    if let Err(err) = self.processor.process(&message).await {
+                        error!("failed processing message {}: {}", message, err);
+                    }
+                }
+                Err(err) => error!("failed consuming message: {}", err),
+            }
+        }
+    }
+}
diff --git a/utils/src/pipeline/mod.rs b/utils/src/pipeline/mod.rs
new file mode 100644
index 0000000..ab4ad2c
--- /dev/null
+++ b/utils/src/pipeline/mod.rs
@@ -0,0 +1,3 @@
+pub mod consumer;
+pub mod data_pipeline;
+pub mod processor;
diff --git a/utils/src/pipeline/processor.rs b/utils/src/pipeline/processor.rs
new file mode 100644
index 0000000..fda8808
--- /dev/null
+++ b/utils/src/pipeline/processor.rs
@@ -0,0 +1,8 @@
+use async_trait::async_trait;
+
+use crate::error::CommonError;
+
+#[async_trait]
+pub trait Processor: Send + Sync {
+    async fn process(&self, message: &str) -> Result<(), CommonError>;
+}
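
For context, a minimal consumer-side sketch (not part of the patch) of how the new utils::pipeline pieces compose, assuming utils is built with the broker and news features; the bootstrap address, group id, and LogProcessor are illustrative placeholders:

use async_trait::async_trait;
use rdkafka::consumer::{Consumer as _, StreamConsumer};
use rdkafka::ClientConfig;
use utils::error::CommonError;
use utils::news::events::NEWS_CREATED_EVENT;
use utils::pipeline::{consumer::KafkaConsumer, data_pipeline::DataPipeline, processor::Processor};

// Toy processor; real services implement Processor the same way
// (news/src/news_websocket_processor.rs presumably plugs in here).
struct LogProcessor;

#[async_trait]
impl Processor for LogProcessor {
    async fn process(&self, message: &str) -> Result<(), CommonError> {
        println!("got event payload: {}", message);
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stream_consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder address
        .set("group.id", "example-group")           // placeholder group id
        .create()?;
    stream_consumer.subscribe(&[NEWS_CREATED_EVENT])?;

    let consumer = KafkaConsumer::new(stream_consumer);
    let processor = LogProcessor;

    // start() loops forever: consume one message, hand it to the
    // processor, and log-and-continue on either kind of error.
    DataPipeline::new(&consumer, &processor).start().await;
    Ok(())
}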
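
On the producing side, which NewsService::insert_news invokes, KafkaEventService serializes the News to JSON and publishes it on the "news_created" topic via broker::send_message_to_topic. A sketch, assuming broker::create_producer's KafkaProducer alias is the rdkafka FutureProducer that KafkaEventService::new takes; the broker URL and News values are placeholders:

use utils::broker;
use utils::news::models::news::News;
use utils::news::services::events_service::{EventService, KafkaEventService};
use uuid::Uuid;

#[tokio::main]
async fn main() {
    let producer = broker::create_producer("localhost:9092".to_string()); // placeholder URL
    let events = KafkaEventService::new(producer);

    let news = News {
        id: Uuid::new_v4(),
        author: "Jane Doe".to_string(),
        url: "https://example.com/post".to_string(),
        title: "Hello".to_string(),
        publish_date: None,
        feed_id: Uuid::new_v4(),
    };

    // news_created() maps both serde_json and Kafka failures into BrokerError.
    if let Err(err) = events.news_created(&news).await {
        eprintln!("publish failed: {}", err.message);
    }
}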