refactor(ffi): cleanup streaming and json decoding (#468)
erka authored Oct 23, 2024
1 parent 8c396f0 commit ab4a82e
Showing 2 changed files with 78 additions and 84 deletions.
1 change: 1 addition & 0 deletions flipt-engine-ffi/Cargo.toml
@@ -14,6 +14,7 @@ serde = { version = "1.0.147", features = ["derive"] }
serde_json = { version = "1.0.89", features = ["raw_value"] }
reqwest = { version = "0.12.2", features = ["json", "stream"] }
tokio = { version = "1.36.0", features = ["full"] }
tokio-util = { version = "0.7", features = ["io", "codec"] }
futures-util = "0.3.30"
native-tls = { version = "0.2", features = ["vendored"] }
futures = "0.3"
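Note: the new tokio-util dependency (io and codec features) is what the http.rs rewrite below builds on — StreamReader adapts a byte stream into an AsyncRead, and FramedRead with LinesCodec splits it into newline-delimited frames. A minimal standalone sketch of that adapter, independent of the flipt code (the in-memory stream is a stand-in for reqwest's bytes_stream(); the bytes, futures, tokio, and tokio-util crates are assumed):

```rust
use bytes::Bytes;
use futures::stream::{self, StreamExt};
use tokio_util::codec::{FramedRead, LinesCodec};
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() {
    // Stand-in for `resp.bytes_stream()`: chunks arrive split mid-record.
    let chunks = stream::iter(vec![
        Ok::<_, std::io::Error>(Bytes::from_static(b"{\"a\":1}\n{\"b\"")),
        Ok(Bytes::from_static(b":2}\n")),
    ]);

    // StreamReader turns the byte stream into an AsyncRead; the real code first
    // maps reqwest errors into std::io::Error to satisfy this bound.
    let reader = StreamReader::new(chunks);

    // FramedRead + LinesCodec yields one complete line per frame, regardless of
    // how the underlying chunks were split.
    let mut frames = FramedRead::new(reader, LinesCodec::new());

    while let Some(frame) = frames.next().await {
        println!("frame: {}", frame.expect("valid UTF-8 line"));
    }
}
```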
161 changes: 77 additions & 84 deletions flipt-engine-ffi/src/http.rs
@@ -2,6 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use futures::TryStreamExt;
use futures_util::stream::StreamExt;
use reqwest::header::{self, HeaderMap};
use reqwest::Response;
@@ -11,6 +12,7 @@ use reqwest_retry::RetryTransientMiddleware;
use serde::Deserialize;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio_util::io::StreamReader;

use fliptevaluation::error::Error;
use fliptevaluation::models::source;
@@ -299,16 +301,10 @@ impl HTTPFetcher {
sender: &mpsc::Sender<Result<source::Document, Error>>,
) -> Result<(), Error> {
let result = match self.fetch().await {
Ok(Some(resp)) => match resp.text().await {
Ok(json) => match serde_json::from_str(&json) {
Ok(doc) => Ok(doc),
Err(e) => Err(Error::InvalidJSON(format!(
"failed to parse response body: {}",
e
))),
},
Err(e) => Err(Error::Server(format!(
"failed to read response body: {}",
Ok(Some(resp)) => match resp.json::<source::Document>().await {
Ok(doc) => Ok(doc),
Err(e) => Err(Error::InvalidJSON(format!(
"failed to parse response body: {}",
e
))),
},
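The hunk above swaps the text() + serde_json::from_str round-trip for reqwest's built-in Response::json::<T>(), which the json feature already listed in Cargo.toml enables. A hedged, generic sketch of that pattern — Doc, Namespace, and fetch_doc are illustrative stand-ins, not the engine's actual types:

```rust
use serde::Deserialize;

// Illustrative stand-in for `source::Document` from fliptevaluation.
#[derive(Debug, Deserialize)]
struct Doc {
    namespace: Namespace,
}

#[derive(Debug, Deserialize)]
struct Namespace {
    key: String,
}

async fn fetch_doc(url: &str) -> Result<Doc, String> {
    let resp = reqwest::get(url)
        .await
        .map_err(|e| format!("request failed: {}", e))?;

    // One call reads and deserializes the body; a decode failure surfaces
    // as a single error instead of two nested match arms.
    resp.json::<Doc>()
        .await
        .map_err(|e| format!("failed to parse response body: {}", e))
}
```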
@@ -330,73 +326,55 @@ impl HTTPFetcher {

match stream_result {
Ok(Some(resp)) => {
let mut stream = resp.bytes_stream();
let mut buffer = Vec::new();
let reader = StreamReader::new(
resp.bytes_stream()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
);
let codec = tokio_util::codec::LinesCodec::new();
let frame_reader = tokio_util::codec::FramedRead::new(reader, codec);

let mut stream = frame_reader
.into_stream()
.map(|frame| match frame {
Ok(frame) => match serde_json::from_str::<StreamChunk>(&frame) {
Ok(result) => Ok(result),
Err(e) => Err(Error::InvalidJSON(format!(
"failed to parse response body: {}",
e
))),
},
Err(e) => Err(Error::Server(format!("failed to read stream chunk: {}", e))),
})
.map(|result| match result {
Ok(result) => match result.result.namespaces.into_iter().next() {
Some((_, doc)) => Ok(doc),
None => {
let err = Error::Server("no data received from server".into());
Err(err)
}
},
Err(err) => Err(err),
});

while let Some(bytes) = stream.next().await {
let chunk = match bytes {
Ok(chunk) => chunk,
while let Some(value) = stream.next().await {
match sender.send(value).await {
Ok(_) => continue,
Err(e) => {
let err = Error::Server(format!("failed to read stream chunk: {}", e));
sender
.send(Err(err.clone()))
.await
.map_err(|_| Error::Server("failed to send error".into()))?;
continue; // Continue to the next iteration instead of returning
}
};

for byte in chunk {
if byte == b'\n' {
let text = String::from_utf8_lossy(&buffer);
let parse_result: Result<StreamChunk, Error> =
serde_json::from_str(&text).map_err(|e| {
Error::InvalidJSON(format!(
"failed to parse response body: {}",
e
))
});

match parse_result {
Ok(stream_chunk) => {
match stream_chunk.result.namespaces.into_iter().next() {
Some((_, doc)) => {
sender.send(Ok(doc)).await.map_err(|_| {
Error::Server("failed to send result".into())
})?;
}
None => {
let err = Error::Server(
"no data received from server".into(),
);
sender.send(Err(err)).await.map_err(|_| {
Error::Server("failed to send error".into())
})?;
}
}
}
Err(e) => {
sender.send(Err(e)).await.map_err(|_| {
Error::Server("failed to send error".into())
})?;
}
}
buffer.clear();
} else {
buffer.push(byte);
return Err(Error::Server(format!(
"failed to send result to engine {}",
e
)))
}
}
}

Ok(())
}
Ok(None) => Ok(()),
Err(e) => {
sender
.send(Err(e.clone()))
.await
.map_err(|_| Error::Server("failed to send error".into()))?;
Ok(()) // Return Ok after sending the error through the channel
}
Err(e) => sender
.send(Err(e.clone()))
.await
.map_err(|_| Error::Server("failed to send error".into())),
}
}

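For reference, each streamed line wraps documents as {"result":{"namespaces":{"<ns>":{...}}}} (see the test fixture further down). A small sketch of the per-line decoding that the two .map stages above perform, with hypothetical local definitions standing in for StreamChunk and source::Document:

```rust
use serde::Deserialize;
use std::collections::HashMap;

// Hypothetical stand-ins; the real `StreamChunk` and `source::Document`
// live in the flipt engine crates and may differ in detail.
#[derive(Deserialize)]
struct StreamChunk {
    result: StreamResult,
}

#[derive(Deserialize)]
struct StreamResult {
    namespaces: HashMap<String, Document>,
}

#[derive(Debug, Deserialize)]
struct Document {
    namespace: Namespace,
    #[serde(default)]
    flags: Vec<serde_json::Value>,
}

#[derive(Debug, Deserialize)]
struct Namespace {
    key: String,
}

fn decode_line(line: &str) -> Result<Document, String> {
    // Stage 1: each newline-delimited frame is one JSON object.
    let chunk: StreamChunk = serde_json::from_str(line)
        .map_err(|e| format!("failed to parse response body: {}", e))?;

    // Stage 2: take the first namespace's document, or report an empty payload.
    chunk
        .result
        .namespaces
        .into_iter()
        .next()
        .map(|(_, doc)| doc)
        .ok_or_else(|| "no data received from server".to_string())
}

fn main() {
    let line = r#"{"result":{"namespaces":{"default":{"namespace":{"key":"default"},"flags":[]}}}}"#;
    let doc = decode_line(line).expect("valid stream chunk");
    assert_eq!(doc.namespace.key, "default");
    println!("decoded namespace: {}", doc.namespace.key);
}
```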
@@ -405,15 +383,9 @@ impl HTTPFetcher {

match response {
Some(resp) => {
let body = resp
.text()
.await
.map_err(|e| Error::Server(format!("failed to read response body: {}", e)))?;

let document: source::Document = serde_json::from_str(&body).map_err(|e| {
let document = resp.json::<source::Document>().await.map_err(|e| {
Error::InvalidJSON(format!("failed to parse response body: {}", e))
})?;

Ok(document)
}
None => Err(Error::Server("no data received from server".into())),
@@ -423,11 +395,13 @@

#[cfg(test)]
mod tests {
use futures::FutureExt;
use mockito::Server;

use crate::http::Authentication;
use crate::http::FetchMode;
use crate::http::HTTPFetcherBuilder;
use tokio::sync::mpsc;

#[tokio::test]
async fn test_http_fetch() {
@@ -524,7 +498,7 @@ mod tests {

let result = fetcher.fetch().await;

assert!(!result.is_ok());
assert!(result.is_err());
mock.assert();
}

@@ -608,10 +582,11 @@
"application/json",
)
.with_chunked_body(|w| {
w.write_all(b"{\"namespace\": {\"key\": \"default\"}, \"flags\":[]}\n")?;
w.write_all(b"{\"result\":{ \"namespaces\":{\"default\":{\"namespace\": {\"key\": \"default\"}, \"flags\":[]}}}}\n")?;
w.write_all(
b"{\"namespace\": {\"key\": \"default\"}, \"flags\":[{\"key\": \"new_flag\"}]}\n",
b"{\"result\":{ \"namespaces\":{\"default\":{\"namespace\": {\"key\": \"default\"}, \"flags\":[{\"key\": \"new_flag\", \"name\": \"new flag\", \"enabled\": false}]}}}}\n",
)?;
w.write_all(b"{\n")?;
Ok(())
})
.create_async()
@@ -623,10 +598,28 @@
.mode(FetchMode::Streaming)
.build();

let result = fetcher.fetch_stream().await;

let (tx, mut rx) = mpsc::channel(4);
let result = fetcher.handle_streaming(&tx).await;
assert!(result.is_ok());
mock.assert();
assert!(rx.len() == 3);
// check first result
let result = rx
.recv()
.map(|r| r.expect("valid record").expect("valid doc"))
.await;
assert_eq!("default", result.namespace.key);
assert_eq!(0, result.flags.len());
// check second result
let result = rx
.recv()
.map(|r| r.expect("valid record").expect("valid doc"))
.await;
assert_eq!("default", result.namespace.key);
assert_eq!(1, result.flags.len());
// check third result
let result = rx.recv().map(|r| r.expect("valid record")).await;
assert!(result.is_err())
}

#[test]
Expand Down Expand Up @@ -654,7 +647,7 @@ mod tests {
fn test_deserialize_no_auth() {
let json = r#""#;

let unwrapped_string: Authentication = serde_json::from_str(&json).unwrap_or_default();
let unwrapped_string: Authentication = serde_json::from_str(json).unwrap_or_default();

assert_eq!(unwrapped_string, Authentication::None);
}
@@ -663,7 +656,7 @@
fn test_deserialize_client_token() {
let json = r#"{"client_token":"secret"}"#;

let unwrapped_string: Authentication = serde_json::from_str(&json).unwrap_or_default();
let unwrapped_string: Authentication = serde_json::from_str(json).unwrap_or_default();

assert_eq!(
unwrapped_string,
@@ -675,7 +668,7 @@
fn test_deserialize_jwt_token() {
let json = r#"{"jwt_token":"secret"}"#;

let unwrapped_string: Authentication = serde_json::from_str(&json).unwrap_or_default();
let unwrapped_string: Authentication = serde_json::from_str(json).unwrap_or_default();

assert_eq!(unwrapped_string, Authentication::JwtToken("secret".into()));
}