Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stable2407] Backport #6652 #6874

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions prdoc/pr_6652.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: "rpc server: re-use server builder per rpc interface"

doc:
- audience: Node Dev
description: |
This changes that the RPC server builder is re-used for each RPC interface which is more efficient than to build it for every connection.

crates:
- name: sc-rpc-server
bump: patch
100 changes: 100 additions & 0 deletions substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ where
rate_limit_whitelisted_ips: Arc::new(rate_limit_whitelisted_ips),
};

<<<<<<< HEAD
tokio_handle.spawn(async move {
loop {
let (sock, remote_addr) = tokio::select! {
Expand All @@ -173,12 +174,90 @@ where
Err(e) => {
log::debug!(target: "rpc", "Failed to accept ipv4 connection: {:?}", e);
continue;
=======
let mut local_addrs = Vec::new();

for endpoint in endpoints {
let allowed_to_fail = endpoint.is_optional;
let local_addr = endpoint.listen_addr;

let mut listener = match endpoint.bind().await {
Ok(l) => l,
Err(e) if allowed_to_fail => {
log::debug!(target: "rpc", "JSON-RPC server failed to bind optional address: {:?}, error: {:?}", local_addr, e);
continue;
},
Err(e) => return Err(e),
};
let local_addr = listener.local_addr();
local_addrs.push(local_addr);
let cfg = cfg.clone();

let RpcSettings {
batch_config,
max_connections,
max_payload_in_mb,
max_payload_out_mb,
max_buffer_capacity_per_connection,
max_subscriptions_per_connection,
rpc_methods,
rate_limit_trust_proxy_headers,
rate_limit_whitelisted_ips,
host_filter,
cors,
rate_limit,
} = listener.rpc_settings();

let http_middleware = tower::ServiceBuilder::new()
.option_layer(host_filter)
// Proxy `GET /health, /health/readiness` requests to the internal
// `system_health` method.
.layer(NodeHealthProxyLayer::default())
.layer(cors);

let mut builder = jsonrpsee::server::Server::builder()
.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subscriptions_per_connection)
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(30))
.inactive_limit(Duration::from_secs(60))
.max_failures(3),
)
.set_http_middleware(http_middleware)
.set_message_buffer_capacity(max_buffer_capacity_per_connection)
.set_batch_request_config(batch_config)
.custom_tokio_runtime(cfg.tokio_handle.clone());

if let Some(provider) = id_provider.clone() {
builder = builder.set_id_provider(provider);
} else {
builder = builder.set_id_provider(RandomStringIdProvider::new(16));
};

let service_builder = builder.to_service_builder();
let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);

tokio_handle.spawn(async move {
loop {
let (sock, remote_addr) = tokio::select! {
res = listener.accept() => {
match res {
Ok(s) => s,
Err(e) => {
log::debug!(target: "rpc", "Failed to accept connection: {:?}", e);
continue;
}
>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652))
}
}
}
_ = cfg.stop_handle.clone().shutdown() => break,
};

<<<<<<< HEAD
let ip = remote_addr.ip();
let cfg2 = cfg.clone();
let svc = tower::service_fn(move |req: http::Request<hyper::body::Incoming>| {
Expand Down Expand Up @@ -223,6 +302,12 @@ where
.with_rate_limit_per_minute(rate_limit),
),
};
=======
let ip = remote_addr.ip();
let cfg2 = cfg.clone();
let service_builder2 = service_builder.clone();
let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone();
>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652))

let rpc_middleware = RpcServiceBuilder::new()
.rpc_logger(1024)
Expand All @@ -243,12 +328,27 @@ where
});
}

<<<<<<< HEAD
// https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
// to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to
// a concrete type as workaround.
svc.call(req).await.map_err(|e| BoxError::from(e))
}
});
=======
let rate_limit_cfg = if rate_limit_whitelisted_ips2
.iter()
.any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
{
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
None
} else {
if !rate_limit_whitelisted_ips2.is_empty() {
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
}
rate_limit
};
>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652))

cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
sock,
Expand Down
155 changes: 155 additions & 0 deletions substrate/client/rpc-servers/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,161 @@ pub(crate) fn host_filtering(enabled: bool, addr: Option<SocketAddr>) -> Option<
// If the local_addr failed, fallback to wildcard.
let port = addr.map_or("*".to_string(), |p| p.port().to_string());

<<<<<<< HEAD
=======
impl std::error::Error for ListenAddrError {}

impl std::fmt::Display for ListenAddrError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "No listen address was successfully bound")
}
}

/// Available RPC methods.
#[derive(Debug, Copy, Clone)]
pub enum RpcMethods {
/// Allow only a safe subset of RPC methods.
Safe,
/// Expose every RPC method (even potentially unsafe ones).
Unsafe,
/// Automatically determine the RPC methods based on the connection.
Auto,
}

