diff --git a/news-scrapper/src/http_fetcher.rs b/news-scrapper/src/http_fetcher.rs
new file mode 100644
index 0000000..35721c0
--- /dev/null
+++ b/news-scrapper/src/http_fetcher.rs
@@ -0,0 +1,107 @@
+use async_trait::async_trait;
+use bytes::Bytes;
+use reqwest::get;
+use utils::error::{CommonError, HttpError};
+
+use crate::scrapper::RssFetcher;
+
+pub struct HttpFetcher {}
+
+impl Default for HttpFetcher {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl HttpFetcher {
+    pub fn new() -> Self {
+        HttpFetcher {}
+    }
+}
+
+#[async_trait]
+impl RssFetcher for HttpFetcher {
+    async fn fetch(&self, fetch_url: String) -> Result<Bytes, CommonError> {
+        http_request(fetch_url).await.map_err(|e| e.into())
+    }
+}
+
+pub async fn http_request(url: String) -> Result<Bytes, HttpError> {
+    // Send an HTTP GET request to the URL
+    let response = get(url).await.map_err(|v| HttpError {
+        message: format!("failed to send request: {}", v),
+    })?;
+
+    // Check if the request was successful
+    if response.status().is_success() {
+        // Read the response body as bytes
+        let body = response.bytes().await.map_err(|v| HttpError {
+            message: format!("failed to read response body: {}", v),
+        })?;
+
+        return Ok(body);
+    }
+
+    Err(HttpError {
+        message: format!("Request was not successful: {}", response.status().as_str()),
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_http_request_success() {
+        let mut server = mockito::Server::new();
+
+        // Arrange
+        let expected_body = "Hello, World!";
+        let _m = server
+            .mock("GET", "/")
+            .with_body(expected_body)
+            .with_status(200)
+            .create();
+
+        // Act
+        let result = http_request(server.url().to_string()).await;
+
+        // Assert
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap(), expected_body.as_bytes().to_vec());
+    }
+
+    #[tokio::test]
+    async fn test_http_request_failure() {
+        let mut server = mockito::Server::new();
+
+        // Arrange
+        let _m = server.mock("GET", "/").with_status(500).create();
+
+        // Act
+        let result = http_request(server.url().to_string()).await;
+
+        // Assert
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err().message,
+            format!("Request was not successful: {}", 500)
+        );
+    }
+
+    #[tokio::test]
+    async fn test_http_request_error() {
+        // Arrange
+        let url = "invalid url";
+
+        // Act
+        let result = http_request(url.to_string()).await;
+
+        // Assert
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .message
+            .contains("failed to send request:"));
+    }
+}
diff --git a/news-scrapper/src/lib.rs b/news-scrapper/src/lib.rs
new file mode 100644
index 0000000..6f3429e
--- /dev/null
+++ b/news-scrapper/src/lib.rs
@@ -0,0 +1,4 @@
+pub mod config;
+pub mod http_fetcher;
+pub mod news_ingestor;
+pub mod scrapper;
diff --git a/news-scrapper/src/main.rs b/news-scrapper/src/main.rs
index 54ff408..3175ec9 100644
--- a/news-scrapper/src/main.rs
+++ b/news-scrapper/src/main.rs
@@ -1,6 +1,9 @@
-mod config;
-mod news_ingestor;
-mod scrapper;
+use news_scrapper::{
+    config::Config,
+    http_fetcher::HttpFetcher,
+    news_ingestor::NewsIngestor,
+    scrapper::{RssFetcher, RssScrapper},
+};
 use std::{error::Error, sync::Arc, thread};
 use tokio_cron_scheduler::{Job, JobScheduler};
 use utils::{
@@ -22,7 +25,7 @@
 
 #[tokio::main]
 async fn main() {
-    let config = config::Config::init();
+    let config = Config::init();
 
     init_logger(config.logs_path.clone());
@@ -46,9 +49,11 @@ async fn main() {
         events_service.clone(),
     ));
 
-    let feeds_scrapper = Arc::new(scrapper::RssScrapper::default());
+    let rss_fetcher: Arc<dyn RssFetcher> = Arc::new(HttpFetcher::default());
+
+    let feeds_scrapper = Arc::new(RssScrapper::new(rss_fetcher.clone()));
 
-    let ingestor = news_ingestor::NewsIngestor::new(service, feeds_scrapper);
+    let ingestor = NewsIngestor::new(service, feeds_scrapper);
 
     if let Err(err) = setup_cronjobs(&ingestor).await {
         panic!("failed setup cronjobs: {}", err);
@@ -57,7 +62,7 @@ async fn main() {
     thread::park();
 }
 
-async fn setup_cronjobs(ingestor: &news_ingestor::NewsIngestor) -> Result<(), Box<dyn Error>> {
+async fn setup_cronjobs(ingestor: &NewsIngestor) -> Result<(), Box<dyn Error>> {
     let ingestor = ingestor.clone();
 
     let sched = JobScheduler::new().await?;
diff --git a/news-scrapper/src/scrapper.rs b/news-scrapper/src/scrapper.rs
index e6c9cd7..3be7232 100644
--- a/news-scrapper/src/scrapper.rs
+++ b/news-scrapper/src/scrapper.rs
@@ -1,5 +1,8 @@
+use std::sync::Arc;
+
 use async_trait::async_trait;
-use bytes::{Buf, Bytes};
+use bytes::Buf;
+use bytes::Bytes;
 use feed_rs::model::Feed;
 use feed_rs::parser;
 use futures::stream::FuturesUnordered;
@@ -7,38 +10,54 @@ use futures::stream::StreamExt;
 use log::debug;
 use log::error;
 use mockall::automock;
-use reqwest::get;
 use tokio::sync::mpsc::Sender;
 use tokio::sync::Semaphore;
 use utils::error::CommonError;
-use utils::error::HttpError;
+use utils::error::ASYNC_OPERATIONS_ERROR_CODE;
+use utils::error::SERIALIZATION_ERROR_CODE;
 use utils::news::models::feed::Feed as RssFeed;
 use utils::news::models::news::News;
 
 #[automock]
 #[async_trait]
 pub trait FeedsScrapper: Send + Sync {
-    async fn scrap_all(&self, feeds: Vec<RssFeed>, tx: Sender<Vec<News>>) -> Result<(), String>;
+    async fn scrap_all(
+        &self,
+        feeds: Vec<RssFeed>,
+        tx: Sender<Vec<News>>,
+    ) -> Result<(), CommonError>;
+}
+
+#[automock]
+#[async_trait]
+pub trait RssFetcher: Send + Sync {
+    async fn fetch(&self, fetch_url: String) -> Result<Bytes, CommonError>;
 }
 
 #[derive(Clone)]
-pub struct RssScrapper {}
+pub struct RssScrapper {
+    fetcher: Arc<dyn RssFetcher>,
+}
 
 #[async_trait]
 impl FeedsScrapper for RssScrapper {
-    async fn scrap_all(&self, feeds: Vec<RssFeed>, tx: Sender<Vec<News>>) -> Result<(), String> {
+    async fn scrap_all(
+        &self,
+        feeds: Vec<RssFeed>,
+        tx: Sender<Vec<News>>,
+    ) -> Result<(), CommonError> {
         let max_concurrency = 5; // Define the maximum concurrency limit
         let semaphore = Semaphore::new(max_concurrency);
 
         let mut tasks = FuturesUnordered::new();
 
         for current_feed in &feeds {
-            let permit = semaphore
-                .acquire()
-                .await
-                .expect("Semaphore acquisition failed");
-            let task = Self::scrap_with_retry(current_feed.clone(), permit);
+            let permit = semaphore.acquire().await.map_err(|e| CommonError {
+                message: format!("Semaphore acquisition failed: {}", e),
+                code: ASYNC_OPERATIONS_ERROR_CODE,
+            })?;
+            let task = self.scrap_with_retry(current_feed.clone(), permit);
             tasks.push(task);
         }
 
@@ -46,14 +65,16 @@ impl FeedsScrapper for RssScrapper {
             match result {
                 Ok((rss_feed, feed)) => {
                     debug!("Got new entries for feed {}", rss_feed.title);
-                    let mut news = Vec::new();
+                    let news = feed
+                        .entries
+                        .into_iter()
+                        .map(|feed_news| {
+                            let mut news_entry: News = feed_news.into();
+                            news_entry.feed_id = rss_feed.id;
 
-                    for feed_news in feed.entries {
-                        let mut news_entry: News = feed_news.into();
-                        news_entry.feed_id = rss_feed.id;
-
-                        news.push(news_entry)
-                    }
+                            news_entry
+                        })
+                        .collect();
 
                     if let Err(err) = tx.send(news).await {
                         error!(
@@ -72,25 +93,20 @@ impl FeedsScrapper for RssScrapper {
     }
 }
 
-impl Default for RssScrapper {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl RssScrapper {
-    pub fn new() -> Self {
-        RssScrapper {}
+    pub fn new(fetcher: Arc<dyn RssFetcher>) -> Self {
+        RssScrapper { fetcher }
     }
 
     async fn scrap_with_retry(
+        &self,
         rss_feed: RssFeed,
         permit: tokio::sync::SemaphorePermit<'_>,
     ) -> Result<(RssFeed, Feed), (RssFeed, CommonError)> {
         let mut retry_count = 0;
 
         loop {
-            let result = RssScrapper::scrap(rss_feed.url.to_string()).await;
+            let result = self.scrap(rss_feed.url.to_string()).await;
 
             if let Ok(feed) = result {
                 drop(permit); // Release the semaphore permit
@@ -106,92 +122,312 @@ impl RssScrapper {
         }
     }
 
-    async fn scrap(feed_url: String) -> Result<Feed, CommonError> {
-        let xml = http_request(feed_url.clone()).await?;
+    async fn scrap(&self, feed_url: String) -> Result<Feed, CommonError> {
+        let xml = self.fetcher.fetch(feed_url).await?;
 
         parser::parse(xml.reader()).map_err(|err| CommonError {
             message: err.to_string(),
-            code: 3,
+            code: SERIALIZATION_ERROR_CODE,
         })
     }
 }
 
-async fn http_request(url: String) -> Result<Bytes, HttpError> {
-    // Send an HTTP GET request to a URL
-    let response = get(url).await.map_err(|v| HttpError {
-        message: format!("failed to send request: {}", v),
-    })?;
-
-    // Check if the request was successful
-    if response.status().is_success() {
-        // Read the response body as a string
-        let body = response.bytes().await.map_err(|v| HttpError {
-            message: format!("failed to read response body: {}", v),
-        })?;
-
-        return Ok(body);
-    }
-
-    Err(HttpError {
-        message: format!("Request was not successful: {}", response.status().as_str()),
-    })
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
+    use tokio::sync::mpsc;
+    use tokio::sync::Semaphore;
+    use tokio::task;
+    use utils::news::models::feed::Feed as RssFeed;
+    use uuid::Uuid;
 
     #[tokio::test]
-    async fn test_http_request_success() {
-        let mut server = mockito::Server::new();
+    async fn test_scrap_success() {
+        let feed_url = "https://example.com/feed.xml";
+
+        // Create an instance of the Fetcher mock
+        let mut fetcher = MockRssFetcher::new();
+
+        fetcher
+            .expect_fetch()
+            .returning(|_| Ok(r#"<feed><title>Crime Junkie</title></feed>"#.into()));
+
+        let fetcher_wrapped = Arc::new(fetcher);
 
-        // Arrange
-        let expected_body = "Hello, World!";
-        let _m = server
-            .mock("GET", "/")
-            .with_body(expected_body)
-            .with_status(200)
-            .create();
+        // Create an instance of RssScrapper and set the fetcher
+        let scrapper = RssScrapper {
+            fetcher: fetcher_wrapped.clone(),
+        };
 
-        // Act
-        let result = http_request(server.url().to_string()).await;
+        // Call the scrap method with the feed_url
+        let result = scrapper.scrap(feed_url.to_string()).await;
 
-        // Assert
+        // Assert that the result is Ok
         assert!(result.is_ok());
-        assert_eq!(result.unwrap(), expected_body.as_bytes().to_vec());
+
+        // Assert that the parsed feed title matches the fixture
+        assert_eq!(
+            result.unwrap().title.unwrap().content,
+            "Crime Junkie".to_string()
+        );
     }
 
     #[tokio::test]
-    async fn test_http_request_failure() {
-        let mut server = mockito::Server::new();
+    async fn test_scrap_error() {
+        let feed_url = "https://example.com/feed.xml";
+
+        // Create an instance of the Fetcher mock
+        let mut fetcher = MockRssFetcher::new();
+
+        fetcher
+            .expect_fetch()
+            .returning(|_| Ok(r#"<random>response</random>"#.into()));
+
+        let fetcher = Arc::new(fetcher);
 
-        // Arrange
-        let _m = server.mock("GET", "/").with_status(500).create();
+        // Create an instance of RssScrapper and set the fetcher
+        let scrapper = RssScrapper::new(fetcher);
 
-        // Act
-        let result = http_request(server.url().to_string()).await;
+        // Call the scrap method with the feed_url
+        let result = scrapper.scrap(feed_url.to_string()).await;
 
-        // Assert
+        // Assert that the result is an error
         assert!(result.is_err());
+
+        let err = result.err().unwrap();
+
+        // Assert that the parser error and code are propagated
         assert_eq!(
-            result.unwrap_err().message,
-            format!("Request was not successful: {}", 500)
+            err.message,
+            "unable to parse feed: no root element".to_string()
         );
+        assert_eq!(err.code, SERIALIZATION_ERROR_CODE);
+    }
+
+    #[tokio::test]
+    async fn test_scrap_retry_success_after_first_fail() {
+        // Create an instance of the Fetcher mock
+        let mut fetcher = MockRssFetcher::new();
+
+        fetcher
+            .expect_fetch()
+            .returning(|_| {
+                Err(CommonError {
+                    message: "timeout".to_string(),
+                    code: 1,
+                })
+            })
+            .once();
+        fetcher
+            .expect_fetch()
+            .returning(|_| Ok(r#"<feed><title>Crime Junkie</title></feed>"#.into()));
+
+        let fetcher = Arc::new(fetcher);
+
+        // Create an instance of RssScrapper and set the fetcher
+        let scrapper = RssScrapper::new(fetcher);
+
+        // Call scrap_with_retry while holding a semaphore permit
+        let semaphore = Semaphore::new(1);
+        let permit = semaphore.acquire().await.unwrap();
+        let rss_feed = RssFeed {
+            id: Uuid::new_v4(),
+            url: "https://example.com/rss".to_string(),
+            author: "".to_string(),
+            title: "".to_string(),
+        };
+        let result = scrapper.scrap_with_retry(rss_feed, permit).await;
+
+        // Assert that the result is Ok
+        assert!(result.is_ok());
+
+        let rss_feed = result.clone().unwrap().0.clone();
+        let xml_feed = result.unwrap().1.clone();
+        assert_eq!(rss_feed.url, "https://example.com/rss".to_string());
+        assert_eq!(xml_feed.title.unwrap().content, "Crime Junkie".to_string());
     }
 
     #[tokio::test]
-    async fn test_http_request_error() {
-        // Arrange
-        let url = "invalid url";
+    async fn test_scrap_retry_success() {
+        // Create an instance of the Fetcher mock
+        let mut fetcher = MockRssFetcher::new();
+
+        fetcher
+            .expect_fetch()
+            .returning(|_| Ok(r#"<feed><title>Crime Junkie</title></feed>"#.into()));
+
+        let fetcher = Arc::new(fetcher);
+
+        // Create an instance of RssScrapper and set the fetcher
+        let scrapper = RssScrapper::new(fetcher);
+
+        // Call scrap_with_retry while holding a semaphore permit
+        let semaphore = Semaphore::new(1);
+        let permit = semaphore.acquire().await.unwrap();
+        let rss_feed = RssFeed {
+            id: Uuid::new_v4(),
+            url: "https://example.com/rss".to_string(),
+            author: "".to_string(),
+            title: "".to_string(),
+        };
+        let result = scrapper.scrap_with_retry(rss_feed, permit).await;
+
+        // Assert that the result is Ok
+        assert!(result.is_ok());
 
-        // Act
-        let result = http_request(url.to_string()).await;
+        let rss_feed = result.clone().unwrap().0.clone();
+        let xml_feed = result.unwrap().1.clone();
+        assert_eq!(rss_feed.url, "https://example.com/rss".to_string());
+        assert_eq!(xml_feed.title.unwrap().content, "Crime Junkie".to_string());
+    }
 
-        // Assert
+    #[tokio::test]
+    async fn test_scrap_retry_error() {
+        // Create an instance of the Fetcher mock
+        let mut fetcher = MockRssFetcher::new();
+
+        fetcher.expect_fetch().returning(|_| {
+            Err(CommonError {
+                message: "timeout".to_string(),
+                code: 1,
+            })
+        });
+
+        let fetcher = Arc::new(fetcher);
+
+        // Create an instance of RssScrapper and set the fetcher
+        let scrapper = RssScrapper::new(fetcher);
+
+        // Call scrap_with_retry while holding a semaphore permit
+        let semaphore = Semaphore::new(1);
+        let permit = semaphore.acquire().await.unwrap();
+        let rss_feed = RssFeed {
+            id: Uuid::new_v4(),
+            url: "https://example.com/rss".to_string(),
+            author: "".to_string(),
+            title: "".to_string(),
+        };
+        let result = scrapper.scrap_with_retry(rss_feed, permit).await;
+
+        // Assert that the result is an error
         assert!(result.is_err());
-        assert!(result
-            .unwrap_err()
-            .message
-            .contains("failed to send request:"));
+
+        assert_eq!(result.err().unwrap().1.message, "timeout".to_string());
+    }
+
+    #[tokio::test]
+    async fn test_scrap_all() {
+        let feeds = vec![
+            RssFeed {
+                id: Uuid::new_v4(),
+                url: "https://example.com/rss1".to_string(),
+                author: "".to_string(),
+                title: "".to_string(),
+            },
+            RssFeed {
+                id: Uuid::new_v4(),
+                url: "https://example.com/rss2".to_string(),
+                author: "".to_string(),
+                title: "".to_string(),
+            },
+        ];
+
+        let (tx, mut rx) = mpsc::channel::<Vec<News>>(10);
+
+        // Create an instance of the Fetcher mock
+        let mut fetcher = MockRssFetcher::new();
+
+        fetcher.expect_fetch().returning(|_| Ok(r#"
+            <feed xmlns="http://www.w3.org/2005/Atom">
+                <title>Crime Junkie</title>
+                <entry>
+                    <title>MURDERED: Deanna Cook</title>
+                </entry>
+                <entry>
+                    <title>MYSTERIOUS DEATH OF: Morgan Patten</title>
+                </entry>
+            </feed>
+        "#.into())).once();
+        fetcher.expect_fetch().returning(|_| Ok(r#"
+            <feed xmlns="http://www.w3.org/2005/Atom">
+                <title>Crime Junkie</title>
+                <entry>
+                    <title>MURDERED: Deanna Cook</title>
+                </entry>
+            </feed>
+        "#.into()));
+
+        let fetcher = Arc::new(fetcher);
+
+        // Create an instance of RssScrapper and set the fetcher
+        let scrapper = RssScrapper::new(fetcher);
+
+        // Call the function under test
+        task::spawn(async move {
+            let result = scrapper.scrap_all(feeds, tx).await;
+            assert!(result.is_ok());
+        });
+
+        // Assert the expected number of news received
+        let mut total_news = 0;
+        while let Some(news) = rx.recv().await {
+            total_news += news.len();
+        }
+
+        assert_eq!(total_news, 3 /*expected number of news received*/);
+    }
+
+    #[tokio::test]
+    async fn test_scrap_all_one_feed_not_synced() {
+        let feeds = vec![
+            RssFeed {
+                id: Uuid::new_v4(),
+                url: "https://example.com/rss1".to_string(),
+                author: "".to_string(),
+                title: "".to_string(),
+            },
+            RssFeed {
+                id: Uuid::new_v4(),
+                url: "https://example.com/rss2".to_string(),
+                author: "".to_string(),
+                title: "".to_string(),
+            },
+        ];
+
+        let (tx, mut rx) = mpsc::channel::<Vec<News>>(10);
+
+        // Create an instance of the Fetcher mock
+        let mut fetcher = MockRssFetcher::new();
+
+        fetcher.expect_fetch().returning(|_| Ok(r#"
+            <feed xmlns="http://www.w3.org/2005/Atom">
+                <title>Crime Junkie</title>
+                <entry>
+                    <title>MURDERED: Deanna Cook</title>
+                </entry>
+                <entry>
+                    <title>MYSTERIOUS DEATH OF: Morgan Patten</title>
+                </entry>
+            </feed>
+        "#.into())).once();
+        fetcher.expect_fetch().returning(|_| {
+            Err(CommonError {
+                message: "timeout".to_string(),
+                code: 0,
+            })
+        });
+
+        let fetcher = Arc::new(fetcher);
+
+        // Create an instance of RssScrapper and set the fetcher
+        let scrapper = RssScrapper::new(fetcher);
+
+        // Call the function under test
+        task::spawn(async move {
+            let result = scrapper.scrap_all(feeds, tx).await;
+            assert!(result.is_ok());
+        });
+
+        // Assert the expected number of news received
+        let mut total_news = 0;
+        while let Some(news) = rx.recv().await {
+            total_news += news.len();
+        }
+
+        assert_eq!(total_news, 2 /*expected number of news received*/);
+    }
 }
diff --git a/utils/src/error.rs b/utils/src/error.rs
index 20c94fa..552a046 100644
--- a/utils/src/error.rs
+++ b/utils/src/error.rs
@@ -7,6 +7,7 @@ pub const HTTP_ERROR_CODE: u32 = 3;
 pub const AUTH_TOKEN_ENCODING_CODE: u32 = 4;
 pub const WS_ERROR_CODE: u32 = 5;
 pub const SERIALIZATION_ERROR_CODE: u32 = 6;
+pub const ASYNC_OPERATIONS_ERROR_CODE: u32 = 7;
 
 #[derive(Debug, Serialize, Clone, PartialEq)]
 pub struct CommonError {
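The key change in this diff is that `RssScrapper` now receives its HTTP layer through the new `RssFetcher` trait instead of calling `reqwest::get` directly, which is what lets the tests inject `MockRssFetcher` while production code injects the reqwest-backed `HttpFetcher`. A minimal end-to-end sketch of that wiring, assuming the `utils` crate's `RssFeed` and `News` models exactly as they appear above (the URL and single-feed list are illustrative only):

```rust
use std::sync::Arc;

use news_scrapper::http_fetcher::HttpFetcher;
use news_scrapper::scrapper::{FeedsScrapper, RssFetcher, RssScrapper};
use tokio::sync::mpsc;
use utils::news::models::{feed::Feed as RssFeed, news::News};
use uuid::Uuid;

#[tokio::main]
async fn main() {
    // Production wiring: the reqwest-backed fetcher; tests swap in MockRssFetcher.
    let fetcher: Arc<dyn RssFetcher> = Arc::new(HttpFetcher::default());
    let scrapper = RssScrapper::new(fetcher);

    // scrap_all sends one Vec<News> per successfully fetched and parsed feed.
    let (tx, mut rx) = mpsc::channel::<Vec<News>>(10);

    let feeds = vec![RssFeed {
        id: Uuid::new_v4(),
        url: "https://example.com/rss".to_string(), // illustrative URL
        author: String::new(),
        title: String::new(),
    }];

    // Run the scraper on its own task; when it finishes, tx is dropped
    // and the receive loop below terminates.
    tokio::spawn(async move {
        if let Err(err) = scrapper.scrap_all(feeds, tx).await {
            eprintln!("scrape failed: {} (code {})", err.message, err.code);
        }
    });

    while let Some(batch) = rx.recv().await {
        println!("received {} news entries", batch.len());
    }
}
```

Because `scrap_all` takes the `Sender` by value, the channel closes once the spawned task finishes; the same property is what ends the `rx.recv()` loops in the `test_scrap_all` tests above.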