Skip to content

Commit

Permalink
feat: broadcast news through websockets when created
Browse files Browse the repository at this point in the history
  • Loading branch information
fabioDMFerreira committed Sep 24, 2023
1 parent 4045499 commit 188ba64
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 22 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ services:
CORS_ORIGIN: http://localhost:3000
PORT: 8001
JWT_SECRET: users-secret-1234
KAFKA_URL: kafka:29092
depends_on:
postgres:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ function App() {
<div>
<Logout logout={logout} />
<UserInfo />
<Feeds />
<WebSocketComponent />
<Feeds />
</div>
)}
</div>
Expand Down
43 changes: 37 additions & 6 deletions frontend/src/components/WebsocketComponent.tsx
Original file line number Diff line number Diff line change
@@ -1,19 +1,50 @@
import React, { useEffect } from 'react';
import api from '../services/api';
import React, { useCallback, useEffect, useState } from 'react';
import api, { News } from '../services/api';

interface Props {}

const WebSocketComponent = (_: Props) => {
useEffect(() => {
const connDestruct = api.connectWs((event) => {
const [lastNews, setLastNews] = useState<News[]>([]);

const addLastNews = useCallback(
(event: MessageEvent<any>) => {
try {
const news = JSON.parse(event.data);

setLastNews([news, ...lastNews]);
} catch (err) {
console.log(err);
}
console.log('WebSocket message received:', event.data);
});
},
[lastNews]
);

useEffect(() => {
const connDestruct = api.connectWs(addLastNews);

// Clean up the WebSocket connection when the component unmounts
return connDestruct;
}, []);

return <span aria-description="websocket-placeholder"></span>;
return (
<>
{lastNews.length !== 0 ? (
<>
<h2>Last News</h2>
{lastNews.map((news) => {
return (
<p>
{news.title} ({news.author}) {news.publish_date}
</p>
);
})}
</>
) : (
<></>
)}
</>
);
};

export default WebSocketComponent;
2 changes: 1 addition & 1 deletion frontend/src/services/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class API {
}

connectWs(onMessage: (event: MessageEvent<any>) => void) {
const socket = new WebSocket('ws://localhost:8000/connect-ws');
const socket = new WebSocket('ws://localhost:8001/connect-ws');
// const socket = new WebSocket(`ws://${window.location.host}/connect-ws`);

let interval: string | number | NodeJS.Timer | undefined;
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/setupProxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ module.exports = function (app) {
'/connect-ws',
createProxyMiddleware({
ws: true,
target: 'http://users:8000',
target: 'http://news:8001',
changeOrigin: true,
})
);
Expand Down
3 changes: 3 additions & 0 deletions news/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ async-trait = "0.1.73"
actix-rt = "2.9.0"
serde_json = "1.0.107"
validator = { version = "0.16.1", features = ["derive"] }
actix = "0.13.1"
actix-web-actors = "4.2.0"
rdkafka = "0.34.0"
36 changes: 27 additions & 9 deletions news/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,28 @@ use crate::{
};
use log::{debug, error, info};
use tokio::{sync::mpsc, task};
use utils::error::CommonError;
use utils::{
broker::{self, KafkaProducer},
error::CommonError,
};

#[derive(Clone)]
pub struct App {
pub feed_repo: Arc<dyn FeedRepository>,
pub news_repo: Arc<dyn NewsRepository>,
pub kafka_producer: KafkaProducer,
}

impl App {
pub fn new(feed_repo: Arc<dyn FeedRepository>, news_repo: Arc<dyn NewsRepository>) -> App {
pub fn new(
feed_repo: Arc<dyn FeedRepository>,
news_repo: Arc<dyn NewsRepository>,
kafka_producer: KafkaProducer,
) -> App {
App {
feed_repo,
news_repo,
kafka_producer,
}
}

Expand Down Expand Up @@ -47,13 +56,22 @@ impl App {

if let Ok(None) = db_news {
let result = self.news_repo.create(&news);
if let Err(err) = result {
error!("failed creating new {:?}: {}", news, CommonError::from(err));
} else {
info!(
"News with title {} of feed {} inserted!",
news.title, news.feed_id
);
match result {
Ok(news) => {
info!(
"News with title {} of feed {} inserted!",
news.title, news.feed_id
);
let _ = broker::send_message_to_topic(
self.kafka_producer.clone(),
"news_created".to_string(),
serde_json::to_string(&news).unwrap(),
)
.await;
}
Err(err) => {
error!("failed creating new {:?}: {}", news, CommonError::from(err));
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions news/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub struct Config {
pub logs_path: String,
pub server_port: String,
pub jwt_secret: String,
pub kafka_url: String,
}

impl JwtMiddlewareConfig for Config {
Expand All @@ -22,13 +23,15 @@ impl Config {
let logs_path = std::env::var("LOGS_PATH").unwrap_or_else(|_| String::from(""));
let server_port = std::env::var("PORT").unwrap_or_else(|_| String::from("8000"));
let jwt_secret = std::env::var("JWT_SECRET").expect("JWT_SECRET must be set");
let kafka_url = std::env::var("KAFKA_URL").expect("KAFKA_URL must be set");

Config {
cors_origin,
database_url,
logs_path,
server_port,
jwt_secret,
kafka_url,
}
}
}
1 change: 1 addition & 0 deletions news/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod app;
pub mod config;
pub mod handlers;
pub mod models;
pub mod news_created_subscriber;
pub mod repositories;
pub mod schema;
pub mod scrapper;
29 changes: 27 additions & 2 deletions news/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
extern crate log;

use actix::{Actor, Addr};
use actix_web::body::MessageBody;
use actix_web::dev::{ServiceFactory, ServiceRequest, ServiceResponse};
use actix_web::{error::Error as ActixError, web, App as ActixApp, HttpServer};
use log::info;
use news::handlers::subscriptions::{create_subscription, delete_subscription, get_subscriptions};
use news::news_created_subscriber::setup_news_created_subscriber;
use news::repositories::feed_repository::FeedDieselRepository;
use news::repositories::news_repository::NewsDieselRepository;
use news::repositories::subscription_repository::{
Expand All @@ -13,7 +15,10 @@ use news::repositories::subscription_repository::{
use std::thread;
use std::{error::Error, sync::Arc};
use tokio_cron_scheduler::{Job, JobScheduler};
use utils::broker::{self};
use utils::http::middlewares::jwt_auth::JwtMiddlewareConfig;
use utils::http::websockets::ws_handler::get_ws;
use utils::http::websockets::ws_server::WebsocketServer;
use utils::{db::connect_db, http::utils::build_server, logger::init_logger};

use news::{
Expand All @@ -24,12 +29,14 @@ use news::{
repositories::{feed_repository::FeedRepository, news_repository::NewsRepository},
};

#[tokio::main]
#[actix_web::main]
async fn main() {
let config = Config::init();

init_logger(config.logs_path.clone());

let kafka_producer = broker::create_producer(config.kafka_url.clone());

let db_pool = connect_db(config.database_url.clone());

let feed_repository: Arc<dyn FeedRepository> =
Expand All @@ -40,7 +47,11 @@ async fn main() {
SubscriptionsDieselRepository::new(Arc::new(db_pool.clone())),
);

let app = App::new(feed_repository.clone(), news_repository.clone());
let app = App::new(
feed_repository.clone(),
news_repository.clone(),
kafka_producer,
);

info!("Setting up cronjobs");

Expand All @@ -52,12 +63,23 @@ async fn main() {

info!("Starting API server in port {}", server_port.clone());

let ws_server = WebsocketServer::new().start();

let config_clone = config.clone();
let ws_server_clone = ws_server.clone();
let subscription_repo_clone = subscription_repository.clone();

actix_rt::spawn(async move {
setup_news_created_subscriber(&config_clone, ws_server_clone, subscription_repo_clone).await
});

let server_result = HttpServer::new(move || {
setup_http_server(
&config,
feed_repository.clone(),
news_repository.clone(),
subscription_repository.clone(),
ws_server.clone(),
)
})
.bind(format!("0.0.0.0:{}", server_port.clone()));
Expand Down Expand Up @@ -99,6 +121,7 @@ fn setup_http_server(
feed_repo: Arc<dyn FeedRepository>,
news_repo: Arc<dyn NewsRepository>,
subscription_repo: Arc<dyn SubscriptionRepository>,
ws_server: Addr<WebsocketServer>,
) -> ActixApp<
impl ServiceFactory<
ServiceRequest,
Expand All @@ -116,9 +139,11 @@ fn setup_http_server(
.app_data(web::Data::from(subscription_repo.clone()))
.app_data(web::Data::new(config.clone()))
.app_data(web::Data::from(jwt_config.clone()))
.app_data(web::Data::new(ws_server.clone()))
.service(get_news)
.service(get_feeds)
.service(get_subscriptions)
.service(create_subscription)
.service(delete_subscription)
.service(get_ws)
}
65 changes: 65 additions & 0 deletions news/src/news_created_subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::{
config::Config, models::news::News,
repositories::subscription_repository::SubscriptionRepository,
};
use actix::Addr;
use log::{debug, error, info};
use rdkafka::consumer::Consumer;
use rdkafka::consumer::StreamConsumer;
use rdkafka::Message;
use std::{error::Error, sync::Arc};
use utils::{
broker,
http::websockets::ws_server::{SessionMessage, WebsocketServer},
};

pub async fn setup_news_created_subscriber(
config: &Config,
ws: Addr<WebsocketServer>,
subscription_repo: Arc<dyn SubscriptionRepository>,
) -> Result<(), Box<dyn Error>> {
let consumer: StreamConsumer = broker::create_consumer(config.kafka_url.clone());

consumer
.subscribe(&["news_created"])
.expect("Error subscribing to topic");

// Consume messages from the subscribed topic
loop {
match consumer.recv().await {
Ok(message) => {
let payload = message.payload_view::<str>();
match payload {
Some(Ok(payload)) => {
info!("Received message: {}", payload);
let news: News =
serde_json::from_str(payload).expect("Failed to convert JSON string");

let result = subscription_repo.list_by_feed(news.feed_id);

match result {
Ok(subscriptions) => {
for s in subscriptions {
debug!("sending message to socket {}", s.user_id);
ws.do_send(SessionMessage {
id: s.user_id.to_string(),
message: payload.to_string(),
});
}
}
Err(err) => {
error!(
"failed getting subscriptions from database: {}",
err.message
);
}
}
}
Some(Err(_)) => error!("Error deserializing message payload"),
None => error!("Empty message payload"),
}
}
Err(_) => error!("Error deserializing message payload"),
}
}
}
Loading

0 comments on commit 188ba64

Please sign in to comment.