impl Default for RpcMethods {
fn default() -> Self {
RpcMethods::Auto
}
}

impl FromStr for RpcMethods {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"safe" => Ok(RpcMethods::Safe),
"unsafe" => Ok(RpcMethods::Unsafe),
"auto" => Ok(RpcMethods::Auto),
invalid => Err(format!("Invalid rpc methods {invalid}")),
}
}
}

#[derive(Debug, Clone)]
pub(crate) struct RpcSettings {
pub(crate) batch_config: BatchRequestConfig,
pub(crate) max_connections: u32,
pub(crate) max_payload_in_mb: u32,
pub(crate) max_payload_out_mb: u32,
pub(crate) max_subscriptions_per_connection: u32,
pub(crate) max_buffer_capacity_per_connection: u32,
pub(crate) rpc_methods: RpcMethods,
pub(crate) rate_limit: Option<NonZeroU32>,
pub(crate) rate_limit_trust_proxy_headers: bool,
pub(crate) rate_limit_whitelisted_ips: Vec<IpNetwork>,
pub(crate) cors: CorsLayer,
pub(crate) host_filter: Option<HostFilterLayer>,
}

/// Represent a single RPC endpoint with its configuration.
#[derive(Debug, Clone)]
pub struct RpcEndpoint {
/// Listen address.
pub listen_addr: SocketAddr,
/// Batch request configuration.
pub batch_config: BatchRequestConfig,
/// Maximum number of connections.
pub max_connections: u32,
/// Maximum inbound payload size in MB.
pub max_payload_in_mb: u32,
/// Maximum outbound payload size in MB.
pub max_payload_out_mb: u32,
/// Maximum number of subscriptions per connection.
pub max_subscriptions_per_connection: u32,
/// Maximum buffer capacity per connection.
pub max_buffer_capacity_per_connection: u32,
/// Rate limit per minute.
pub rate_limit: Option<NonZeroU32>,
/// Whether to trust proxy headers for rate limiting.
pub rate_limit_trust_proxy_headers: bool,
/// Whitelisted IPs for rate limiting.
pub rate_limit_whitelisted_ips: Vec<IpNetwork>,
/// CORS.
pub cors: Option<Vec<String>>,
/// RPC methods to expose.
pub rpc_methods: RpcMethods,
/// Whether it's an optional listening address i.e, it's ignored if it fails to bind.
/// For example substrate tries to bind both ipv4 and ipv6 addresses but some platforms
/// may not support ipv6.
pub is_optional: bool,
/// Whether to retry with a random port if the provided port is already in use.
pub retry_random_port: bool,
}

impl RpcEndpoint {
/// Binds to the listen address.
pub(crate) async fn bind(self) -> Result<Listener, Box<dyn StdError + Send + Sync>> {
let listener = match tokio::net::TcpListener::bind(self.listen_addr).await {
Ok(listener) => listener,
Err(_) if self.retry_random_port => {
let mut addr = self.listen_addr;
addr.set_port(0);

tokio::net::TcpListener::bind(addr).await?
},
Err(e) => return Err(e.into()),
};
let local_addr = listener.local_addr()?;
let host_filter = host_filtering(self.cors.is_some(), local_addr);
let cors = try_into_cors(self.cors)?;

Ok(Listener {
listener,
local_addr,
cfg: RpcSettings {
batch_config: self.batch_config,
max_connections: self.max_connections,
max_payload_in_mb: self.max_payload_in_mb,
max_payload_out_mb: self.max_payload_out_mb,
max_subscriptions_per_connection: self.max_subscriptions_per_connection,
max_buffer_capacity_per_connection: self.max_buffer_capacity_per_connection,
rpc_methods: self.rpc_methods,
rate_limit: self.rate_limit,
rate_limit_trust_proxy_headers: self.rate_limit_trust_proxy_headers,
rate_limit_whitelisted_ips: self.rate_limit_whitelisted_ips,
host_filter,
cors,
},
})
}
}

/// TCP socket server with RPC settings.
pub(crate) struct Listener {
listener: tokio::net::TcpListener,
local_addr: SocketAddr,
cfg: RpcSettings,
}

impl Listener {
/// Accepts a new connection.
pub(crate) async fn accept(&mut self) -> std::io::Result<(tokio::net::TcpStream, SocketAddr)> {
let (sock, remote_addr) = self.listener.accept().await?;
Ok((sock, remote_addr))
}

/// Returns the local address the listener is bound to.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

pub fn rpc_settings(&self) -> RpcSettings {
self.cfg.clone()
}
}

pub(crate) fn host_filtering(enabled: bool, addr: SocketAddr) -> Option<HostFilterLayer> {
>>>>>>> e1add3e (rpc: re-use server builder per rpc interface (#6652))
if enabled {
// NOTE: The listening addresses are whitelisted by default.
let hosts =
Expand Down
Loading