Skip to content

Commit

Permalink
Merge pull request #12 from ArkProjectNFTs/feat/solis-pending-gather
Browse files Browse the repository at this point in the history
feat(solis-pending): add modification to starknet service to gather r…
  • Loading branch information
kwiss authored May 28, 2024
2 parents f6705a4 + ce48f9c commit 64a15cb
Showing 1 changed file with 152 additions and 44 deletions.
196 changes: 152 additions & 44 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,51 @@ use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, info, trace, warn};
use url::Url;
use std::collections::HashSet;

use super::{Error, MessagingConfig, Messenger, MessengerResult, LOG_TARGET};

/// As messaging in starknet is only possible with EthAddress in the `to_address`
/// field, we have to set magic value to understand what the user want to do.
/// In the case of execution -> the felt 'EXE' will be passed.
/// And for normal messages, the felt 'MSG' is used.
/// Those values are very not likely a valid account address on starknet.
const MSG_MAGIC: FieldElement = felt!("0x4d5347");
const EXE_MAGIC: FieldElement = felt!("0x455845");

pub const HASH_EXEC: FieldElement = felt!("0xee");

pub struct EventCache {
processed_events: AsyncRwLock<HashSet<String>>,
}

impl EventCache {
pub fn new() -> Self {
EventCache {
processed_events: AsyncRwLock::new(HashSet::new()),
}
}

pub async fn is_event_processed(&self, event_id: &String) -> bool {
let events = self.processed_events.read().await;
events.contains(event_id)
}

pub async fn mark_event_as_processed(&self, event_id: String) {
let mut events = self.processed_events.write().await;
events.insert(event_id);
}

pub async fn clear(&self) {
let mut events = self.processed_events.write().await;
events.clear();
}
}

pub struct StarknetMessaging<EF: katana_executor::ExecutorFactory + Send + Sync> {
chain_id: FieldElement,
provider: AnyProvider,
wallet: LocalWallet,
sender_account_address: FieldElement,
messaging_contract_address: FieldElement,
hooker: Arc<AsyncRwLock<dyn KatanaHooker<EF> + Send + Sync>>,
cache_lock: AsyncRwLock<()>,
event_cache: Arc<EventCache>,
}

impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
Expand Down Expand Up @@ -65,6 +90,8 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
sender_account_address,
messaging_contract_address,
hooker,
cache_lock: AsyncRwLock::new(()),
event_cache: Arc::new(EventCache::new()),
})
}

Expand All @@ -82,11 +109,9 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
from_block: Some(from_block),
to_block: Some(to_block),
address: Some(self.messaging_contract_address),
// TODO: this might come from the configuration actually.
keys: None,
};

// TODO: this chunk_size may also come from configuration?
let chunk_size = 200;
let mut continuation_token: Option<String> = None;

Expand All @@ -95,7 +120,6 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?;

