From ce48f9c9d5ee49c74b9215071f2f3e33d4520b89 Mon Sep 17 00:00:00 2001 From: kwiss Date: Sat, 25 May 2024 02:01:55 +0200 Subject: [PATCH] feat(solis-pending): add modification to starknet service to gather range block & then pending --- .../core/src/service/messaging/starknet.rs | 196 ++++++++++++++---- 1 file changed, 152 insertions(+), 44 deletions(-) diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index fc7b864975..abfbee4e49 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -17,19 +17,42 @@ use std::sync::Arc; use tokio::sync::RwLock as AsyncRwLock; use tracing::{debug, error, info, trace, warn}; use url::Url; +use std::collections::HashSet; use super::{Error, MessagingConfig, Messenger, MessengerResult, LOG_TARGET}; -/// As messaging in starknet is only possible with EthAddress in the `to_address` -/// field, we have to set magic value to understand what the user want to do. -/// In the case of execution -> the felt 'EXE' will be passed. -/// And for normal messages, the felt 'MSG' is used. -/// Those values are very not likely a valid account address on starknet. const MSG_MAGIC: FieldElement = felt!("0x4d5347"); const EXE_MAGIC: FieldElement = felt!("0x455845"); pub const HASH_EXEC: FieldElement = felt!("0xee"); +pub struct EventCache { + processed_events: AsyncRwLock>, +} + +impl EventCache { + pub fn new() -> Self { + EventCache { + processed_events: AsyncRwLock::new(HashSet::new()), + } + } + + pub async fn is_event_processed(&self, event_id: &String) -> bool { + let events = self.processed_events.read().await; + events.contains(event_id) + } + + pub async fn mark_event_as_processed(&self, event_id: String) { + let mut events = self.processed_events.write().await; + events.insert(event_id); + } + + pub async fn clear(&self) { + let mut events = self.processed_events.write().await; + events.clear(); + } +} + pub struct StarknetMessaging { chain_id: FieldElement, provider: AnyProvider, @@ -37,6 +60,8 @@ pub struct StarknetMessaging sender_account_address: FieldElement, messaging_contract_address: FieldElement, hooker: Arc + Send + Sync>>, + cache_lock: AsyncRwLock<()>, + event_cache: Arc, } impl StarknetMessaging { @@ -65,6 +90,8 @@ impl StarknetMessaging { sender_account_address, messaging_contract_address, hooker, + cache_lock: AsyncRwLock::new(()), + event_cache: Arc::new(EventCache::new()), }) } @@ -82,11 +109,9 @@ impl StarknetMessaging { from_block: Some(from_block), to_block: Some(to_block), address: Some(self.messaging_contract_address), - // TODO: this might come from the configuration actually. keys: None, }; - // TODO: this chunk_size may also come from configuration? let chunk_size = 200; let mut continuation_token: Option = None; @@ -95,7 +120,6 @@ impl StarknetMessaging { self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?; event_page.events.into_iter().for_each(|event| { - // We ignore events without the block number if let Some(block_number) = event.block_number { block_to_events .entry(block_number) @@ -114,7 +138,58 @@ impl StarknetMessaging { Ok(block_to_events) } - /// Sends an invoke TX on starknet. + async fn fetch_pending_events( + &self, + chain_id: ChainId, + chunk_size: u64, + ) -> Result> { + let mut l1_handler_txs: Vec = vec![]; + let mut continuation_token: Option = None; + + loop { + let filter = EventFilter { + from_block: Some(BlockId::Tag(BlockTag::Pending)), + to_block: Some(BlockId::Tag(BlockTag::Pending)), + address: Some(self.messaging_contract_address), + keys: None, + }; + + let event_page = self.provider.get_events(filter.clone(), continuation_token.clone(), chunk_size).await?; + + for event in event_page.events { + let event_id = event.transaction_hash.to_string(); // Assuming `transaction_hash` is the unique identifier for the event + + if self.event_cache.is_event_processed(&event_id).await { + continue; + } + + if let Ok(tx) = l1_handler_tx_from_event(&event, chain_id) { + if let Ok((from, to, selector)) = info_from_event(&event) { + let hooker = Arc::clone(&self.hooker); + let is_message_accepted = hooker + .read() + .await + .verify_message_to_appchain(from, to, selector) + .await; + + if is_message_accepted { + l1_handler_txs.push(tx); + self.event_cache.mark_event_as_processed(event_id).await; + } + } + } + } + + continuation_token = event_page.continuation_token; + + if continuation_token.is_none() { + break; + } + } + + Ok(l1_handler_txs) + } + async fn send_invoke_tx(&self, calls: Vec) -> Result { let signer = Arc::new(&self.wallet); @@ -128,7 +203,6 @@ impl StarknetMessaging { account.set_block_id(BlockId::Tag(BlockTag::Pending)); - // TODO: we need to have maximum fee configurable. let execution = account.execute(calls).fee_estimate_multiplier(10f64); let estimated_fee = (execution.estimate_fee().await?.overall_fee) * 10u64.into(); let execution_with_fee = execution.max_fee(estimated_fee); @@ -149,7 +223,6 @@ impl StarknetMessaging { } } - /// Sends messages hashes to settlement layer by sending a transaction. async fn send_hashes(&self, mut hashes: Vec) -> MessengerResult { hashes.retain(|&x| x != HASH_EXEC); @@ -205,7 +278,7 @@ impl Messenger for StarknetM }; if from_block > chain_latest_block { - // Nothing to fetch, we can skip waiting the next tick. + // Nothing to fetch, we can skip waiting for the next tick. return Ok((chain_latest_block, vec![])); } @@ -228,15 +301,8 @@ impl Messenger for StarknetM Error::SendError })?; - for (block_number, block_events) in block_to_events.iter() { - debug!( - target: LOG_TARGET, - block_number = %block_number, - events_count = %block_events.len(), - "Converting events of block into L1HandlerTx." - ); - - for event in block_events.iter() { + for block_events in block_to_events.values() { + for event in block_events { if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) { if let Ok((from, to, selector)) = info_from_event(event) { let hooker = Arc::clone(&self.hooker); @@ -254,6 +320,70 @@ impl Messenger for StarknetM } } + // Now, handle pending block events + { + // Use a lock to ensure atomicity + let cache_lock = self.cache_lock.write().await; + + // Fetch pending block events + let pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| { + error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e); + Error::SendError + })?; + l1_handler_txs.extend(pending_events); + + // Get the latest block number again to ensure we didn't miss any new blocks + let latest_block_number = match self.provider.block_number().await { + Ok(n) => n, + Err(e) => { + warn!( + target: LOG_TARGET, + "Couldn't fetch settlement chain last block number. Skipped, retry at the next tick. Error: {:?}", e + ); + return Err(Error::SendError); + } + }; + + // Fetch all events from the latest block to ensure none are missed + let confirmed_events = self.fetch_events(BlockId::Number(latest_block_number), BlockId::Number(latest_block_number)).await.map_err(|e| { + error!(target: LOG_TARGET, "Error fetching confirmed block events: {:?}", e); + Error::SendError + })?; + + for block_events in confirmed_events.values() { + for event in block_events { + let event_id = event.transaction_hash.to_string(); + if !self.event_cache.is_event_processed(&event_id).await { + if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) { + if let Ok((from, to, selector)) = info_from_event(event) { + let hooker = Arc::clone(&self.hooker); + let is_message_accepted = hooker + .read() + .await + .verify_message_to_appchain(from, to, selector) + .await; + + if is_message_accepted { + l1_handler_txs.push(tx); + } + } + } + } + } + } + + self.event_cache.clear().await; + + // Fetch pending events again to ensure no events were missed during the cache clearing + let rechecked_pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| { + error!(target: LOG_TARGET, "Error rechecking pending events: {:?}", e); + Error::SendError + })?; + l1_handler_txs.extend(rechecked_pending_events); + + drop(cache_lock); // Release the lock + } + Ok((to_block, l1_handler_txs)) } @@ -297,18 +427,11 @@ impl Messenger for StarknetM } } -/// Parses messages sent by cairo contracts to compute their hashes. -/// -/// Messages can also be labelled as EXE, which in this case generate a `Call` -/// additionally to the hash. fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec, Vec)> { let mut hashes: Vec = vec![]; let mut calls: Vec = vec![]; for m in messages { - // Field `to_address` is restricted to eth addresses space. So the - // `to_address` is set to 'EXE'/'MSG' to indicate that the message - // has to be executed or sent normally. let magic = m.to_address; if magic == EXE_MAGIC { @@ -325,7 +448,6 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec= 3 { calldata.extend(m.payload[2..].to_vec()); } @@ -333,15 +455,7 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec = vec![]; @@ -354,7 +468,6 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec Result Result Result<(FieldElement, FieldElement, error!(target: LOG_TARGET, "Event MessageSentToAppchain is not well formatted"); } - // See contract appchain_messaging.cairo for MessageSentToAppchain event. let from_address = event.keys[2]; let to_address = event.keys[3]; let entry_point_selector = event.data[0]; @@ -550,8 +658,8 @@ mod tests { chain_id, message_hash, paid_fee_on_l1: 30000_u128, - version: FieldElement::ZERO, entry_point_selector: selector, + version: FieldElement::ZERO, contract_address: to_address.into(), };