Skip to content

Commit

Permalink
[Fix] Fix block_on bug and add RpcLogger (#875)
Browse files Browse the repository at this point in the history
  • Loading branch information
jolestar authored Sep 27, 2023
1 parent 38105ee commit 25e3c06
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 18 deletions.
8 changes: 6 additions & 2 deletions crates/rooch-rpc-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::server::eth_server::EthServer;
use crate::server::rooch_server::RoochServer;
use crate::server::wallet_server::WalletServer;
use crate::service::aggregate_service::AggregateService;
use crate::service::rpc_logger::RpcLogger;
use crate::service::rpc_service::RpcService;
use anyhow::Result;
use coerce::actor::scheduler::timer::Timer;
Expand Down Expand Up @@ -43,6 +44,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;
use tracing::info;

pub mod server;
Expand Down Expand Up @@ -253,11 +255,13 @@ pub async fn run_start_server(opt: &RoochOpt) -> Result<ServerHandle> {
.allow_origin(acl)
.allow_headers([hyper::header::CONTENT_TYPE]);

// TODO: tracing
let middleware = tower::ServiceBuilder::new().layer(cors);
let middleware = tower::ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.layer(cors);

// Build server
let server = ServerBuilder::default()
.set_logger(RpcLogger)
.set_middleware(middleware)
.build(&addr)
.await?;
Expand Down
21 changes: 5 additions & 16 deletions crates/rooch-rpc-server/src/service/aggregate_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use crate::service::rpc_service::RpcService;
use anyhow::Result;
use lazy_static::lazy_static;
use move_core_types::account_address::AccountAddress;
use move_core_types::language_storage::StructTag;
use moveos_types::access_path::AccessPath;
Expand All @@ -19,7 +18,7 @@ use rooch_types::addresses::ROOCH_FRAMEWORK_ADDRESS_LITERAL;
use rooch_types::framework::coin::{AnnotatedCoinInfo, AnnotatedCoinStore, CoinModule};
use std::collections::HashMap;
use std::str::FromStr;
use tokio::runtime::Runtime;
use tokio::runtime::Handle;

/// AggregateService is aggregate RPC service and MoveFunctionCaller.
#[derive(Clone)]
Expand Down Expand Up @@ -144,27 +143,17 @@ impl AggregateService {
}
}

lazy_static! {
static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("rooch-aggregate-service")
.enable_all()
.build()
.unwrap();
}

impl MoveFunctionCaller for AggregateService {
// Use futures::executors::block_on to go from sync -> async
// Warning! Possible deadlocks can occur if we try to wait for a future without spawn
fn call_function(
&self,
_ctx: &TxContext,
function_call: FunctionCall,
) -> Result<FunctionResult> {
let rpc_service = self.rpc_service.clone();
let function_result = futures::executor::block_on(
RUNTIME.spawn(async move { rpc_service.execute_view_function(function_call).await }),
)??;

let function_result = tokio::task::block_in_place(|| {
Handle::current()
.block_on(async move { rpc_service.execute_view_function(function_call).await })
})?;
function_result.try_into()
}
}
1 change: 1 addition & 0 deletions crates/rooch-rpc-server/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
// SPDX-License-Identifier: Apache-2.0

pub mod aggregate_service;
pub mod rpc_logger;
pub mod rpc_service;
80 changes: 80 additions & 0 deletions crates/rooch-rpc-server/src/service/rpc_logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use jsonrpsee::server::logger::Logger;
use tracing::Level;

#[derive(Debug, Clone)]
pub struct RpcLogger;

impl Logger for RpcLogger {
type Instant = std::time::Instant;

fn on_connect(
&self,
_remote_addr: std::net::SocketAddr,
_request: &jsonrpsee::server::logger::HttpRequest,
_t: jsonrpsee::server::logger::TransportProtocol,
) {
}

fn on_request(
&self,
_transport: jsonrpsee::server::logger::TransportProtocol,
) -> Self::Instant {
std::time::Instant::now()
}

fn on_call(
&self,
method_name: &str,
params: jsonrpsee::types::Params,
_kind: jsonrpsee::server::logger::MethodKind,
transport: jsonrpsee::server::logger::TransportProtocol,
) {
//TODO remove param parse when server stable.
let params_str = match params.parse::<serde_json::Value>() {
Ok(json) => json.to_string(),
Err(e) => e.to_string(),
};
tracing::event!(
Level::INFO,
event = "on_call",
transport = transport.to_string(),
method_name = method_name,
params = params_str,
);
}

fn on_result(
&self,
method_name: &str,
success: bool,
started_at: Self::Instant,
_transport: jsonrpsee::server::logger::TransportProtocol,
) {
let elapsed_millis = started_at.elapsed().as_millis();
tracing::event!(
Level::INFO,
event = "on_result",
method_name = method_name,
success = success,
elapsed_millis = elapsed_millis
);
}

fn on_response(
&self,
_result: &str,
_started_at: Self::Instant,
_transport: jsonrpsee::server::logger::TransportProtocol,
) {
}

fn on_disconnect(
&self,
_remote_addr: std::net::SocketAddr,
_transport: jsonrpsee::server::logger::TransportProtocol,
) {
}
}

0 comments on commit 25e3c06

Please sign in to comment.