event_page.events.into_iter().for_each(|event| {
// We ignore events without the block number
if let Some(block_number) = event.block_number {
block_to_events
.entry(block_number)
Expand All @@ -114,7 +138,58 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
Ok(block_to_events)
}

/// Sends an invoke TX on starknet.
async fn fetch_pending_events(
&self,
chain_id: ChainId,
chunk_size: u64,
) -> Result<Vec<L1HandlerTx>> {
let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
let mut continuation_token: Option<String> = None;

loop {
let filter = EventFilter {
from_block: Some(BlockId::Tag(BlockTag::Pending)),
to_block: Some(BlockId::Tag(BlockTag::Pending)),
address: Some(self.messaging_contract_address),
keys: None,
};

let event_page = self.provider.get_events(filter.clone(), continuation_token.clone(), chunk_size).await?;

for event in event_page.events {
let event_id = event.transaction_hash.to_string(); // Assuming `transaction_hash` is the unique identifier for the event

if self.event_cache.is_event_processed(&event_id).await {
continue;
}

if let Ok(tx) = l1_handler_tx_from_event(&event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(&event) {
let hooker = Arc::clone(&self.hooker);
let is_message_accepted = hooker
.read()
.await
.verify_message_to_appchain(from, to, selector)
.await;

if is_message_accepted {
l1_handler_txs.push(tx);
self.event_cache.mark_event_as_processed(event_id).await;
}
}
}
}

continuation_token = event_page.continuation_token;

if continuation_token.is_none() {
break;
}
}

Ok(l1_handler_txs)
}

async fn send_invoke_tx(&self, calls: Vec<Call>) -> Result<FieldElement> {
let signer = Arc::new(&self.wallet);

Expand All @@ -128,7 +203,6 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {

account.set_block_id(BlockId::Tag(BlockTag::Pending));

// TODO: we need to have maximum fee configurable.
let execution = account.execute(calls).fee_estimate_multiplier(10f64);
let estimated_fee = (execution.estimate_fee().await?.overall_fee) * 10u64.into();
let execution_with_fee = execution.max_fee(estimated_fee);
Expand All @@ -149,7 +223,6 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
}
}

/// Sends messages hashes to settlement layer by sending a transaction.
async fn send_hashes(&self, mut hashes: Vec<FieldElement>) -> MessengerResult<FieldElement> {
hashes.retain(|&x| x != HASH_EXEC);

Expand Down Expand Up @@ -205,7 +278,7 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
};

if from_block > chain_latest_block {
// Nothing to fetch, we can skip waiting the next tick.
// Nothing to fetch, we can skip waiting for the next tick.
return Ok((chain_latest_block, vec![]));
}

Expand All @@ -228,15 +301,8 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
Error::SendError
})?;

for (block_number, block_events) in block_to_events.iter() {
debug!(
target: LOG_TARGET,
block_number = %block_number,
events_count = %block_events.len(),
"Converting events of block into L1HandlerTx."
);

for event in block_events.iter() {
for block_events in block_to_events.values() {
for event in block_events {
if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(event) {
let hooker = Arc::clone(&self.hooker);
Expand All @@ -254,6 +320,70 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
}
}

// Now, handle pending block events
{
// Use a lock to ensure atomicity
let cache_lock = self.cache_lock.write().await;

// Fetch pending block events
let pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| {
error!(target: LOG_TARGET, "Error fetching pending events: {:?}", e);
Error::SendError
})?;
l1_handler_txs.extend(pending_events);

// Get the latest block number again to ensure we didn't miss any new blocks
let latest_block_number = match self.provider.block_number().await {
Ok(n) => n,
Err(e) => {
warn!(
target: LOG_TARGET,
"Couldn't fetch settlement chain last block number. Skipped, retry at the next tick. Error: {:?}", e
);
return Err(Error::SendError);
}
};

// Fetch all events from the latest block to ensure none are missed
let confirmed_events = self.fetch_events(BlockId::Number(latest_block_number), BlockId::Number(latest_block_number)).await.map_err(|e| {
error!(target: LOG_TARGET, "Error fetching confirmed block events: {:?}", e);
Error::SendError
})?;

for block_events in confirmed_events.values() {
for event in block_events {
let event_id = event.transaction_hash.to_string();
if !self.event_cache.is_event_processed(&event_id).await {
if let Ok(tx) = l1_handler_tx_from_event(event, chain_id) {
if let Ok((from, to, selector)) = info_from_event(event) {
let hooker = Arc::clone(&self.hooker);
let is_message_accepted = hooker
.read()
.await
.verify_message_to_appchain(from, to, selector)
.await;

if is_message_accepted {
l1_handler_txs.push(tx);
}
}
}
}
}
}

self.event_cache.clear().await;

// Fetch pending events again to ensure no events were missed during the cache clearing
let rechecked_pending_events = self.fetch_pending_events(chain_id, 100).await.map_err(|e| {
error!(target: LOG_TARGET, "Error rechecking pending events: {:?}", e);
Error::SendError
})?;
l1_handler_txs.extend(rechecked_pending_events);

drop(cache_lock); // Release the lock
}

Ok((to_block, l1_handler_txs))
}

Expand Down Expand Up @@ -297,18 +427,11 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
}
}

