Skip to content

Commit

Permalink
Update HttpClient, so that we don't rely on reqwest
Browse files Browse the repository at this point in the history
Signed-off-by: Vrtgs <[email protected]>
  • Loading branch information
Vrtgs committed Oct 22, 2024
1 parent fa6e089 commit ac8fea2
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 41 deletions.
3 changes: 1 addition & 2 deletions thirtyfour/src/session/handle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use reqwest::Client;
use serde_json::Value;
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
Expand Down Expand Up @@ -1199,7 +1198,7 @@ impl Drop for SessionHandle {
support::spawn_blocked_future(|spawned| async move {
if spawned {
// Old I/O drivers may be destroyed at this point
this.client = Arc::new(Client::new());
this.client = this.client.new().await;
}
let _ = this.quit().await;
});
Expand Down
38 changes: 38 additions & 0 deletions thirtyfour/src/session/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ impl<'a, T: Into<Option<&'a Value>>> From<T> for Body<'a> {
pub trait HttpClient: Send + Sync + 'static {
/// Send an HTTP request and return the response.
async fn send(&self, request: Request<Body<'_>>) -> WebDriverResult<Response<Bytes>>;

/// Make a new HttpClient, that **has no connection to the previous I/O drivers of self's runtime**
/// this is used when dropping the webdriver but the old runtime has already shut down
/// or couldn't prove its availability
/// this isn't a simple clone,
/// this new client needs to be able to run in a new runtime even if the old runtime has been destroyed
// needed for object safety
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::wrong_self_convention)]
async fn new(&self) -> Arc<dyn HttpClient>;
}

#[cfg(feature = "reqwest")]
Expand Down Expand Up @@ -79,6 +90,33 @@ impl HttpClient for reqwest::Client {
.map_err(|_| WebDriverError::UnknownResponse(status.as_u16(), body_str))?;
Ok(resp)
}

async fn new(&self) -> Arc<dyn HttpClient> {
Arc::new(self.clone())
}
}

#[cfg(all(feature = "reqwest", test))]
mod tests {
#[test]
fn test_reqwest_clone_ok() {
let rt = tokio::runtime::Runtime::new().unwrap();
let client = rt.block_on(async move {
let client = reqwest::Client::new();
let resp = client.get("https://google.com/").send().await.unwrap();
assert_eq!(resp.status(), 200);
let _ = resp.text().await.unwrap();
client
});

drop(rt);

tokio::runtime::Runtime::new().unwrap().block_on(async move {
let resp = client.get("https://google.com/").send().await.unwrap();
assert_eq!(resp.status(), 200);
let _ = resp.text().await.unwrap();
});
}
}

#[cfg(feature = "reqwest")]
Expand Down
77 changes: 38 additions & 39 deletions thirtyfour/src/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,8 @@ use std::{io, thread};
// used in drop code so its really bad to have a stack overflow then
const BOX_FUTURE_THRESHOLD: usize = 512;

/// Helper to run the specified future and block the current thread waiting for the result.
/// works even while in a tokio runtime
pub fn block_on<F>(future: F) -> F::Output
where
F: Future + Send,
F::Output: Send,
{
if cfg!(debug_assertions) && size_of::<F>() > BOX_FUTURE_THRESHOLD {
block_on_inner(Box::pin(future))
} else {
block_on_inner(future)
}
}

fn block_on_inner<F>(future: F) -> F::Output
where
F: Future + Send,
F::Output: Send,
{
// a global runtime that is being driven al the time
static GLOBAL_RT: LazyLock<tokio::runtime::Handle> = LazyLock::new(|| {
fn no_unwind<T>(f: impl FnOnce() -> T) -> T {
let res = std::panic::catch_unwind(AssertUnwindSafe(f));

Expand All @@ -47,24 +30,44 @@ where
})
}

static GLOBAL_RT: LazyLock<tokio::runtime::Handle> = LazyLock::new(|| {
no_unwind(|| {
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let handle = rt.handle().clone();
no_unwind(|| {
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let handle = rt.handle().clone();

// drive the runtime
// we do this so that all calls to GLOBAL_RT.block_on() work
thread::spawn(move || -> ! {
async fn forever() -> ! {
match std::future::pending::<Infallible>().await {}
}
// drive the runtime
// we do this so that all calls to GLOBAL_RT.block_on() work
thread::spawn(move || -> ! {
async fn forever() -> ! {
match std::future::pending::<Infallible>().await {}
}

no_unwind(move || rt.block_on(forever()))
});
handle
})
});
no_unwind(move || rt.block_on(forever()))
});
handle
})
});

/// Helper to run the specified future and block the current thread waiting for the result.
/// works even while in a tokio runtime
pub fn block_on<F>(future: F) -> F::Output
where
F: Future + Send,
F::Output: Send,
{
// https://github.com/tokio-rs/tokio/pull/6826
// cfg!(debug_assertions) omitted
if size_of::<F>() > BOX_FUTURE_THRESHOLD {
block_on_inner(Box::pin(future))
} else {
block_on_inner(future)
}
}

fn block_on_inner<F>(future: F) -> F::Output
where
F: Future + Send,
F::Output: Send,
{
macro_rules! block_global {
($future:expr) => {
thread::scope(|scope| match scope.spawn(|| GLOBAL_RT.block_on($future)).join() {
Expand Down Expand Up @@ -114,11 +117,7 @@ where
($future: expr) => {{
let future = $future;
let func = move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create tokio runtime")
.block_on(future);
GLOBAL_RT.block_on(future);
};
match tokio::runtime::Handle::try_current() {
Ok(handle) => {
Expand Down

0 comments on commit ac8fea2

Please sign in to comment.