From 188ba6447dda77f9d6bf80757116d39de5191764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A1bio=20Ferreira?= Date: Sun, 24 Sep 2023 20:00:50 +0100 Subject: [PATCH] feat: broadcast news through websockets when created --- Cargo.lock | 3 + docker-compose.yaml | 1 + frontend/src/App.tsx | 2 +- .../src/components/WebsocketComponent.tsx | 43 ++++++++++-- frontend/src/services/api.ts | 2 +- frontend/src/setupProxy.js | 2 +- news/Cargo.toml | 3 + news/src/app.rs | 36 +++++++--- news/src/config.rs | 3 + news/src/lib.rs | 1 + news/src/main.rs | 29 ++++++++- news/src/news_created_subscriber.rs | 65 +++++++++++++++++++ .../repositories/subscription_repository.rs | 12 ++++ utils/src/broker.rs | 6 +- 14 files changed, 186 insertions(+), 22 deletions(-) create mode 100644 news/src/news_created_subscriber.rs diff --git a/Cargo.lock b/Cargo.lock index eb1de78..650d7cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1452,8 +1452,10 @@ dependencies = [ name = "news" version = "0.1.0" dependencies = [ + "actix", "actix-rt", "actix-web", + "actix-web-actors", "async-trait", "bytes", "chrono", @@ -1463,6 +1465,7 @@ dependencies = [ "log", "mockall", "mockito", + "rdkafka", "reqwest", "serde", "serde_json", diff --git a/docker-compose.yaml b/docker-compose.yaml index 118e87c..7db1f46 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -111,6 +111,7 @@ services: CORS_ORIGIN: http://localhost:3000 PORT: 8001 JWT_SECRET: users-secret-1234 + KAFKA_URL: kafka:29092 depends_on: postgres: condition: service_healthy diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 5a10cf0..36b885d 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -73,8 +73,8 @@ function App() {
- +
)} diff --git a/frontend/src/components/WebsocketComponent.tsx b/frontend/src/components/WebsocketComponent.tsx index 1f39b51..f347b7c 100644 --- a/frontend/src/components/WebsocketComponent.tsx +++ b/frontend/src/components/WebsocketComponent.tsx @@ -1,19 +1,50 @@ -import React, { useEffect } from 'react'; -import api from '../services/api'; +import React, { useCallback, useEffect, useState } from 'react'; +import api, { News } from '../services/api'; interface Props {} const WebSocketComponent = (_: Props) => { - useEffect(() => { - const connDestruct = api.connectWs((event) => { + const [lastNews, setLastNews] = useState([]); + + const addLastNews = useCallback( + (event: MessageEvent) => { + try { + const news = JSON.parse(event.data); + + setLastNews([news, ...lastNews]); + } catch (err) { + console.log(err); + } console.log('WebSocket message received:', event.data); - }); + }, + [lastNews] + ); + + useEffect(() => { + const connDestruct = api.connectWs(addLastNews); // Clean up the WebSocket connection when the component unmounts return connDestruct; }, []); - return ; + return ( + <> + {lastNews.length !== 0 ? ( + <> +

Last News

+ {lastNews.map((news) => { + return ( +

+ {news.title} ({news.author}) {news.publish_date} +

+ ); + })} + + ) : ( + <> + )} + + ); }; export default WebSocketComponent; diff --git a/frontend/src/services/api.ts b/frontend/src/services/api.ts index d8505b3..a437493 100644 --- a/frontend/src/services/api.ts +++ b/frontend/src/services/api.ts @@ -124,7 +124,7 @@ class API { } connectWs(onMessage: (event: MessageEvent) => void) { - const socket = new WebSocket('ws://localhost:8000/connect-ws'); + const socket = new WebSocket('ws://localhost:8001/connect-ws'); // const socket = new WebSocket(`ws://${window.location.host}/connect-ws`); let interval: string | number | NodeJS.Timer | undefined; diff --git a/frontend/src/setupProxy.js b/frontend/src/setupProxy.js index babd4b2..5119874 100644 --- a/frontend/src/setupProxy.js +++ b/frontend/src/setupProxy.js @@ -56,7 +56,7 @@ module.exports = function (app) { '/connect-ws', createProxyMiddleware({ ws: true, - target: 'http://users:8000', + target: 'http://news:8001', changeOrigin: true, }) ); diff --git a/news/Cargo.toml b/news/Cargo.toml index 4fba57c..3cf1b22 100644 --- a/news/Cargo.toml +++ b/news/Cargo.toml @@ -30,3 +30,6 @@ async-trait = "0.1.73" actix-rt = "2.9.0" serde_json = "1.0.107" validator = { version = "0.16.1", features = ["derive"] } +actix = "0.13.1" +actix-web-actors = "4.2.0" +rdkafka = "0.34.0" diff --git a/news/src/app.rs b/news/src/app.rs index 6073f08..d58e6ca 100644 --- a/news/src/app.rs +++ b/news/src/app.rs @@ -7,19 +7,28 @@ use crate::{ }; use log::{debug, error, info}; use tokio::{sync::mpsc, task}; -use utils::error::CommonError; +use utils::{ + broker::{self, KafkaProducer}, + error::CommonError, +}; #[derive(Clone)] pub struct App { pub feed_repo: Arc, pub news_repo: Arc, + pub kafka_producer: KafkaProducer, } impl App { - pub fn new(feed_repo: Arc, news_repo: Arc) -> App { + pub fn new( + feed_repo: Arc, + news_repo: Arc, + kafka_producer: KafkaProducer, + ) -> App { App { feed_repo, news_repo, + kafka_producer, } } @@ -47,13 +56,22 @@ impl App { if let Ok(None) = db_news { let result = self.news_repo.create(&news); - if let Err(err) = result { - error!("failed creating new {:?}: {}", news, CommonError::from(err)); - } else { - info!( - "News with title {} of feed {} inserted!", - news.title, news.feed_id - ); + match result { + Ok(news) => { + info!( + "News with title {} of feed {} inserted!", + news.title, news.feed_id + ); + let _ = broker::send_message_to_topic( + self.kafka_producer.clone(), + "news_created".to_string(), + serde_json::to_string(&news).unwrap(), + ) + .await; + } + Err(err) => { + error!("failed creating new {:?}: {}", news, CommonError::from(err)); + } } } } diff --git a/news/src/config.rs b/news/src/config.rs index 6d0ffa7..b13cf5e 100644 --- a/news/src/config.rs +++ b/news/src/config.rs @@ -7,6 +7,7 @@ pub struct Config { pub logs_path: String, pub server_port: String, pub jwt_secret: String, + pub kafka_url: String, } impl JwtMiddlewareConfig for Config { @@ -22,6 +23,7 @@ impl Config { let logs_path = std::env::var("LOGS_PATH").unwrap_or_else(|_| String::from("")); let server_port = std::env::var("PORT").unwrap_or_else(|_| String::from("8000")); let jwt_secret = std::env::var("JWT_SECRET").expect("JWT_SECRET must be set"); + let kafka_url = std::env::var("KAFKA_URL").expect("KAFKA_URL must be set"); Config { cors_origin, @@ -29,6 +31,7 @@ impl Config { logs_path, server_port, jwt_secret, + kafka_url, } } } diff --git a/news/src/lib.rs b/news/src/lib.rs index 81fb8ea..01b4eaa 100644 --- a/news/src/lib.rs +++ b/news/src/lib.rs @@ -2,6 +2,7 @@ pub mod app; pub mod config; pub mod handlers; pub mod models; +pub mod news_created_subscriber; pub mod repositories; pub mod schema; pub mod scrapper; diff --git a/news/src/main.rs b/news/src/main.rs index eca3d78..28aa1f9 100644 --- a/news/src/main.rs +++ b/news/src/main.rs @@ -1,10 +1,12 @@ extern crate log; +use actix::{Actor, Addr}; use actix_web::body::MessageBody; use actix_web::dev::{ServiceFactory, ServiceRequest, ServiceResponse}; use actix_web::{error::Error as ActixError, web, App as ActixApp, HttpServer}; use log::info; use news::handlers::subscriptions::{create_subscription, delete_subscription, get_subscriptions}; +use news::news_created_subscriber::setup_news_created_subscriber; use news::repositories::feed_repository::FeedDieselRepository; use news::repositories::news_repository::NewsDieselRepository; use news::repositories::subscription_repository::{ @@ -13,7 +15,10 @@ use news::repositories::subscription_repository::{ use std::thread; use std::{error::Error, sync::Arc}; use tokio_cron_scheduler::{Job, JobScheduler}; +use utils::broker::{self}; use utils::http::middlewares::jwt_auth::JwtMiddlewareConfig; +use utils::http::websockets::ws_handler::get_ws; +use utils::http::websockets::ws_server::WebsocketServer; use utils::{db::connect_db, http::utils::build_server, logger::init_logger}; use news::{ @@ -24,12 +29,14 @@ use news::{ repositories::{feed_repository::FeedRepository, news_repository::NewsRepository}, }; -#[tokio::main] +#[actix_web::main] async fn main() { let config = Config::init(); init_logger(config.logs_path.clone()); + let kafka_producer = broker::create_producer(config.kafka_url.clone()); + let db_pool = connect_db(config.database_url.clone()); let feed_repository: Arc = @@ -40,7 +47,11 @@ async fn main() { SubscriptionsDieselRepository::new(Arc::new(db_pool.clone())), ); - let app = App::new(feed_repository.clone(), news_repository.clone()); + let app = App::new( + feed_repository.clone(), + news_repository.clone(), + kafka_producer, + ); info!("Setting up cronjobs"); @@ -52,12 +63,23 @@ async fn main() { info!("Starting API server in port {}", server_port.clone()); + let ws_server = WebsocketServer::new().start(); + + let config_clone = config.clone(); + let ws_server_clone = ws_server.clone(); + let subscription_repo_clone = subscription_repository.clone(); + + actix_rt::spawn(async move { + setup_news_created_subscriber(&config_clone, ws_server_clone, subscription_repo_clone).await + }); + let server_result = HttpServer::new(move || { setup_http_server( &config, feed_repository.clone(), news_repository.clone(), subscription_repository.clone(), + ws_server.clone(), ) }) .bind(format!("0.0.0.0:{}", server_port.clone())); @@ -99,6 +121,7 @@ fn setup_http_server( feed_repo: Arc, news_repo: Arc, subscription_repo: Arc, + ws_server: Addr, ) -> ActixApp< impl ServiceFactory< ServiceRequest, @@ -116,9 +139,11 @@ fn setup_http_server( .app_data(web::Data::from(subscription_repo.clone())) .app_data(web::Data::new(config.clone())) .app_data(web::Data::from(jwt_config.clone())) + .app_data(web::Data::new(ws_server.clone())) .service(get_news) .service(get_feeds) .service(get_subscriptions) .service(create_subscription) .service(delete_subscription) + .service(get_ws) } diff --git a/news/src/news_created_subscriber.rs b/news/src/news_created_subscriber.rs new file mode 100644 index 0000000..4aac275 --- /dev/null +++ b/news/src/news_created_subscriber.rs @@ -0,0 +1,65 @@ +use crate::{ + config::Config, models::news::News, + repositories::subscription_repository::SubscriptionRepository, +}; +use actix::Addr; +use log::{debug, error, info}; +use rdkafka::consumer::Consumer; +use rdkafka::consumer::StreamConsumer; +use rdkafka::Message; +use std::{error::Error, sync::Arc}; +use utils::{ + broker, + http::websockets::ws_server::{SessionMessage, WebsocketServer}, +}; + +pub async fn setup_news_created_subscriber( + config: &Config, + ws: Addr, + subscription_repo: Arc, +) -> Result<(), Box> { + let consumer: StreamConsumer = broker::create_consumer(config.kafka_url.clone()); + + consumer + .subscribe(&["news_created"]) + .expect("Error subscribing to topic"); + + // Consume messages from the subscribed topic + loop { + match consumer.recv().await { + Ok(message) => { + let payload = message.payload_view::(); + match payload { + Some(Ok(payload)) => { + info!("Received message: {}", payload); + let news: News = + serde_json::from_str(payload).expect("Failed to convert JSON string"); + + let result = subscription_repo.list_by_feed(news.feed_id); + + match result { + Ok(subscriptions) => { + for s in subscriptions { + debug!("sending message to socket {}", s.user_id); + ws.do_send(SessionMessage { + id: s.user_id.to_string(), + message: payload.to_string(), + }); + } + } + Err(err) => { + error!( + "failed getting subscriptions from database: {}", + err.message + ); + } + } + } + Some(Err(_)) => error!("Error deserializing message payload"), + None => error!("Empty message payload"), + } + } + Err(_) => error!("Error deserializing message payload"), + } + } +} diff --git a/news/src/repositories/subscription_repository.rs b/news/src/repositories/subscription_repository.rs index 4f689ea..1544965 100644 --- a/news/src/repositories/subscription_repository.rs +++ b/news/src/repositories/subscription_repository.rs @@ -12,6 +12,7 @@ use crate::schema::subscriptions; #[automock] pub trait SubscriptionRepository: Send + Sync { fn list_by_user(&self, user_id: Uuid) -> Result, DatabaseError>; + fn list_by_feed(&self, feed_id: Uuid) -> Result, DatabaseError>; fn find_by_id( &self, feed_id: Uuid, @@ -74,6 +75,17 @@ impl SubscriptionRepository for SubscriptionsDieselRepository { }) } + fn list_by_feed(&self, feed_id: Uuid) -> Result, DatabaseError> { + let mut conn = self.pool.get().unwrap(); + + subscriptions::table + .filter(subscriptions::feed_id.eq(feed_id)) + .load::(&mut conn) + .map_err(|err| DatabaseError { + message: err.to_string(), + }) + } + fn delete(&self, feed_id: Uuid, user_id: Uuid) -> Result { let mut conn = self.pool.get().unwrap(); diff --git a/utils/src/broker.rs b/utils/src/broker.rs index 9b0f077..1a29ccd 100644 --- a/utils/src/broker.rs +++ b/utils/src/broker.rs @@ -11,7 +11,9 @@ use rdkafka::{ ClientConfig, }; -pub fn create_producer(kafka_url: String) -> FutureProducer { +pub type KafkaProducer = FutureProducer; + +pub fn create_producer(kafka_url: String) -> KafkaProducer { ClientConfig::new() .set("bootstrap.servers", kafka_url) .set("message.timeout.ms", "5000") @@ -20,7 +22,7 @@ pub fn create_producer(kafka_url: String) -> FutureProducer { } pub async fn send_message_to_topic( - producer: FutureProducer, + producer: KafkaProducer, topic: String, message: String, ) -> Result<(i32, i64), (KafkaError, OwnedMessage)> {