Skip to content

Commit

Permalink
rm tokio-tungstenite
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrishn7 committed Mar 15, 2024
1 parent e0f174c commit 9d156bd
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 86 deletions.
58 changes: 12 additions & 46 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/kiwi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ serde_yaml = "0.9"
thiserror = "1.0.57"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio-tungstenite = "0.21.0"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
wasmtime = { version = "18.0.2", features = ["component-model", "async"] }
Expand All @@ -46,3 +45,6 @@ http-body-util = "0.1.1"
tempfile = "3"
nix = { version = "0.28.0", features = ["signal"] }
reqwest = "0.11.26"
hyper = "1.2.0"
hyper-util = "0.1.3"
bytes = "1.5.0"
20 changes: 2 additions & 18 deletions src/kiwi/tests/common/healthcheck.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{future::IntoFuture, time::Duration};
use std::time::Duration;

use anyhow::anyhow;
use futures::Future;

pub struct Healthcheck<'a> {
pub interval: Duration,
Expand All @@ -10,7 +9,7 @@ pub struct Healthcheck<'a> {
}

impl<'a> Healthcheck<'a> {
async fn run(&self) -> anyhow::Result<()> {
pub async fn run(&self) -> anyhow::Result<()> {
for _ in 0..self.attempts {
if let Ok(response) = reqwest::get(self.url).await {
if response.status().is_success() {
Expand All @@ -27,18 +26,3 @@ impl<'a> Healthcheck<'a> {
))
}
}

impl<'a> Future for Healthcheck<'a> {
type Output = anyhow::Result<()>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let fut = self.run().into_future();

let mut pinned_fut = std::pin::pin!(fut);

pinned_fut.as_mut().poll(cx)
}
}
87 changes: 66 additions & 21 deletions src/kiwi/tests/common/ws.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,70 @@
use futures::{SinkExt, StreamExt};
use http::Response;
use fastwebsockets::FragmentCollector;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
use fastwebsockets::Payload;
use futures::Future;
use http_body_util::Empty;
use tokio::net::TcpStream;
use tokio_tungstenite::{
connect_async,
tungstenite::{client::IntoClientRequest, Message},
MaybeTlsStream, WebSocketStream,
};

use bytes::Bytes;
use hyper::body::Incoming;
use hyper::header::CONNECTION;
use hyper::header::UPGRADE;
use hyper::upgrade::Upgraded;
use hyper::{Request, Response, Uri};
use hyper_util::rt::TokioIo;

pub struct Client {
stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
ws: FragmentCollector<TokioIo<Upgraded>>,
}

struct SpawnExecutor;

impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
fn execute(&self, fut: Fut) {
tokio::task::spawn(fut);
}
}

impl Client {
pub async fn connect<R: IntoClientRequest + Unpin>(
url: R,
) -> anyhow::Result<(Self, Response<Option<Vec<u8>>>)> {
let (stream, response) = connect_async(url).await?;
pub async fn connect(uri: &str) -> anyhow::Result<(Self, Response<Incoming>)> {
let uri: Uri = uri.try_into()?;
let stream = TcpStream::connect(
format!("{}:{}", uri.host().unwrap(), uri.port_u16().unwrap()).as_str(),
)
.await?;

let req = Request::builder()
.method("GET")
.uri(&uri)
.header("Host", uri.host().unwrap())
.header(UPGRADE, "websocket")
.header(CONNECTION, "upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13")
.body(Empty::<Bytes>::new())?;

Ok((Self { stream }, response))
let (ws, res) = fastwebsockets::handshake::client(&SpawnExecutor, req, stream).await?;

Ok((
Self {
ws: FragmentCollector::new(ws),
},
res,
))
}

pub async fn send_text(&mut self, text: &str) -> anyhow::Result<()> {
self.stream.send(Message::Text(text.to_string())).await?;
self.ws
.write_frame(Frame::text(Payload::Borrowed(text.as_bytes())))
.await?;

Ok(())
}
Expand All @@ -33,16 +76,18 @@ impl Client {
Ok(())
}

pub async fn recv(&mut self) -> Option<Result<Message, tokio_tungstenite::tungstenite::Error>> {
self.stream.next().await
pub async fn recv_text_frame(&mut self) -> anyhow::Result<Frame<'_>> {
let frame = self.ws.read_frame().await?;

match frame.opcode {
OpCode::Text => Ok(frame),
_ => Err(anyhow::anyhow!("Expected text frame")),
}
}

pub async fn recv_json<T: serde::de::DeserializeOwned>(&mut self) -> anyhow::Result<T> {
let message = self
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("Connection closed"))??;
let text = message.to_text()?;
let text_frame = self.recv_text_frame().await?;
let text = std::str::from_utf8(text_frame.payload.as_ref())?;
let value = serde_json::from_str(&text)?;

Ok(value)
Expand Down
4 changes: 4 additions & 0 deletions src/kiwi/tests/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn test_receives_messages_kafka_source() -> anyhow::Result<()> {
attempts: 10,
url: "http://127.0.0.1:8000/health",
}
.run()
.await?;

let (mut ws_client, _) = WsClient::connect("ws://127.0.0.1:8000").await?;
Expand Down Expand Up @@ -139,6 +140,7 @@ async fn test_closes_subscription_on_partition_added() -> anyhow::Result<()> {
attempts: 10,
url: "http://127.0.0.1:8000/health",
}
.run()
.await?;

let (mut ws_client, _) = WsClient::connect("ws://127.0.0.1:8000").await?;
Expand Down Expand Up @@ -207,6 +209,7 @@ async fn test_named_kafka_source() -> anyhow::Result<()> {
attempts: 10,
url: "http://127.0.0.1:8000/health",
}
.run()
.await?;

let (mut ws_client, _) = WsClient::connect("ws://127.0.0.1:8000").await?;
Expand Down Expand Up @@ -275,6 +278,7 @@ async fn test_dynamic_config_source_removal() -> anyhow::Result<()> {
attempts: 10,
url: "http://127.0.0.1:8000/health",
}
.run()
.await?;

let (mut ws_client, _) = WsClient::connect("ws://127.0.0.1:8000").await?;
Expand Down

0 comments on commit 9d156bd

Please sign in to comment.