Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
feat: enable benchmarking (#67)
Browse files Browse the repository at this point in the history
* feat: read the server endpoint from env

* fix: try kill server
  • Loading branch information
unw9527 authored Apr 30, 2024
1 parent 678a62e commit e198865
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 14 deletions.
15 changes: 9 additions & 6 deletions storage-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@
name = "parpulse-client"
version = "0.1.0"
edition = "2021"
authors = ["Yuanxin Cao <[email protected]>", "Kunle <[email protected]>", "Lan Lou <[email protected]>"]
authors = [
"Yuanxin Cao <[email protected]>",
"Kunle <[email protected]>",
"Lan Lou <[email protected]>",
]
description = "Client application for Parpulse OLAP database I/O cache service"
license-file = "LICENSE"
homepage = "https://github.com/cmu-db/15721-s24-cache1"
repository = "https://github.com/cmu-db/15721-s24-cache1"
documentation = "https://github.com/cmu-db/15721-s24-cache1/blob/main/README.md"
readme = "README.md"
include = [
"src/client.rs",
"src/lib.rs"
]
include = ["src/client.rs", "src/lib.rs"]

[dependencies]
anyhow = "1"
Expand All @@ -24,10 +25,12 @@ reqwest = { version = "0.12", features = ["stream"] }
tempfile = "3.2"
parquet = { version = "50.0.0", features = ["async"] }
arrow = "50.0.0"
mockito = "1.4.0"
log = "0.4"
istziio-client = "0.1"
lazy_static = "1.4"
enum-as-inner = "0.6"
serde = { version = "1", features = ["derive"] }
env_logger = "0.11"

[dev-dependencies]
mockito = "1.4.0"
26 changes: 21 additions & 5 deletions storage-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,29 @@ impl StorageClientImpl {
fn get_request_url_and_params(
&self,
location: (String, Vec<String>),
) -> (String, Vec<(&str, String)>) {
let url = format!("{}file", self.storage_server_endpoint);
) -> Result<(String, Vec<(&str, String)>)> {
let scheme = self
.storage_server_endpoint
.scheme()
.ok_or_else(|| anyhow!("Failed to get the scheme of the storage server endpoint."))?
.to_owned();
let authority = self
.storage_server_endpoint
.authority()
.ok_or_else(|| anyhow!("Failed to get the authority of the storage server endpoint."))?
.to_owned();
let path = "/file";
let url = Uri::builder()
.scheme(scheme)
.authority(authority)
.path_and_query(path)
.build()
.unwrap();
let params = vec![
(PARAM_BUCKET_KEY, location.0),
(PARAM_KEYS_KEY, location.1.join(",")),
];
(url, params)
Ok((url.to_string(), params))
}

pub async fn request_data_test(
Expand All @@ -162,7 +178,7 @@ impl StorageClientImpl {

// Then we need to send the request to the storage server.
let client = Client::new();
let (url, mut params) = self.get_request_url_and_params(location);
let (url, mut params) = self.get_request_url_and_params(location)?;
params.push(("is_test", "true".to_owned()));

let url = Url::parse_with_params(&url, params)?;
Expand All @@ -187,7 +203,7 @@ impl StorageClient for StorageClientImpl {

// Then we need to send the request to the storage server.
let client = Client::new();
let (url, params) = self.get_request_url_and_params(location);
let (url, params) = self.get_request_url_and_params(location)?;
let url = Url::parse_with_params(&url, params)?;
let response = client.get(url).send().await?;
Self::get_data_from_response(response).await
Expand Down
4 changes: 3 additions & 1 deletion storage-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ tokio-stream = "0.1"
rusqlite = { version = "0.31", features = ["blob"] }
log = "0.4"
env_logger = "0.11"
serial_test = "0.4"
crc32fast = "1.4.0"

[dev-dependencies]
serial_test = "3.1"
4 changes: 3 additions & 1 deletion storage-node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()>
}
});

let heartbeat = warp::path!("heartbeat").map(|| warp::http::StatusCode::OK);

// Catch a request that does not match any of the routes above.
let catch_all = warp::any()
.and(warp::path::full())
Expand All @@ -100,7 +102,7 @@ pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()>
warp::http::StatusCode::NOT_FOUND
});

let routes = route.or(catch_all);
let routes = route.or(heartbeat).or(catch_all);
let ip_addr: IpAddr = ip_addr.parse().unwrap();
warp::serve(routes).run((ip_addr, port)).await;

Expand Down
4 changes: 3 additions & 1 deletion tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ storage-node = { path = "../storage-node" }
parpulse-client = { path = "../storage-client" }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
arrow = "50.0.0"
serial_test = "0.4"
log = "0.4"
istziio-client = "0.1"

[dev-dependencies]
serial_test = "3.1"
env_logger = "0.11"

0 comments on commit e198865

Please sign in to comment.