Skip to content

Commit

Permalink
Update message processor gauge first (#1512)
Browse files Browse the repository at this point in the history
  • Loading branch information
asaj authored Jan 5, 2023
1 parent eb3550a commit 389eec5
Showing 1 changed file with 91 additions and 76 deletions.
167 changes: 91 additions & 76 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,92 +63,107 @@ impl MessageProcessor {
}
}

/// Tries to get the next message to process.
///
/// If no message with self.message_nonce is found, returns None.
/// If the message with self.message_nonce is found and has previously
/// been marked as processed, increments self.message_nonce and returns
/// None.
fn try_get_unprocessed_message(&mut self) -> Result<Option<HyperlaneMessage>> {
// First, see if we can find the message so we can update the gauge.
if let Some(message) = self.db.message_by_nonce(self.message_nonce)? {
// Update the latest nonce gauge if the message is destined for one
// of the domains we service.
if let Some(metrics) = self.metrics.get(message.destination) {
metrics.set(message.nonce as i64);
}

// If this message has already been processed, on to the next one.
if self
.db
.retrieve_message_processed(self.message_nonce)?
.is_none()
{
Ok(Some(message))
} else {
debug!(
msg_nonce=?self.message_nonce,
"Message already marked as processed in DB");
self.message_nonce += 1;
Ok(None)
}
} else {
debug!(
msg_nonce=?self.message_nonce,
"No message found in DB for nonce");
Ok(None)
}
}

/// One round of processing, extracted from infinite work loop for
/// testing purposes.
async fn tick(&mut self) -> Result<()> {
// Scan until we find next nonce without delivery confirmation.
if self
.db
.retrieve_message_processed(self.message_nonce)?
.is_some()
{
debug!(
nonce=?self.message_nonce,
"Skipping since message_nonce already in DB");
self.message_nonce += 1;
return Ok(());
}
let message = if let Some(msg) = self
.db
.message_by_nonce(self.message_nonce)?
.map(HyperlaneMessage::from)
{
debug!(msg=?msg, "Working on msg");
msg
} else {
debug!("Leaf in db without message nonce: {}", self.message_nonce);
// Not clear what the best thing to do here is, but there is seemingly an
// existing race wherein an indexer might non-atomically write leaf
// info to rocksdb across a few records, so we might see the leaf
// status above, but not the message contents here. For now,
// optimistically yield and then re-enter the loop in hopes that the
// DB is now coherent. TODO(webbhorn): Why can't we yield here
// instead of sleep?
tokio::time::sleep(Duration::from_secs(1)).await;
return Ok(());
};
if let Some(metrics) = self.metrics.get(message.destination) {
metrics.set(self.message_nonce as i64);
}
if let Some(message) = self.try_get_unprocessed_message()? {
debug!(msg=?message, "Working on message");

// Skip if not whitelisted.
if !self.whitelist.msg_matches(&message, true) {
debug!(
msg_id=?message.id(),
msg_destination=message.destination,
msg_nonce=message.nonce,
whitelist=?self.whitelist,
"Message not whitelisted, skipping");
self.message_nonce += 1;
return Ok(());
}

// Skip if the message is blacklisted
if self.blacklist.msg_matches(&message, false) {
debug!(
msg_id=?message.id(),
msg_destination=message.destination,
msg_nonce=message.nonce,
blacklist=?self.blacklist,
"Message blacklisted, skipping");
self.message_nonce += 1;
return Ok(());
}

// Skip if the message is intended for a destination we do not service
if self.send_channels.get(&message.destination).is_none() {
debug!(
msg_id=?message.id(),
msg_destination=message.destination,
msg_nonce=message.nonce,
"Message destined for unknown domain, skipping");
self.message_nonce += 1;
return Ok(());
}

// Feed the message to the prover sync
self.prover_sync
.write()
.await
.update_to_index(message.nonce)
.await?;

// Skip if not whitelisted.
if !self.whitelist.msg_matches(&message, true) {
debug!(
id=?message.id(),
destination=message.destination,
nonce=message.nonce,
whitelist=?self.whitelist,
"Message not whitelisted, skipping");
self.message_nonce += 1;
return Ok(());
}

// Skip if the message is blacklisted
if self.blacklist.msg_matches(&message, false) {
debug!(
id=?message.id(),
destination=message.destination,
nonce=message.nonce,
blacklist=?self.blacklist,
"Message blacklisted, skipping");
self.message_nonce += 1;
return Ok(());
}
msg_id=?message.id(),
msg_nonce=message.nonce,
"Sending message to submitter"
);

// Feed the message to the prover sync
self.prover_sync
.write()
.await
.update_to_index(message.nonce)
.await?;

debug!(
msg_id=?message.id(),
msg_nonce=message.nonce,
"Sending message to submitter"
);
// Finally, build the submit arg and dispatch it to the submitter.
let submit_args = SubmitMessageArgs::new(message.clone());
if let Some(send_channel) = self.send_channels.get(&message.destination) {
// Finally, build the submit arg and dispatch it to the submitter.
let submit_args = SubmitMessageArgs::new(message.clone());
// Guaranteed to exist as we return early above if it does not.
let send_channel = self.send_channels.get(&message.destination).unwrap();
send_channel.send(submit_args)?;
self.message_nonce += 1;
} else {
debug!(
id=?message.id(),
destination=message.destination,
nonce=message.nonce,
"Message destined for unknown domain, skipping");
tokio::time::sleep(Duration::from_secs(1)).await;
}
self.message_nonce += 1;
Ok(())
}
}
Expand Down

