From 25e3c0606a96bb5e66950e97e4eeada95c24e3b2 Mon Sep 17 00:00:00 2001 From: jolestar Date: Wed, 27 Sep 2023 17:54:46 +0800 Subject: [PATCH] [Fix] Fix block_on bug and add RpcLogger (#875) --- crates/rooch-rpc-server/src/lib.rs | 8 +- .../src/service/aggregate_service.rs | 21 ++--- crates/rooch-rpc-server/src/service/mod.rs | 1 + .../src/service/rpc_logger.rs | 80 +++++++++++++++++++ 4 files changed, 92 insertions(+), 18 deletions(-) create mode 100644 crates/rooch-rpc-server/src/service/rpc_logger.rs diff --git a/crates/rooch-rpc-server/src/lib.rs b/crates/rooch-rpc-server/src/lib.rs index 98714d236e..dafc44ba2e 100644 --- a/crates/rooch-rpc-server/src/lib.rs +++ b/crates/rooch-rpc-server/src/lib.rs @@ -5,6 +5,7 @@ use crate::server::eth_server::EthServer; use crate::server::rooch_server::RoochServer; use crate::server::wallet_server::WalletServer; use crate::service::aggregate_service::AggregateService; +use crate::service::rpc_logger::RpcLogger; use crate::service::rpc_service::RpcService; use anyhow::Result; use coerce::actor::scheduler::timer::Timer; @@ -43,6 +44,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tower_http::cors::{AllowOrigin, CorsLayer}; +use tower_http::trace::TraceLayer; use tracing::info; pub mod server; @@ -253,11 +255,13 @@ pub async fn run_start_server(opt: &RoochOpt) -> Result { .allow_origin(acl) .allow_headers([hyper::header::CONTENT_TYPE]); - // TODO: tracing - let middleware = tower::ServiceBuilder::new().layer(cors); + let middleware = tower::ServiceBuilder::new() + .layer(TraceLayer::new_for_http()) + .layer(cors); // Build server let server = ServerBuilder::default() + .set_logger(RpcLogger) .set_middleware(middleware) .build(&addr) .await?; diff --git a/crates/rooch-rpc-server/src/service/aggregate_service.rs b/crates/rooch-rpc-server/src/service/aggregate_service.rs index cb04671049..96a4f614a1 100644 --- a/crates/rooch-rpc-server/src/service/aggregate_service.rs +++ b/crates/rooch-rpc-server/src/service/aggregate_service.rs @@ -3,7 +3,6 @@ use crate::service::rpc_service::RpcService; use anyhow::Result; -use lazy_static::lazy_static; use move_core_types::account_address::AccountAddress; use move_core_types::language_storage::StructTag; use moveos_types::access_path::AccessPath; @@ -19,7 +18,7 @@ use rooch_types::addresses::ROOCH_FRAMEWORK_ADDRESS_LITERAL; use rooch_types::framework::coin::{AnnotatedCoinInfo, AnnotatedCoinStore, CoinModule}; use std::collections::HashMap; use std::str::FromStr; -use tokio::runtime::Runtime; +use tokio::runtime::Handle; /// AggregateService is aggregate RPC service and MoveFunctionCaller. #[derive(Clone)] @@ -144,27 +143,17 @@ impl AggregateService { } } -lazy_static! { - static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() - .thread_name("rooch-aggregate-service") - .enable_all() - .build() - .unwrap(); -} - impl MoveFunctionCaller for AggregateService { - // Use futures::executors::block_on to go from sync -> async - // Warning! Possible deadlocks can occur if we try to wait for a future without spawn fn call_function( &self, _ctx: &TxContext, function_call: FunctionCall, ) -> Result { let rpc_service = self.rpc_service.clone(); - let function_result = futures::executor::block_on( - RUNTIME.spawn(async move { rpc_service.execute_view_function(function_call).await }), - )??; - + let function_result = tokio::task::block_in_place(|| { + Handle::current() + .block_on(async move { rpc_service.execute_view_function(function_call).await }) + })?; function_result.try_into() } } diff --git a/crates/rooch-rpc-server/src/service/mod.rs b/crates/rooch-rpc-server/src/service/mod.rs index 3c8acd0439..02eaf7e980 100644 --- a/crates/rooch-rpc-server/src/service/mod.rs +++ b/crates/rooch-rpc-server/src/service/mod.rs @@ -2,4 +2,5 @@ // SPDX-License-Identifier: Apache-2.0 pub mod aggregate_service; +pub mod rpc_logger; pub mod rpc_service; diff --git a/crates/rooch-rpc-server/src/service/rpc_logger.rs b/crates/rooch-rpc-server/src/service/rpc_logger.rs new file mode 100644 index 0000000000..e043bae0e1 --- /dev/null +++ b/crates/rooch-rpc-server/src/service/rpc_logger.rs @@ -0,0 +1,80 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use jsonrpsee::server::logger::Logger; +use tracing::Level; + +#[derive(Debug, Clone)] +pub struct RpcLogger; + +impl Logger for RpcLogger { + type Instant = std::time::Instant; + + fn on_connect( + &self, + _remote_addr: std::net::SocketAddr, + _request: &jsonrpsee::server::logger::HttpRequest, + _t: jsonrpsee::server::logger::TransportProtocol, + ) { + } + + fn on_request( + &self, + _transport: jsonrpsee::server::logger::TransportProtocol, + ) -> Self::Instant { + std::time::Instant::now() + } + + fn on_call( + &self, + method_name: &str, + params: jsonrpsee::types::Params, + _kind: jsonrpsee::server::logger::MethodKind, + transport: jsonrpsee::server::logger::TransportProtocol, + ) { + //TODO remove param parse when server stable. + let params_str = match params.parse::() { + Ok(json) => json.to_string(), + Err(e) => e.to_string(), + }; + tracing::event!( + Level::INFO, + event = "on_call", + transport = transport.to_string(), + method_name = method_name, + params = params_str, + ); + } + + fn on_result( + &self, + method_name: &str, + success: bool, + started_at: Self::Instant, + _transport: jsonrpsee::server::logger::TransportProtocol, + ) { + let elapsed_millis = started_at.elapsed().as_millis(); + tracing::event!( + Level::INFO, + event = "on_result", + method_name = method_name, + success = success, + elapsed_millis = elapsed_millis + ); + } + + fn on_response( + &self, + _result: &str, + _started_at: Self::Instant, + _transport: jsonrpsee::server::logger::TransportProtocol, + ) { + } + + fn on_disconnect( + &self, + _remote_addr: std::net::SocketAddr, + _transport: jsonrpsee::server::logger::TransportProtocol, + ) { + } +}