Skip to content

Commit

Permalink
BACKPORT-CONFLICT
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 authored and github-actions[bot] committed Dec 13, 2024
1 parent f2081f6 commit feb2e88
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 0 deletions.
13 changes: 13 additions & 0 deletions prdoc/pr_6652.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: "rpc server: re-use server builder per rpc interface"

doc:
- audience: Node Dev
description: |
This changes that the RPC server builder is re-used for each RPC interface which is more efficient than to build it for every connection.

crates:
- name: sc-rpc-server
bump: patch
100 changes: 100 additions & 0 deletions substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ where
rate_limit_whitelisted_ips: Arc::new(rate_limit_whitelisted_ips),
};

<<<<<<< HEAD
tokio_handle.spawn(async move {
loop {
let (sock, remote_addr) = tokio::select! {
Expand All @@ -173,12 +174,90 @@ where
Err(e) => {
log::debug!(target: "rpc", "Failed to accept ipv4 connection: {:?}", e);
continue;
=======
let mut local_addrs = Vec::new();

for endpoint in endpoints {
let allowed_to_fail = endpoint.is_optional;
let local_addr = endpoint.listen_addr;

let mut listener = match endpoint.bind().await {
Ok(l) => l,
Err(e) if allowed_to_fail => {
log::debug!(target: "rpc", "JSON-RPC server failed to bind optional address: {:?}, error: {:?}", local_addr, e);
continue;
},
Err(e) => return Err(e),
};
let local_addr = listener.local_addr();
local_addrs.push(local_addr);
let cfg = cfg.clone();

let RpcSettings {
batch_config,
max_connections,
max_payload_in_mb,
max_payload_out_mb,
max_buffer_capacity_per_connection,
max_subscriptions_per_connection,
rpc_methods,
rate_limit_trust_proxy_headers,
rate_limit_whitelisted_ips,
host_filter,
cors,
rate_limit,
} = listener.rpc_settings();

let http_middleware = tower::ServiceBuilder::new()
.option_layer(host_filter)
// Proxy `GET /health, /health/readiness` requests to the internal
// `system_health` method.
.layer(NodeHealthProxyLayer::default())
.layer(cors);

let mut builder = jsonrpsee::server::Server::builder()
.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subscriptions_per_connection)
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(30))
.inactive_limit(Duration::from_secs(60))
.max_failures(3),
)
.set_http_middleware(http_middleware)
.set_message_buffer_capacity(max_buffer_capacity_per_connection)
.set_batch_request_config(batch_config)
.custom_tokio_runtime(cfg.tokio_handle.clone());

if let Some(provider) = id_provider.clone() {
builder = builder.set_id_provider(provider);
} else {
builder = builder.set_id_provider(RandomStringIdProvider::new(16));
};

let service_builder = builder.to_service_builder();
let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);

tokio_handle.spawn(async move {
loop {
let (sock, remote_addr) = tokio::select! {
res = listener.accept() => {
match res {
Ok(s) => s,
Err(e) => {
log::debug!(target: "rpc", "Failed to accept connection: {:?}", e);
continue;
}
>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652))
}
}
}
_ = cfg.stop_handle.clone().shutdown() => break,
};

<<<<<<< HEAD
let ip = remote_addr.ip();
let cfg2 = cfg.clone();
let svc = tower::service_fn(move |req: http::Request<hyper::body::Incoming>| {
Expand Down Expand Up @@ -223,6 +302,12 @@ where
.with_rate_limit_per_minute(rate_limit),
),
};
=======
let ip = remote_addr.ip();
let cfg2 = cfg.clone();
let service_builder2 = service_builder.clone();
let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone();
>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652))

let rpc_middleware = RpcServiceBuilder::new()
.rpc_logger(1024)
Expand All @@ -243,12 +328,27 @@ where
});
}

<<<<<<< HEAD
// https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
// to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to
// a concrete type as workaround.
svc.call(req).await.map_err(|e| BoxError::from(e))
}
});
=======
let rate_limit_cfg = if rate_limit_whitelisted_ips2
.iter()
.any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
{
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
None
} else {
if !rate_limit_whitelisted_ips2.is_empty() {
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
}
rate_limit
};
>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652))

cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
sock,
Expand Down
155 changes: 155 additions & 0 deletions substrate/client/rpc-servers/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,161 @@ pub(crate) fn host_filtering(enabled: bool, addr: Option<SocketAddr>) -> Option<
// If the local_addr failed, fallback to wildcard.
let port = addr.map_or("*".to_string(), |p| p.port().to_string());

<<<<<<< HEAD
=======
impl std::error::Error for ListenAddrError {}

impl std::fmt::Display for ListenAddrError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "No listen address was successfully bound")
}
}

/// Available RPC methods.
#[derive(Debug, Copy, Clone)]
pub enum RpcMethods {
/// Allow only a safe subset of RPC methods.
Safe,
/// Expose every RPC method (even potentially unsafe ones).
Unsafe,
/// Automatically determine the RPC methods based on the connection.
Auto,
}