/// Parses messages sent by cairo contracts to compute their hashes.
///
/// Messages can also be labelled as EXE, which in this case generate a `Call`
/// additionally to the hash.
fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec<FieldElement>, Vec<Call>)> {
let mut hashes: Vec<FieldElement> = vec![];
let mut calls: Vec<Call> = vec![];

for m in messages {
// Field `to_address` is restricted to eth addresses space. So the
// `to_address` is set to 'EXE'/'MSG' to indicate that the message
// has to be executed or sent normally.
let magic = m.to_address;

if magic == EXE_MAGIC {
Expand All @@ -325,23 +448,14 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec<FieldElement
let selector = m.payload[1];

let mut calldata = vec![];
// We must exclude the `to_address` and `selector` from the actual payload.
if m.payload.len() >= 3 {
calldata.extend(m.payload[2..].to_vec());
}

calls.push(Call { to, selector, calldata });
hashes.push(HASH_EXEC);
} else if magic == MSG_MAGIC {
// In the case of a regular message, we compute the message's hash
// which will then be sent in a transaction to be registered.

// As `to_address` is used by the magic, the `to_address` we want
// is the first element of the payload.
let to_address = m.payload[0];

// Then, the payload must be changed to only keep the rest of the
// data, without the first element that was the `to_address`.
let payload = &m.payload[1..];

let mut buf: Vec<u8> = vec![];
Expand All @@ -354,7 +468,6 @@ fn parse_messages(messages: &[MessageToL1]) -> MessengerResult<(Vec<FieldElement

hashes.push(starknet_keccak(&buf));
} else {
// Skip the message if no valid magic number is found.
warn!(target: LOG_TARGET, magic = ?magic, "Invalid message to_address magic value.");
continue;
}
Expand All @@ -377,14 +490,11 @@ fn l1_handler_tx_from_event(event: &EmittedEvent, chain_id: ChainId) -> Result<L
error!(target: LOG_TARGET, "Event MessageSentToAppchain is not well formatted.");
}

// See contract appchain_messaging.cairo for MessageSentToAppchain event.
let from_address = event.keys[2];
let to_address = event.keys[3];
let entry_point_selector = event.data[0];
let nonce = event.data[1];

// Skip the length of the serialized array for the payload which is data[2].
// Payload starts at data[3].
let mut calldata = vec![from_address];
calldata.extend(&event.data[3..]);

Expand All @@ -395,7 +505,6 @@ fn l1_handler_tx_from_event(event: &EmittedEvent, chain_id: ChainId) -> Result<L
calldata,
chain_id,
message_hash,
// This is the min value paid on L1 for the message to be sent to L2.
paid_fee_on_l1: 30000_u128,
entry_point_selector,
version: FieldElement::ZERO,
Expand All @@ -416,7 +525,6 @@ fn info_from_event(event: &EmittedEvent) -> Result<(FieldElement, FieldElement,
error!(target: LOG_TARGET, "Event MessageSentToAppchain is not well formatted");
}

// See contract appchain_messaging.cairo for MessageSentToAppchain event.
let from_address = event.keys[2];
let to_address = event.keys[3];
let entry_point_selector = event.data[0];
Expand Down Expand Up @@ -550,8 +658,8 @@ mod tests {
chain_id,
message_hash,
paid_fee_on_l1: 30000_u128,
version: FieldElement::ZERO,
entry_point_selector: selector,
version: FieldElement::ZERO,
contract_address: to_address.into(),
};

Expand Down

0 comments on commit 64a15cb

Please sign in to comment.