diff --git a/prdoc/pr_6652.prdoc b/prdoc/pr_6652.prdoc new file mode 100644 index 000000000000..a303311e138f --- /dev/null +++ b/prdoc/pr_6652.prdoc @@ -0,0 +1,13 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: "rpc server: re-use server builder per rpc interface" + +doc: + - audience: Node Dev + description: | + This changes that the RPC server builder is re-used for each RPC interface which is more efficient than to build it for every connection. + +crates: + - name: sc-rpc-server + bump: patch diff --git a/substrate/client/rpc-servers/src/lib.rs b/substrate/client/rpc-servers/src/lib.rs index 0bae16b113df..b3f73e6f1771 100644 --- a/substrate/client/rpc-servers/src/lib.rs +++ b/substrate/client/rpc-servers/src/lib.rs @@ -164,6 +164,7 @@ where rate_limit_whitelisted_ips: Arc::new(rate_limit_whitelisted_ips), }; +<<<<<<< HEAD tokio_handle.spawn(async move { loop { let (sock, remote_addr) = tokio::select! { @@ -173,12 +174,90 @@ where Err(e) => { log::debug!(target: "rpc", "Failed to accept ipv4 connection: {:?}", e); continue; +======= + let mut local_addrs = Vec::new(); + + for endpoint in endpoints { + let allowed_to_fail = endpoint.is_optional; + let local_addr = endpoint.listen_addr; + + let mut listener = match endpoint.bind().await { + Ok(l) => l, + Err(e) if allowed_to_fail => { + log::debug!(target: "rpc", "JSON-RPC server failed to bind optional address: {:?}, error: {:?}", local_addr, e); + continue; + }, + Err(e) => return Err(e), + }; + let local_addr = listener.local_addr(); + local_addrs.push(local_addr); + let cfg = cfg.clone(); + + let RpcSettings { + batch_config, + max_connections, + max_payload_in_mb, + max_payload_out_mb, + max_buffer_capacity_per_connection, + max_subscriptions_per_connection, + rpc_methods, + rate_limit_trust_proxy_headers, + rate_limit_whitelisted_ips, + host_filter, + cors, + rate_limit, + } = listener.rpc_settings(); + + let http_middleware = tower::ServiceBuilder::new() + .option_layer(host_filter) + // Proxy `GET /health, /health/readiness` requests to the internal + // `system_health` method. + .layer(NodeHealthProxyLayer::default()) + .layer(cors); + + let mut builder = jsonrpsee::server::Server::builder() + .max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE)) + .max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE)) + .max_connections(max_connections) + .max_subscriptions_per_connection(max_subscriptions_per_connection) + .enable_ws_ping( + PingConfig::new() + .ping_interval(Duration::from_secs(30)) + .inactive_limit(Duration::from_secs(60)) + .max_failures(3), + ) + .set_http_middleware(http_middleware) + .set_message_buffer_capacity(max_buffer_capacity_per_connection) + .set_batch_request_config(batch_config) + .custom_tokio_runtime(cfg.tokio_handle.clone()); + + if let Some(provider) = id_provider.clone() { + builder = builder.set_id_provider(provider); + } else { + builder = builder.set_id_provider(RandomStringIdProvider::new(16)); + }; + + let service_builder = builder.to_service_builder(); + let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods); + + tokio_handle.spawn(async move { + loop { + let (sock, remote_addr) = tokio::select! { + res = listener.accept() => { + match res { + Ok(s) => s, + Err(e) => { + log::debug!(target: "rpc", "Failed to accept connection: {:?}", e); + continue; + } +>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652)) } } } _ = cfg.stop_handle.clone().shutdown() => break, }; +<<<<<<< HEAD let ip = remote_addr.ip(); let cfg2 = cfg.clone(); let svc = tower::service_fn(move |req: http::Request| { @@ -223,6 +302,12 @@ where .with_rate_limit_per_minute(rate_limit), ), }; +======= + let ip = remote_addr.ip(); + let cfg2 = cfg.clone(); + let service_builder2 = service_builder.clone(); + let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone(); +>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652)) let rpc_middleware = RpcServiceBuilder::new() .rpc_logger(1024) @@ -243,12 +328,27 @@ where }); } +<<<<<<< HEAD // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred // to be `Box` so we need to convert it to // a concrete type as workaround. svc.call(req).await.map_err(|e| BoxError::from(e)) } }); +======= + let rate_limit_cfg = if rate_limit_whitelisted_ips2 + .iter() + .any(|ips| ips.contains(proxy_ip.unwrap_or(ip))) + { + log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip); + None + } else { + if !rate_limit_whitelisted_ips2.is_empty() { + log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip); + } + rate_limit + }; +>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652)) cfg.tokio_handle.spawn(serve_with_graceful_shutdown( sock, diff --git a/substrate/client/rpc-servers/src/utils.rs b/substrate/client/rpc-servers/src/utils.rs index d9d943c7c1fb..e8a74979706d 100644 --- a/substrate/client/rpc-servers/src/utils.rs +++ b/substrate/client/rpc-servers/src/utils.rs @@ -37,6 +37,161 @@ pub(crate) fn host_filtering(enabled: bool, addr: Option) -> Option< // If the local_addr failed, fallback to wildcard. let port = addr.map_or("*".to_string(), |p| p.port().to_string()); +<<<<<<< HEAD +======= +impl std::error::Error for ListenAddrError {} + +impl std::fmt::Display for ListenAddrError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "No listen address was successfully bound") + } +} + +/// Available RPC methods. +#[derive(Debug, Copy, Clone)] +pub enum RpcMethods { + /// Allow only a safe subset of RPC methods. + Safe, + /// Expose every RPC method (even potentially unsafe ones). + Unsafe, + /// Automatically determine the RPC methods based on the connection. + Auto, +} + +impl Default for RpcMethods { + fn default() -> Self { + RpcMethods::Auto + } +} + +impl FromStr for RpcMethods { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "safe" => Ok(RpcMethods::Safe), + "unsafe" => Ok(RpcMethods::Unsafe), + "auto" => Ok(RpcMethods::Auto), + invalid => Err(format!("Invalid rpc methods {invalid}")), + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct RpcSettings { + pub(crate) batch_config: BatchRequestConfig, + pub(crate) max_connections: u32, + pub(crate) max_payload_in_mb: u32, + pub(crate) max_payload_out_mb: u32, + pub(crate) max_subscriptions_per_connection: u32, + pub(crate) max_buffer_capacity_per_connection: u32, + pub(crate) rpc_methods: RpcMethods, + pub(crate) rate_limit: Option, + pub(crate) rate_limit_trust_proxy_headers: bool, + pub(crate) rate_limit_whitelisted_ips: Vec, + pub(crate) cors: CorsLayer, + pub(crate) host_filter: Option, +} + +/// Represent a single RPC endpoint with its configuration. +#[derive(Debug, Clone)] +pub struct RpcEndpoint { + /// Listen address. + pub listen_addr: SocketAddr, + /// Batch request configuration. + pub batch_config: BatchRequestConfig, + /// Maximum number of connections. + pub max_connections: u32, + /// Maximum inbound payload size in MB. + pub max_payload_in_mb: u32, + /// Maximum outbound payload size in MB. + pub max_payload_out_mb: u32, + /// Maximum number of subscriptions per connection. + pub max_subscriptions_per_connection: u32, + /// Maximum buffer capacity per connection. + pub max_buffer_capacity_per_connection: u32, + /// Rate limit per minute. + pub rate_limit: Option, + /// Whether to trust proxy headers for rate limiting. + pub rate_limit_trust_proxy_headers: bool, + /// Whitelisted IPs for rate limiting. + pub rate_limit_whitelisted_ips: Vec, + /// CORS. + pub cors: Option>, + /// RPC methods to expose. + pub rpc_methods: RpcMethods, + /// Whether it's an optional listening address i.e, it's ignored if it fails to bind. + /// For example substrate tries to bind both ipv4 and ipv6 addresses but some platforms + /// may not support ipv6. + pub is_optional: bool, + /// Whether to retry with a random port if the provided port is already in use. + pub retry_random_port: bool, +} + +impl RpcEndpoint { + /// Binds to the listen address. + pub(crate) async fn bind(self) -> Result> { + let listener = match tokio::net::TcpListener::bind(self.listen_addr).await { + Ok(listener) => listener, + Err(_) if self.retry_random_port => { + let mut addr = self.listen_addr; + addr.set_port(0); + + tokio::net::TcpListener::bind(addr).await? + }, + Err(e) => return Err(e.into()), + }; + let local_addr = listener.local_addr()?; + let host_filter = host_filtering(self.cors.is_some(), local_addr); + let cors = try_into_cors(self.cors)?; + + Ok(Listener { + listener, + local_addr, + cfg: RpcSettings { + batch_config: self.batch_config, + max_connections: self.max_connections, + max_payload_in_mb: self.max_payload_in_mb, + max_payload_out_mb: self.max_payload_out_mb, + max_subscriptions_per_connection: self.max_subscriptions_per_connection, + max_buffer_capacity_per_connection: self.max_buffer_capacity_per_connection, + rpc_methods: self.rpc_methods, + rate_limit: self.rate_limit, + rate_limit_trust_proxy_headers: self.rate_limit_trust_proxy_headers, + rate_limit_whitelisted_ips: self.rate_limit_whitelisted_ips, + host_filter, + cors, + }, + }) + } +} + +/// TCP socket server with RPC settings. +pub(crate) struct Listener { + listener: tokio::net::TcpListener, + local_addr: SocketAddr, + cfg: RpcSettings, +} + +impl Listener { + /// Accepts a new connection. + pub(crate) async fn accept(&mut self) -> std::io::Result<(tokio::net::TcpStream, SocketAddr)> { + let (sock, remote_addr) = self.listener.accept().await?; + Ok((sock, remote_addr)) + } + + /// Returns the local address the listener is bound to. + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } + + pub fn rpc_settings(&self) -> RpcSettings { + self.cfg.clone() + } +} + +pub(crate) fn host_filtering(enabled: bool, addr: SocketAddr) -> Option { +>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652)) if enabled { // NOTE: The listening addresses are whitelisted by default. let hosts =