Skip to content

Commit

Permalink
Remove async_recursion for a for loop
Browse files Browse the repository at this point in the history
  • Loading branch information
kayabaNerve committed Nov 8, 2023
1 parent e1c07d8 commit bc07e14
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 126 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions coins/monero/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ serde_json = { version = "1", default-features = false, features = ["alloc"] }
base58-monero = { version = "2", default-features = false, features = ["check"] }

# Used for the provided HTTP RPC
async-recursion = { version = "1", optional = true }
digest_auth = { version = "0.3", default-features = false, optional = true }
simple-request = { path = "../../common/request", version = "0.1", default-features = false, optional = true }
tokio = { version = "1", default-features = false, optional = true }
Expand Down Expand Up @@ -101,7 +100,7 @@ std = [
"base58-monero/std",
]

http-rpc = ["async-recursion", "digest_auth", "simple-request", "tokio"]
http-rpc = ["digest_auth", "simple-request", "tokio"]
multisig = ["transcript", "frost", "dleq", "std"]
binaries = ["tokio/rt-multi-thread", "tokio/macros", "http-rpc"]
experimental = []
Expand Down
222 changes: 110 additions & 112 deletions coins/monero/src/rpc/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,149 +112,147 @@ impl HttpRpc {
}

impl HttpRpc {
#[async_recursion::async_recursion]
async fn inner_post(
&self,
route: &str,
body: Vec<u8>,
recursing: bool,
) -> Result<Vec<u8>, RpcError> {
async fn inner_post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
let request_fn = |uri| {
Request::post(uri)
.body(body.clone().into())
.map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))
};

let response = match &self.authentication {
Authentication::Unauthenticated(client) => client
.request(request_fn(self.url.clone() + "/" + route)?)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
Authentication::Authenticated { username, password, connection } => {
let mut connection_lock = connection.lock().await;
for attempt in 0 .. 2 {
let response = match &self.authentication {
Authentication::Unauthenticated(client) => client
.request(request_fn(self.url.clone() + "/" + route)?)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
Authentication::Authenticated { username, password, connection } => {
let mut connection_lock = connection.lock().await;

let mut request = request_fn("/".to_string() + route)?;
let mut request = request_fn("/".to_string() + route)?;

// If we don't have an auth challenge, obtain one
if connection_lock.0.is_none() {
connection_lock.0 = Self::digest_auth_challenge(
&connection_lock
.1
.request(request)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
)?;
request = request_fn("/".to_string() + route)?;
}
// If we don't have an auth challenge, obtain one
if connection_lock.0.is_none() {
connection_lock.0 = Self::digest_auth_challenge(
&connection_lock
.1
.request(request)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
)?;
request = request_fn("/".to_string() + route)?;
}

// Insert the challenge response, if we have a challenge
if let Some((challenge, cnonce)) = connection_lock.0.as_mut() {
// Update the cnonce
// Overflow isn't a concern as this is a u64
*cnonce += 1;
// Insert the challenge response, if we have a challenge
if let Some((challenge, cnonce)) = connection_lock.0.as_mut() {
// Update the cnonce
// Overflow isn't a concern as this is a u64
*cnonce += 1;

let mut context = AuthContext::new_post::<_, _, _, &[u8]>(
username,
password,
"/".to_string() + route,
None,
);
context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes()));
let mut context = AuthContext::new_post::<_, _, _, &[u8]>(
username,
password,
"/".to_string() + route,
None,
);
context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes()));

request.headers_mut().insert(
"Authorization",
HeaderValue::from_str(
&challenge
.respond(&context)
.map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))?
.to_header_string(),
)
.unwrap(),
);
}
request.headers_mut().insert(
"Authorization",
HeaderValue::from_str(
&challenge
.respond(&context)
.map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))?
.to_header_string(),
)
.unwrap(),
);
}

let response_result = connection_lock
.1
.request(request)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")));
let response_result = connection_lock
.1
.request(request)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")));

// If the connection entered an error state, drop the cached challenge as challenges are
// per-connection
// We don't need to create a new connection as simple-request will for us
if response_result.is_err() {
connection_lock.0 = None;
}
// If the connection entered an error state, drop the cached challenge as challenges are
// per-connection
// We don't need to create a new connection as simple-request will for us
if response_result.is_err() {
connection_lock.0 = None;
}

// If we're not already recursing and:
// 1) We had a connection error
// 2) We need to re-auth due to this token being stale
// recursively re-call this function
if (!recursing) &&
(response_result.is_err() || {
let response = response_result.as_ref().unwrap();
if response.status() == StatusCode::UNAUTHORIZED {
if let Some(header) = response.headers().get("www-authenticate") {
header
.to_str()
.map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?
.contains("stale")
// If we're not already on our second attempt and:
// A) We had a connection error
// B) We need to re-auth due to this token being stale
// Move to the next loop iteration (retrying all of this)
if (attempt == 0) &&
(response_result.is_err() || {
let response = response_result.as_ref().unwrap();
if response.status() == StatusCode::UNAUTHORIZED {
if let Some(header) = response.headers().get("www-authenticate") {
header
.to_str()
.map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?
.contains("stale")
} else {
false
}
} else {
false
}
} else {
false
}
})
{
connection_lock.0 = None;
drop(connection_lock);
return self.inner_post(route, body, true).await;
})
{
// Drop the cached authentication before we do
connection_lock.0 = None;
continue;
}

response_result?
}
};

response_result?
/*
let length = usize::try_from(
response
.headers()
.get("content-length")
.ok_or(RpcError::InvalidNode("no content-length header"))?
.to_str()
.map_err(|_| RpcError::InvalidNode("non-ascii content-length value"))?
.parse::<u32>()
.map_err(|_| RpcError::InvalidNode("non-u32 content-length value"))?,
)
.unwrap();
// Only pre-allocate 1 MB so a malicious node which claims a content-length of 1 GB actually
// has to send 1 GB of data to cause a 1 GB allocation
let mut res = Vec::with_capacity(length.max(1024 * 1024));
let mut body = response.into_body();
while res.len() < length {
let Some(data) = body.data().await else { break };
res.extend(data.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?.as_ref());
}
};
*/

/*
let length = usize::try_from(
let mut res = Vec::with_capacity(128);
response
.headers()
.get("content-length")
.ok_or(RpcError::InvalidNode("no content-length header"))?
.to_str()
.map_err(|_| RpcError::InvalidNode("non-ascii content-length value"))?
.parse::<u32>()
.map_err(|_| RpcError::InvalidNode("non-u32 content-length value"))?,
)
.unwrap();
// Only pre-allocate 1 MB so a malicious node which claims a content-length of 1 GB actually
// has to send 1 GB of data to cause a 1 GB allocation
let mut res = Vec::with_capacity(length.max(1024 * 1024));
let mut body = response.into_body();
while res.len() < length {
let Some(data) = body.data().await else { break };
res.extend(data.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?.as_ref());
}
*/
.body()
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
.read_to_end(&mut res)
.unwrap();

let mut res = Vec::with_capacity(128);
response
.body()
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
.read_to_end(&mut res)
.unwrap();
return Ok(res);
}

Ok(res)
unreachable!()
}
}

#[async_trait]
impl RpcConnection for HttpRpc {
async fn post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
// TODO: Make this timeout configurable
tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body, false))
tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body))
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
}
Expand Down

0 comments on commit bc07e14

Please sign in to comment.