Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrishn7 committed Dec 7, 2023
1 parent abd2247 commit 5542cf6
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 10 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/kiwi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ tracing = "0.1.40"
tracing-subscriber = "0.3.18"
wasmtime = { version = "14.0.4", features = ["component-model"] }
wasmtime-wasi = "14.0.4"

[dev-dependencies]
tempfile = "3"
8 changes: 4 additions & 4 deletions src/kiwi/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use thiserror::Error;

use crate::source::SourceId;

#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(tag = "type")]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
/// A request that is sent from a client to the server
Expand All @@ -14,7 +14,7 @@ pub enum Command {
Unsubscribe { source_id: SourceId },
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type")]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum CommandResponse {
Expand All @@ -28,7 +28,7 @@ pub enum CommandResponse {
UnsubscribeError { source_id: SourceId, error: String },
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type")]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
/// An info or error message that may be pushed to a client. A notice, in many
Expand All @@ -37,7 +37,7 @@ pub enum Notice {
Lag { source: SourceId, count: u64 },
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", content = "data")]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
/// An outbound message that is sent from the server to a client
Expand Down
12 changes: 6 additions & 6 deletions src/kiwi/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ use crate::source::Source;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::{CloseFrame, Message as ProtocolMessage};

#[derive(Debug, Clone, serde::Serialize)]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct MessageData {
payload: Option<String>,
topic: String,
timestamp: Option<i64>,
partition: i32,
offset: i64,
pub payload: Option<String>,
pub topic: String,
pub timestamp: Option<i64>,
pub partition: i32,
pub offset: i64,
}

impl From<OwnedMessage> for MessageData {
Expand Down
130 changes: 130 additions & 0 deletions src/kiwi/tests/kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use base64::Engine;
use futures::{SinkExt, StreamExt};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::{io::Write, process, time::Duration};
use tokio_tungstenite::{connect_async, tungstenite};

use kiwi::{
protocol::{Command, CommandResponse, Message},
ws::MessageData,
};

use tempfile::NamedTempFile;

// Helper function to start the kiwi process.
fn start_kiwi(config: &str) -> anyhow::Result<std::process::Child> {
// Expects `kiwi` to be in the PATH
let mut cmd = process::Command::new("kiwi");

let mut config_file = NamedTempFile::new()?;
config_file
.as_file_mut()
.write_all(config.as_bytes())
.unwrap();

cmd.args(&[
"--config",
config_file.path().to_str().expect("path should be valid"),
"--log-level",
"debug",
]);
let child = cmd.spawn().unwrap();

Ok(child)
}

#[ignore = "todo"]
#[tokio::test]
async fn test_kafka_source() -> anyhow::Result<()> {
let config = r#"
sources:
kafka:
group_prefix: ''
bootstrap_servers:
- 'localhost:9092'
topics:
- name: topic1
server:
address: '127.0.0.1:8000'
"#;

let mut proc = start_kiwi(config)?;

let (ws_stream, _) = connect_async("http://127.0.0.1:8000")
.await
.expect("Failed to connect");

let (mut write, mut read) = ws_stream.split();

let cmd = Command::Subscribe {
source_id: "topic1".into(),
};

write
.send(tungstenite::protocol::Message::Text(
serde_json::to_string(&cmd).unwrap(),
))
.await?;

let resp = read.next().await.expect("Expected response")?;

let resp: Message<()> = serde_json::from_str(&resp.to_text().unwrap())?;

match resp {
Message::CommandResponse(CommandResponse::SubscribeOk { source_id }) => {
assert_eq!(source_id, "topic1".to_string());
}
_ => panic!("Expected subscribe ok"),
}

let producer = tokio::spawn(async {
let producer: FutureProducer = rdkafka::config::ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");

for i in 0..1000 {
let payload = format!("Message {}", i);
let key = format!("Key {}", i);
let record = FutureRecord::to("topic1").payload(&payload).key(&key);

producer
.send(record, Duration::from_secs(0))
.await
.expect("Failed to enqueue");
}
});

let reader = tokio::spawn(async move {
let mut count = 0;
while let Some(msg) = read.next().await {
let msg = msg.unwrap();
let msg: Message<MessageData> = serde_json::from_str(&msg.to_text().unwrap()).unwrap();

match msg {
Message::Result(msg) => {
assert_eq!(msg.topic.as_ref(), "topic1".to_string());
let msg = base64::engine::general_purpose::STANDARD
.decode(msg.payload.as_ref().unwrap())
.unwrap();
assert_eq!(
std::str::from_utf8(&msg).unwrap(),
format!("Message {}", count)
);
count += 1;
}
_ => panic!("Expected message"),
}
}

assert_eq!(count, 1000, "failed to receive all messages");
});

let _ = futures::join!(producer, reader);

let _ = proc.kill();

Ok(())
}

0 comments on commit 5542cf6

Please sign in to comment.