Skip to content

Commit

Permalink
fix: make results source agnostic
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrishn7 committed Dec 7, 2023
1 parent 0e49988 commit b985a85
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 31 deletions.
27 changes: 27 additions & 0 deletions src/kiwi/src/source/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use base64::Engine;
use rdkafka::Message;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::Receiver;

pub mod kafka;
Expand All @@ -11,3 +14,27 @@ pub trait Source {
}

pub type SourceId = String;

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SourceResult {
/// Event payload
pub payload: Option<String>,
/// Source ID
pub source_id: SourceId,
/// Timestamp at which the message was produced
pub timestamp: Option<i64>,
}

impl From<rdkafka::message::OwnedMessage> for SourceResult {
fn from(value: rdkafka::message::OwnedMessage) -> Self {
let payload = value
.payload()
.map(|p| base64::engine::general_purpose::STANDARD.encode(p));

Self {
payload,
source_id: value.topic().to_owned(),
timestamp: value.timestamp().to_millis(),
}
}
}
32 changes: 4 additions & 28 deletions src/kiwi/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use std::borrow::Cow;
use std::collections::BTreeMap;
use std::{net::SocketAddr, sync::Arc};

use base64::{engine::general_purpose, Engine as _};
use futures::{SinkExt, StreamExt};
use rdkafka::message::OwnedMessage;
use rdkafka::Message as _Message;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
Expand All @@ -20,32 +17,11 @@ use crate::ingest::IngestActor;

use crate::hook::intercept::{self, Intercept};
use crate::protocol::{Command, Message, ProtocolError as KiwiProtocolError};
use crate::source::Source;
use crate::source::{Source, SourceResult};

use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::{CloseFrame, Message as ProtocolMessage};

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct MessageData {
pub payload: Option<String>,
pub topic: String,
pub timestamp: Option<i64>,
pub partition: i32,
pub offset: i64,
}

impl From<OwnedMessage> for MessageData {
fn from(value: OwnedMessage) -> Self {
Self {
payload: value.payload().map(|p| general_purpose::STANDARD.encode(p)),
topic: value.topic().to_owned(),
timestamp: value.timestamp().to_millis(),
partition: value.partition(),
offset: value.offset(),
}
}
}

/// Starts a WebSocket server with the specified configuration
pub async fn serve<S, M, I, A>(
listen_addr: &SocketAddr,
Expand All @@ -56,7 +32,7 @@ pub async fn serve<S, M, I, A>(
where
S: Source<Message = M> + Send + Sync + 'static,
M: Into<intercept::types::EventCtx>
+ Into<MessageData>
+ Into<SourceResult>
+ MutableEvent
+ std::fmt::Debug
+ Clone
Expand Down Expand Up @@ -150,15 +126,15 @@ where
Stream: AsyncRead + AsyncWrite + Unpin,
S: Source<Message = M> + Send + Sync + 'static,
M: Into<intercept::types::EventCtx>
+ Into<MessageData>
+ Into<SourceResult>
+ MutableEvent
+ std::fmt::Debug
+ Clone
+ Send
+ 'static,
I: Intercept + Clone + Send + Sync + 'static,
{
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::<Message<MessageData>>();
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::<Message<SourceResult>>();
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<Command>();

let actor = IngestActor::new(
Expand Down
6 changes: 3 additions & 3 deletions src/kiwi/tests/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio_tungstenite::{connect_async, tungstenite};

use kiwi::{
protocol::{Command, CommandResponse, Message},
ws::MessageData,
source::SourceResult,
};

use tempfile::NamedTempFile;
Expand Down Expand Up @@ -101,11 +101,11 @@ async fn test_kafka_source() -> anyhow::Result<()> {
let mut count = 0;
while let Some(msg) = read.next().await {
let msg = msg.unwrap();
let msg: Message<MessageData> = serde_json::from_str(&msg.to_text().unwrap()).unwrap();
let msg: Message<SourceResult> = serde_json::from_str(&msg.to_text().unwrap()).unwrap();

match msg {
Message::Result(msg) => {
assert_eq!(msg.topic.as_ref(), "topic1".to_string());
assert_eq!(msg.source_id.as_ref(), "topic1".to_string());
let msg = base64::engine::general_purpose::STANDARD
.decode(msg.payload.as_ref().unwrap())
.unwrap();
Expand Down

0 comments on commit b985a85

Please sign in to comment.