Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix ibc-tests raises gen_proof error when run with multithread #360

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 47 additions & 48 deletions crates/relayer/src/chain/axon.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{collections::HashMap, str::FromStr, sync::Arc, thread, time::Duration};
use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, RwLock},
thread,
time::Duration,
};

use axon_tools::types::{AxonBlock, Proof as AxonProof, ValidatorExtend};
use eth2_types::Hash256;
Expand Down Expand Up @@ -62,7 +68,7 @@ use ibc_relayer_types::{
packet::{PacketMsgType, Sequence},
},
ics23_commitment::{commitment::CommitmentPrefix, merkle::MerkleProof},
ics24_host::identifier::{ChainId, ChannelId, ClientId, ConnectionId, PortId},
ics24_host::identifier::{ChannelId, ClientId, ConnectionId, PortId},
},
events::{IbcEvent, WithBlockDataType},
proofs::{ConsensusProof, Proofs},
Expand Down Expand Up @@ -110,20 +116,6 @@ pub mod utils;
pub use rpc::AxonRpc;
use utils::*;

use lazy_static::lazy_static;
use std::sync::Mutex;

// use lazy_static to replace in-object variables to speed up message transfer while
// counting on the process of cross-thread message is too slow to trigger some issues
lazy_static! {
pub static ref CONNECTION_TX_HASH: Mutex<HashMap<(ChainId, ConnectionId), TxHash>> =
Mutex::new(HashMap::new());
pub static ref CHANNEL_TX_HASH: Mutex<HashMap<(ChainId, ChannelId, PortId), TxHash>> =
Mutex::new(HashMap::new());
pub static ref PACKET_TX_HASH: Mutex<HashMap<(ChainId, ChannelId, PortId, u64), TxHash>> =
Mutex::new(HashMap::new());
}

abigen!(
ERC20,
r"[
Expand All @@ -144,6 +136,13 @@ abigen!(
]"
);

#[derive(Default)]
pub struct IBCInfoCache {
conn_tx_hash: HashMap<ConnectionId, TxHash>,
chan_tx_hash: HashMap<(ChannelId, PortId), TxHash>,
packet_tx_hash: HashMap<(ChannelId, PortId, u64), TxHash>,
}