impl Default for RpcMethods {
fn default() -> Self {
RpcMethods::Auto
}
}

impl FromStr for RpcMethods {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"safe" => Ok(RpcMethods::Safe),
"unsafe" => Ok(RpcMethods::Unsafe),
"auto" => Ok(RpcMethods::Auto),
invalid => Err(format!("Invalid rpc methods {invalid}")),
}
}
}

#[derive(Debug, Clone)]
pub(crate) struct RpcSettings {
pub(crate) batch_config: BatchRequestConfig,
pub(crate) max_connections: u32,
pub(crate) max_payload_in_mb: u32,
pub(crate) max_payload_out_mb: u32,
pub(crate) max_subscriptions_per_connection: u32,
pub(crate) max_buffer_capacity_per_connection: u32,
pub(crate) rpc_methods: RpcMethods,
pub(crate) rate_limit: Option<NonZeroU32>,
pub(crate) rate_limit_trust_proxy_headers: bool,
pub(crate) rate_limit_whitelisted_ips: Vec<IpNetwork>,
pub(crate) cors: CorsLayer,
pub(crate) host_filter: Option<HostFilterLayer>,
}

/// Represent a single RPC endpoint with its configuration.
#[derive(Debug, Clone)]
pub struct RpcEndpoint {
/// Listen address.
pub listen_addr: SocketAddr,
/// Batch request configuration.
pub batch_config: BatchRequestConfig,
/// Maximum number of connections.
pub max_connections: u32,
/// Maximum inbound payload size in MB.
pub max_payload_in_mb: u32,
/// Maximum outbound payload size in MB.
pub max_payload_out_mb: u32,
/// Maximum number of subscriptions per connection.
pub max_subscriptions_per_connection: u32,
/// Maximum buffer capacity per connection.
pub max_buffer_capacity_per_connection: u32,
/// Rate limit per minute.
pub rate_limit: Option<NonZeroU32>,
/// Whether to trust proxy headers for rate limiting.
pub rate_limit_trust_proxy_headers: bool,
/// Whitelisted IPs for rate limiting.
pub rate_limit_whitelisted_ips: Vec<IpNetwork>,
/// CORS.
pub cors: Option<Vec<String>>,
/// RPC methods to expose.
pub rpc_methods: RpcMethods,
/// Whether it's an optional listening address i.e, it's ignored if it fails to bind.
/// For example substrate tries to bind both ipv4 and ipv6 addresses but some platforms
/// may not support ipv6.
pub is_optional: bool,
/// Whether to retry with a random port if the provided port is already in use.
pub retry_random_port: bool,
}

impl RpcEndpoint {
/// Binds to the listen address.
pub(crate) async fn bind(self) -> Result<Listener, Box<dyn StdError + Send + Sync>> {
let listener = match tokio::net::TcpListener::bind(self.listen_addr).await {
Ok(listener) => listener,
Err(_) if self.retry_random_port => {
let mut addr = self.listen_addr;
addr.set_port(0);

tokio::net::TcpListener::bind(addr).await?
},
Err(e) => return Err(e.into()),
};
let local_addr = listener.local_addr()?;
let host_filter = host_filtering(self.cors.is_some(), local_addr);
let cors = try_into_cors(self.cors)?;

Ok(Listener {
listener,
local_addr,
cfg: RpcSettings {
batch_config: self.batch_config,
max_connections: self.max_connections,
max_payload_in_mb: self.max_payload_in_mb,
max_payload_out_mb: self.max_payload_out_mb,
max_subscriptions_per_connection: self.max_subscriptions_per_connection,
max_buffer_capacity_per_connection: self.max_buffer_capacity_per_connection,
rpc_methods: self.rpc_methods,
rate_limit: self.rate_limit,
rate_limit_trust_proxy_headers: self.rate_limit_trust_proxy_headers,
rate_limit_whitelisted_ips: self.rate_limit_whitelisted_ips,
host_filter,
cors,
},
})
}
}

/// TCP socket server with RPC settings.
pub(crate) struct Listener {
listener: tokio::net::TcpListener,
local_addr: SocketAddr,
cfg: RpcSettings,
}

impl Listener {
/// Accepts a new connection.
pub(crate) async fn accept(&mut self) -> std::io::Result<(tokio::net::TcpStream, SocketAddr)> {
let (sock, remote_addr) = self.listener.accept().await?;
Ok((sock, remote_addr))
}

/// Returns the local address the listener is bound to.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

pub fn rpc_settings(&self) -> RpcSettings {
self.cfg.clone()
}
}

pub(crate) fn host_filtering(enabled: bool, addr: SocketAddr) -> Option<HostFilterLayer> {
>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652))
if enabled {
// NOTE: The listening addresses are whitelisted by default.
let hosts =
Expand Down

0 comments on commit feb2e88

Please sign in to comment.