2 comments on commit 389eec5

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forge Coverage Report

Coverage for this commit
14.61%▴ +0.01%
Coverage Report
FileStmtsBranchesFuncsLinesUncovered Lines
../node_modules/@openzeppelin/contracts-upgradeable/access
   OwnableUpgradeable.sol61.90%50%57.14%70%30, 34, 56, 67, 75
../node_modules/@openzeppelin/contracts-upgradeable/proxy/utils
   Initializable.sol0%0%0%0%145, 145, 145–146, 146, 146–148, 156, 163
../node_modules/@openzeppelin/contracts-upgradeable/token/ERC20
   ERC20Upgradeable.sol66%54.55%55%74.14%100, 187–189, 207–209, 209, 209, 211, 214, 236–237, 242, 265, 291, 296, 326–327, 56, 60–61, 68, 76, 93
../node_modules/@openzeppelin/contracts-upgradeable/utils
   AddressUpgradeable.sol0%0%0%0%100, 119, 134, 134, 134–136, 146, 160–161, 176, 176, 176–177, 177, 177, 180, 180, 180, 182, 184, 199, 199, 199–200, 202, 208, 208, 208, 216, 41, 61, 61, 61, 63–64, 64, 64, 86
   ContextUpgradeable.sol33.33%100%25%50%28
../node_modules/@openzeppelin/contracts/access
   Ownable.sol0%0%0%0%44, 51, 51, 51, 62, 70, 70, 70–71, 79–81
../node_modules/@openzeppelin/contracts/proxy
   Proxy.sol0%100%0%0%59–60, 68
../node_modules/@openzeppelin/contracts/proxy/ERC1967
   ERC1967Proxy.sol0%100%0%0%30
   ERC1967Upgrade.sol0%0%0%0%117, 124, 124, 124–125, 134–135, 153, 160, 160, 160–161, 161, 161, 165, 179–181, 181, 181–182, 39, 46, 46, 46–47, 56–57, 70–71, 71, 71–72, 89, 89, 89–90, 92, 97
../node_modules/@openzeppelin/contracts/proxy/transparent
   ProxyAdmin.sol0%0%0%0%24–25, 25, 25–26, 39–40, 40, 40–41, 52, 63, 79
   TransparentUpgradeableProxy.sol0%0%0%0%107, 114, 121, 121, 121–122, 63, 76, 87, 96
../node_modules/@openzeppelin/contracts/utils
   Address.sol0%0%0%0%100, 119, 134, 134, 134–136, 146, 160–161, 171, 185–186, 201, 201, 201–202, 202, 202, 205, 205, 205, 207, 209, 224, 224, 224–225, 227, 233, 233, 233, 241, 41, 61, 61, 61, 63–64, 64, 64, 86
   Context.sol0%100%0%0%18, 22
   Create2.sol0%0%0%0%35, 35, 35–36, 36, 36, 39, 41, 41, 41, 49, 80
   StorageSlot.sol0%100%0%0%55, 65, 75, 85
   Strings.sol0%0%0%0%20–22, 25, 28, 33–34, 34, 34, 36, 45, 53–58, 60, 60, 60–61, 68
../node_modules/@openzeppelin/contracts/utils/cryptography
   ECDSA.sol0%0%0%0%106–108, 121–123, 147, 147, 147–148, 152–153, 153, 153–154, 157, 170–172, 186, 198, 211, 24, 24, 24–26, 26, 26–28, 28, 28–30, 30, 30–31, 56, 56, 56–59, 64–66, 68, 70, 89–91