pub struct AxonChain {
rt: Arc<TokioRuntime>,
config: AxonChainConfig,
Expand All @@ -153,6 +152,7 @@ pub struct AxonChain {
client: Provider<Ws>,
keybase: KeyRing<Secp256k1KeyPair>,
chain_id: u64,
ibc_cache: Arc<RwLock<IBCInfoCache>>,
}

impl AxonChain {
Expand Down Expand Up @@ -212,6 +212,7 @@ impl ChainEndpoint for AxonChain {
.map_err(|e| Error::other_error(e.to_string()))?
.as_u64();
let light_client = AxonLightClient::from_config(&config, rt.clone())?;
let ibc_cache = Arc::new(RwLock::new(IBCInfoCache::default()));

// TODO: since Ckb endpoint uses Axon metadata cell as its light client, Axon
// endpoint has no need to monitor the update of its metadata
Expand All @@ -231,6 +232,7 @@ impl ChainEndpoint for AxonChain {
chain_id,
rpc_client,
client,
ibc_cache,
})
}

Expand Down Expand Up @@ -1046,9 +1048,10 @@ impl ChainEndpoint for AxonChain {
ConnectionMsgType::OpenAck => connection::State::TryOpen,
ConnectionMsgType::OpenConfirm => connection::State::Open,
};
let conn_tx_hash = CONNECTION_TX_HASH.lock().unwrap();
let tx_hash = conn_tx_hash
.get(&(self.id(), connection_id.clone()))
let ibc_cache = self.ibc_cache.read().unwrap();
let tx_hash = ibc_cache
.conn_tx_hash
.get(connection_id)
.ok_or(Error::conn_proof(
connection_id.clone(),
format!("missing connection tx_hash, state {state:?}"),
Expand All @@ -1068,9 +1071,10 @@ impl ChainEndpoint for AxonChain {
channel_id: &ChannelId,
height: Height,
) -> Result<Proofs, Error> {
let chan_tx_hash = CHANNEL_TX_HASH.lock().unwrap();
let tx_hash = chan_tx_hash
.get(&(self.id(), channel_id.clone(), port_id.clone()))
let ibc_cache = self.ibc_cache.read().unwrap();
let tx_hash = ibc_cache
.chan_tx_hash
.get(&(channel_id.clone(), port_id.clone()))
.ok_or(Error::chan_proof(
port_id.clone(),
channel_id.clone(),
Expand All @@ -1090,14 +1094,10 @@ impl ChainEndpoint for AxonChain {
sequence: Sequence,
height: Height,
) -> Result<Proofs, Error> {
let conn_tx_hash = PACKET_TX_HASH.lock().unwrap();
let tx_hash = conn_tx_hash
.get(&(
self.id(),
channel_id.clone(),
port_id.clone(),
sequence.into(),
))
let ibc_cache = self.ibc_cache.read().unwrap();
let tx_hash = ibc_cache
.packet_tx_hash
.get(&(channel_id.clone(), port_id.clone(), sequence.into()))
.ok_or(Error::packet_proof(
port_id.clone(),
channel_id.clone(),
Expand Down Expand Up @@ -1166,22 +1166,25 @@ impl AxonChain {
fn init_event_monitor(&mut self) -> Result<TxMonitorCmd, Error> {
crate::time!("axon_init_event_monitor");
// let header_receiver = self.light_client.subscribe();
let ibc_cache = self.ibc_cache.clone();
let (event_monitor, monitor_tx) = AxonEventMonitor::new(
self.config.id.clone(),
self.config.websocket_addr.clone(),
self.config.contract_address,
// header_receiver,
self.rt.clone(),
ibc_cache.clone(),
)
.map_err(Error::event_monitor)?;

// restore past events to initialize tx_hash caches
let mut ibc_cache = ibc_cache.write().unwrap();
let latest_block_count = self.config.restore_block_count;
event_monitor
.restore_event_tx_hashes(latest_block_count)
.map_err(Error::event_monitor)?
.into_iter()
.for_each(|v| cache_ics_tx_hash_with_event(self.id(), v.event, v.tx_hash));
.for_each(|v| cache_ics_tx_hash_with_event(&mut ibc_cache, v.event, v.tx_hash));

thread::spawn(move || event_monitor.run());
Ok(monitor_tx)
Expand Down Expand Up @@ -1523,7 +1526,8 @@ impl AxonChain {
})?;
Height::from_noncosmos_height(block_height.as_u64())
};
cache_ics_tx_hash_with_event(self.id(), event.clone(), tx_hash);
let mut ibc_cache = self.ibc_cache.write().unwrap();
cache_ics_tx_hash_with_event(&mut ibc_cache, event.clone(), tx_hash);
Ok(IbcEventWithHeight {
event,
height,
Expand All @@ -1533,35 +1537,30 @@ impl AxonChain {
}

fn cache_ics_tx_hash<T: Into<[u8; 32]>>(
chain_id: ChainId,
ibc_cache: &mut IBCInfoCache,
cached_status: CacheTxHashStatus,
tx_hash: T,
) {
let hash: [u8; 32] = tx_hash.into();
match cached_status {
CacheTxHashStatus::Connection(conn_id) => {
CONNECTION_TX_HASH
.lock()
.unwrap()
.insert((chain_id, conn_id), hash.into());
ibc_cache.conn_tx_hash.insert(conn_id, hash.into());
}
CacheTxHashStatus::Channel(chan_id, port_id) => {
CHANNEL_TX_HASH
.lock()
.unwrap()
.insert((chain_id, chan_id, port_id), hash.into());
ibc_cache
.chan_tx_hash
.insert((chan_id, port_id), hash.into());
}
CacheTxHashStatus::Packet(chan_id, port_id, sequence) => {
PACKET_TX_HASH
.lock()
.unwrap()
.insert((chain_id, chan_id, port_id, sequence), hash.into());
ibc_cache
.packet_tx_hash
.insert((chan_id, port_id, sequence), hash.into());
}
}
}

pub fn cache_ics_tx_hash_with_event<T: Into<[u8; 32]>>(
chain_id: ChainId,
pub(crate) fn cache_ics_tx_hash_with_event<T: Into<[u8; 32]>>(
ibc_cache: &mut IBCInfoCache,
event: IbcEvent,
tx_hash: T,
) {
Expand Down Expand Up @@ -1607,6 +1606,6 @@ pub fn cache_ics_tx_hash_with_event<T: Into<[u8; 32]>>(
_ => None,
};
if let Some(tx_hash_status) = tx_hash_status {
cache_ics_tx_hash(chain_id, tx_hash_status, tx_hash);
cache_ics_tx_hash(ibc_cache, tx_hash_status, tx_hash);
}
}
13 changes: 10 additions & 3 deletions crates/relayer/src/chain/axon/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use super::contract::*;
use super::{contract::*, IBCInfoCache};
use crate::chain::axon::cache_ics_tx_hash_with_event;
use crate::event::bus::EventBus;
use crate::event::IbcEventWithHeight;
Expand Down Expand Up @@ -33,6 +33,7 @@ pub struct AxonEventMonitor {
start_block_number: u64,
rx_cmd: channel::Receiver<MonitorCmd>,
event_bus: EventBus<Arc<Result<EventBatch>>>,
ibc_cache: Arc<RwLock<IBCInfoCache>>,
}

impl AxonEventMonitor {
Expand All @@ -48,6 +49,7 @@ impl AxonEventMonitor {
websocket_addr: WebSocketClientUrl,
contract_address: Address,
rt: Arc<TokioRuntime>,
ibc_cache: Arc<RwLock<IBCInfoCache>>,
) -> Result<(Self, TxMonitorCmd)> {
let (tx_cmd, rx_cmd) = channel::unbounded();

Expand All @@ -71,6 +73,7 @@ impl AxonEventMonitor {
start_block_number,
rx_cmd,
event_bus,
ibc_cache,
};
Ok((monitor, TxMonitorCmd::new(tx_cmd)))
}
Expand Down Expand Up @@ -231,7 +234,11 @@ impl AxonEventMonitor {
Height::from_noncosmos_height(meta.block_number.as_u64()),
meta.transaction_hash.into(),
);
cache_ics_tx_hash_with_event(self.chain_id.clone(), event.event.clone(), event.tx_hash);
cache_ics_tx_hash_with_event(
&mut self.ibc_cache.write().unwrap(),
event.event.clone(),
event.tx_hash,
);
let batch = EventBatch {
chain_id: self.chain_id.clone(),
tracking_id: TrackingId::Static("Axon solidity event streaming"),
Expand Down
Loading