diff --git a/src/kiwi/src/source/mod.rs b/src/kiwi/src/source/mod.rs index 7845528..c48cd3f 100644 --- a/src/kiwi/src/source/mod.rs +++ b/src/kiwi/src/source/mod.rs @@ -1,3 +1,6 @@ +use base64::Engine; +use rdkafka::Message; +use serde::{Deserialize, Serialize}; use tokio::sync::broadcast::Receiver; pub mod kafka; @@ -11,3 +14,27 @@ pub trait Source { } pub type SourceId = String; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SourceResult { + /// Event payload + pub payload: Option, + /// Source ID + pub source_id: SourceId, + /// Timestamp at which the message was produced + pub timestamp: Option, +} + +impl From for SourceResult { + fn from(value: rdkafka::message::OwnedMessage) -> Self { + let payload = value + .payload() + .map(|p| base64::engine::general_purpose::STANDARD.encode(p)); + + Self { + payload, + source_id: value.topic().to_owned(), + timestamp: value.timestamp().to_millis(), + } + } +} diff --git a/src/kiwi/src/ws.rs b/src/kiwi/src/ws.rs index 26eaef7..4ecd9c0 100644 --- a/src/kiwi/src/ws.rs +++ b/src/kiwi/src/ws.rs @@ -2,10 +2,7 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::{net::SocketAddr, sync::Arc}; -use base64::{engine::general_purpose, Engine as _}; use futures::{SinkExt, StreamExt}; -use rdkafka::message::OwnedMessage; -use rdkafka::Message as _Message; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response}; @@ -20,32 +17,11 @@ use crate::ingest::IngestActor; use crate::hook::intercept::{self, Intercept}; use crate::protocol::{Command, Message, ProtocolError as KiwiProtocolError}; -use crate::source::Source; +use crate::source::{Source, SourceResult}; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use tokio_tungstenite::tungstenite::protocol::{CloseFrame, Message as ProtocolMessage}; -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct MessageData { - pub payload: Option, - pub topic: String, - pub timestamp: Option, - pub partition: i32, - pub offset: i64, -} - -impl From for MessageData { - fn from(value: OwnedMessage) -> Self { - Self { - payload: value.payload().map(|p| general_purpose::STANDARD.encode(p)), - topic: value.topic().to_owned(), - timestamp: value.timestamp().to_millis(), - partition: value.partition(), - offset: value.offset(), - } - } -} - /// Starts a WebSocket server with the specified configuration pub async fn serve( listen_addr: &SocketAddr, @@ -56,7 +32,7 @@ pub async fn serve( where S: Source + Send + Sync + 'static, M: Into - + Into + + Into + MutableEvent + std::fmt::Debug + Clone @@ -150,7 +126,7 @@ where Stream: AsyncRead + AsyncWrite + Unpin, S: Source + Send + Sync + 'static, M: Into - + Into + + Into + MutableEvent + std::fmt::Debug + Clone @@ -158,7 +134,7 @@ where + 'static, I: Intercept + Clone + Send + Sync + 'static, { - let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::>(); + let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::>(); let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::(); let actor = IngestActor::new( diff --git a/src/kiwi/tests/kafka.rs b/src/kiwi/tests/kafka.rs index 4e1cb62..2863177 100644 --- a/src/kiwi/tests/kafka.rs +++ b/src/kiwi/tests/kafka.rs @@ -6,7 +6,7 @@ use tokio_tungstenite::{connect_async, tungstenite}; use kiwi::{ protocol::{Command, CommandResponse, Message}, - ws::MessageData, + source::SourceResult, }; use tempfile::NamedTempFile; @@ -101,11 +101,11 @@ async fn test_kafka_source() -> anyhow::Result<()> { let mut count = 0; while let Some(msg) = read.next().await { let msg = msg.unwrap(); - let msg: Message = serde_json::from_str(&msg.to_text().unwrap()).unwrap(); + let msg: Message = serde_json::from_str(&msg.to_text().unwrap()).unwrap(); match msg { Message::Result(msg) => { - assert_eq!(msg.topic.as_ref(), "topic1".to_string()); + assert_eq!(msg.source_id.as_ref(), "topic1".to_string()); let msg = base64::engine::general_purpose::STANDARD .decode(msg.payload.as_ref().unwrap()) .unwrap();