../node_modules/@openzeppelin/contracts/utils/math
   Math.sol0%0%0%0%102, 105, 108, 112, 117, 121–126, 132–133, 146–147, 147, 147–148, 150, 159, 159, 159–160, 173, 180–187, 196–197, 20, 206, 208, 208, 208–210, 212, 212, 212–214, 216, 216, 216–218, 220, 220, 220–222, 224, 224, 224–226, 228, 228, 228–230, 232, 232, 232–234, 236, 236, 236–237, 240, 249–250, 259, 261, 261, 261–263, 265, 265, 265–267, 269, 269, 269, 27, 270–271, 273, 273, 273–275, 277, 277, 277–279, 281, 281, 281–283, 285, 285, 285–286, 289, 298–299, 310, 312, 312, 312–314, 316, 316, 316–318, 320, 320, 320–322, 324, 324, 324–326, 328, 328, 328–329, 332,

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardhat Coverage Report

Coverage for this commit
57.98%▴ +0.13%
Coverage Report
FileStmtsBranchesFuncsLinesUncovered Lines
solidity/contracts
   Call.sol100%100%100%100%
   Create2Factory.sol0%0%0%0%101, 110, 110, 110, 112, 33, 58, 64–65, 65, 65, 77–78, 98, 98, 98
   HyperlaneConnectionClient.sol64.71%75%63.64%63.16%119, 151–152, 58, 76–77, 85–86
   InterchainGasPaymaster.sol92.31%50%100%100%72
   Mailbox.sol97.06%100%92.31%97.56%232
   OwnableMulticall.sol0%0%0%0%15, 19, 23–24, 27, 27, 27–28, 39–41, 44, 44, 44–45, 51–53, 53, 53–54
   PausableReentrancyGuard.sol95.65%83.33%100%100%30
   Router.sol89.80%87.50%85.71%92.59%111, 53, 64
solidity/contracts/isms
   MultisigIsm.sol97.12%86.36%100%100%102, 155, 263
solidity/contracts/libs
   EnumerableMapExtended.sol53.33%100%57.14%50%45, 61, 69–70
   Merkle.sol97.14%87.50%100%98.25%30, 44
   Message.sol90.91%100%90.91%90.91%109
   MinimalProxy.sol0%100%0%0%14
   MultisigIsmMetadata.sol100%100%100%100%
   TypeCasts.sol44.44%100%66.67%33.33%11–13, 17
solidity/contracts/middleware
   InterchainAccountRouter.sol0%0%0%0%101–103, 117, 130–132, 132, 132–136, 138, 152, 165, 178, 182, 39, 41–42, 57, 74, 86
   InterchainQueryRouter.sol0%0%0%0%100–103, 117–121, 134, 134, 134, 138, 142, 155–156, 156, 156–157, 163–164, 168–169, 169–170, 174–175, 65, 82, 99
solidity/contracts/middleware/liquidity-layer
   LiquidityLayerRouter.sol0%0%0%0%111, 116, 124, 137–138, 146, 148, 148, 148, 28, 40, 51, 56, 56, 56, 64, 72, 82, 99
solidity/contracts/middleware/liquidity-layer/adapters
   CircleBridgeAdapter.sol0%0%0%0%100, 100, 100, 107, 107, 107, 112, 119–120, 131, 135, 141–142, 142, 142, 147–148, 148, 148, 157, 157, 157, 159, 169, 176, 178, 185, 185, 185, 191–192, 192, 192, 194–195, 195, 195, 200–201, 203, 211–212, 212, 212, 214–215, 215, 215, 221–222, 224, 240, 58, 58, 58–59, 75, 77–78, 81, 90–91, 91, 91, 96, 99
solidity/contracts/middleware/liquidity-layer/interfaces
   ILiquidityLayerAdapter.sol100%100%100%100%
solidity/contracts/middleware/liquidity-layer/interfaces/circle
   ICircleBridge.sol100%100%100%100%
   ICircleMessageTransmitter.sol100%100%100%100%
solidity/contracts/mock
   MockCircleBridge.sol0%0%0%0%12, 21–23, 23, 23–25, 35–36
   MockCircleMessageTransmitter.sol0%100%0%0%12, 20, 28, 36–37, 41
   MockHyperlaneEnvironment.sol0%100%0%0%21–22, 24–25, 27–28, 30–31, 33–34, 36–37, 39–40, 42, 47, 53, 57, 62–63, 67, 71
   MockMailbox.sol94.74%50%100%100%41
   MockToken.sol0%100%0%0%12, 8
solidity/contracts/upgrade
   ProxyAdmin.sol100%100%100%100%
   TransparentUpgradeableProxy.sol100%100%100%100%
   Versioned.sol100%100%100%100%

Please sign in to comment.