diff --git a/Cargo.lock b/Cargo.lock index f7c50a5..fe7c62c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2830,6 +2830,7 @@ dependencies = [ "env_logger", "ethereum-consensus", "helix-common", + "helix-utils", "hex", "rand", "refinery", diff --git a/crates/api/src/builder/api.rs b/crates/api/src/builder/api.rs index 5f28785..d68ddfd 100644 --- a/crates/api/src/builder/api.rs +++ b/crates/api/src/builder/api.rs @@ -1,9 +1,4 @@ -use std::{ - collections::HashMap, - io::Read, - sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; +use std::{collections::HashMap, io::Read, sync::Arc, time::Duration}; use axum::{ body::{to_bytes, Body}, @@ -62,7 +57,7 @@ use helix_common::{ use helix_database::DatabaseService; use helix_datastore::{types::SaveBidAndUpdateTopBidResponse, Auctioneer}; use helix_housekeeper::{ChainUpdate, PayloadAttributesUpdate, SlotUpdate}; -use helix_utils::{extract_request_id, get_payload_attributes_key, has_reached_fork}; +use helix_utils::{extract_request_id, get_payload_attributes_key, has_reached_fork, utcnow_ns}; use serde::Deserialize; @@ -307,7 +302,7 @@ where headers: HeaderMap, req: Request, ) -> Result { - let mut trace = SubmissionTrace { receive: get_nanos_timestamp()?, ..Default::default() }; + let mut trace = SubmissionTrace { receive: utcnow_ns(), ..Default::default() }; let (head_slot, next_duty) = api.curr_slot_info.read().await.clone(); debug!(head_slot, timestamp_request_start = trace.receive); @@ -383,7 +378,7 @@ where is_cancellations_enabled, ) .await?; - trace.floor_bid_checks = get_nanos_timestamp()?; + trace.floor_bid_checks = utcnow_ns(); // Fetch builder info let builder_info = api.fetch_builder_info(payload.builder_public_key()).await; @@ -426,7 +421,7 @@ where warn!(%err, "failed sanity check"); return Err(err) } - trace.pre_checks = get_nanos_timestamp()?; + trace.pre_checks = utcnow_ns(); let (payload, was_simulated_optimistically) = api .verify_submitted_block( @@ -494,7 +489,7 @@ where } // Log some final info - trace.request_finish = get_nanos_timestamp()?; + trace.request_finish = utcnow_ns(); debug!( ?trace, request_duration_ns = trace.request_finish.saturating_sub(trace.receive), @@ -532,8 +527,7 @@ where headers: HeaderMap, req: Request, ) -> Result { - let mut trace = - HeaderSubmissionTrace { receive: get_nanos_timestamp()?, ..Default::default() }; + let mut trace = HeaderSubmissionTrace { receive: utcnow_ns(), ..Default::default() }; let (head_slot, next_duty) = api.curr_slot_info.read().await.clone(); debug!(head_slot, timestamp_request_start = trace.receive,); @@ -635,14 +629,14 @@ where }) } - trace.pre_checks = get_nanos_timestamp()?; + trace.pre_checks = utcnow_ns(); // Verify the payload signature if let Err(err) = payload.verify_signature(&api.chain_info.context) { warn!(%err, "failed to verify signature"); return Err(BuilderApiError::SignatureVerificationFailed) } - trace.signature = get_nanos_timestamp()?; + trace.signature = utcnow_ns(); // Verify payload has not already been delivered match api.auctioneer.get_last_slot_delivered().await { @@ -671,7 +665,7 @@ where is_cancellations_enabled, ) .await?; - trace.floor_bid_checks = get_nanos_timestamp()?; + trace.floor_bid_checks = utcnow_ns(); // Save bid to auctioneer match api @@ -696,7 +690,7 @@ where } // Log some final info - trace.request_finish = get_nanos_timestamp()?; + trace.request_finish = utcnow_ns(); info!( ?trace, request_duration_ns = trace.request_finish.saturating_sub(trace.receive), @@ -751,8 +745,7 @@ where headers: HeaderMap, req: Request, ) -> Result { - let now = SystemTime::now(); - let mut trace = SubmissionTrace { receive: get_nanos_from(now)?, ..Default::default() }; + let mut trace = SubmissionTrace { receive: utcnow_ns(), ..Default::default() }; let (head_slot, next_duty) = api.curr_slot_info.read().await.clone(); debug!(head_slot, timestamp_request_start = trace.receive); @@ -877,7 +870,7 @@ where warn!(%err, "failed sanity check"); return Err(err) } - trace.pre_checks = get_nanos_timestamp()?; + trace.pre_checks = utcnow_ns(); let (payload, _) = match api .verify_submitted_block( @@ -911,13 +904,13 @@ where error!(%err, "failed to save execution payload"); return Err(BuilderApiError::AuctioneerError(err)) } - trace.auctioneer_update = get_nanos_timestamp()?; + trace.auctioneer_update = utcnow_ns(); // Gossip to other relays api.gossip_payload(&payload, payload.payload_and_blobs()).await; // Log some final info - trace.request_finish = get_nanos_timestamp()?; + trace.request_finish = utcnow_ns(); debug!( ?trace, request_duration_ns = trace.request_finish.saturating_sub(trace.receive), @@ -1040,7 +1033,7 @@ where let mut trace = GossipedHeaderTrace { on_receive: req.on_receive, - on_gossip_receive: get_nanos_timestamp().unwrap_or_default(), + on_gossip_receive: utcnow_ns(), ..Default::default() }; @@ -1098,7 +1091,7 @@ where } }; - trace.pre_checks = get_nanos_timestamp().unwrap_or_default(); + trace.pre_checks = utcnow_ns(); // Save header to auctioneer let mut update_bid_result = SaveBidAndUpdateTopBidResponse::default(); @@ -1118,7 +1111,7 @@ where return } - trace.auctioneer_update = get_nanos_timestamp().unwrap_or_default(); + trace.auctioneer_update = utcnow_ns(); debug!("succesfully saved gossiped header"); @@ -1138,10 +1131,7 @@ where pub async fn process_gossiped_payload(&self, req: BroadcastPayloadParams) { debug!(block_hash = ?req.execution_payload.execution_payload.block_hash(), "received gossiped payload"); - let mut trace = GossipedPayloadTrace { - receive: get_nanos_timestamp().unwrap_or_default(), - ..Default::default() - }; + let mut trace = GossipedPayloadTrace { receive: utcnow_ns(), ..Default::default() }; // Save gossiped payload to auctioneer in case it was sent to diffent region than the header if let Err(err) = self @@ -1178,7 +1168,7 @@ where } } - trace.pre_checks = get_nanos_timestamp().unwrap_or_default(); + trace.pre_checks = utcnow_ns(); // Save payload to auctioneer if let Err(err) = self @@ -1195,7 +1185,7 @@ where return } - trace.auctioneer_update = get_nanos_timestamp().unwrap_or_default(); + trace.auctioneer_update = utcnow_ns(); debug!("succesfully saved gossiped payload"); @@ -1373,7 +1363,7 @@ where warn!(%err, "failed to verify signature"); return Err(BuilderApiError::SignatureVerificationFailed) } - trace.signature = get_nanos_timestamp()?; + trace.signature = utcnow_ns(); // Simulate the submission let payload = Arc::new(payload); @@ -1564,7 +1554,7 @@ where } } - debug!(timestamp_before_validation = get_nanos_timestamp()?); + debug!(timestamp_before_validation = utcnow_ns()); let sim_request = BlockSimRequest::new( registration_info.registration.message.gas_limit, @@ -1581,7 +1571,7 @@ where Ok(sim_optimistic) => { debug!("block simulation successful"); - trace.simulation = get_nanos_timestamp()?; + trace.simulation = utcnow_ns(); debug!(sim_latency = trace.simulation.saturating_sub(trace.signature)); Ok(sim_optimistic) @@ -1622,7 +1612,7 @@ where { Ok(Some((builder_bid, execution_payload))) => { // Log the results of the bid submission - trace.auctioneer_update = get_nanos_timestamp()?; + trace.auctioneer_update = utcnow_ns(); log_save_bid_info(&update_bid_result, trace.simulation, trace.auctioneer_update); Ok(Some((builder_bid, execution_payload))) @@ -1676,7 +1666,7 @@ where { Ok(Some(builder_bid)) => { // Log the results of the bid submission - trace.auctioneer_update = get_nanos_timestamp()?; + trace.auctioneer_update = utcnow_ns(); log_save_bid_info( &update_bid_result, trace.floor_bid_checks, @@ -2029,7 +2019,7 @@ pub async fn decode_payload( serde_json::from_slice(&body_bytes)? }; - trace.decode = get_nanos_timestamp()?; + trace.decode = utcnow_ns(); debug!( timestamp_after_decoding = trace.decode, decode_latency_ns = trace.decode.saturating_sub(trace.receive), @@ -2179,7 +2169,7 @@ pub async fn decode_header_submission( serde_json::from_slice(&body_bytes)? }; - trace.decode = get_nanos_timestamp()?; + trace.decode = utcnow_ns(); debug!( timestamp_after_decoding = trace.decode, decode_latency_ns = trace.decode.saturating_sub(trace.receive), @@ -2375,19 +2365,6 @@ async fn process_db_additions( } } -fn get_nanos_timestamp() -> Result { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .map_err(|_| BuilderApiError::InternalError) -} - -fn get_nanos_from(now: SystemTime) -> Result { - now.duration_since(UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .map_err(|_| BuilderApiError::InternalError) -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/api/src/constraints/api.rs b/crates/api/src/constraints/api.rs index 16426aa..ecf516a 100644 --- a/crates/api/src/constraints/api.rs +++ b/crates/api/src/constraints/api.rs @@ -16,12 +16,11 @@ use helix_common::{ }; use helix_datastore::Auctioneer; use helix_housekeeper::{ChainUpdate, SlotUpdate}; -use helix_utils::signing::{verify_signed_message, COMMIT_BOOST_DOMAIN}; -use std::{ - collections::HashSet, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, +use helix_utils::{ + signing::{verify_signed_message, COMMIT_BOOST_DOMAIN}, + utcnow_ns, }; +use std::{self, collections::HashSet, sync::Arc}; use tokio::{ sync::{ broadcast, @@ -109,8 +108,7 @@ where req: Request, ) -> Result { let request_id = Uuid::new_v4(); - let mut trace = - ConstraintSubmissionTrace { receive: get_nanos_timestamp()?, ..Default::default() }; + let mut trace = ConstraintSubmissionTrace { receive: utcnow_ns(), ..Default::default() }; // Decode the incoming request body into a payload. let signed_constraints = @@ -214,7 +212,7 @@ where } // Log some final info - trace.request_finish = get_nanos_timestamp()?; + trace.request_finish = utcnow_ns(); trace!( request_id = %request_id, trace = ?trace, @@ -233,8 +231,7 @@ where req: Request, ) -> Result { let request_id = Uuid::new_v4(); - let mut trace = - ConstraintSubmissionTrace { receive: get_nanos_timestamp()?, ..Default::default() }; + let mut trace = ConstraintSubmissionTrace { receive: utcnow_ns(), ..Default::default() }; info!( request_id = %request_id, @@ -264,7 +261,7 @@ where return Err(ConstraintsApiError::InvalidDelegation) } }; - trace.decode = get_nanos_timestamp()?; + trace.decode = utcnow_ns(); for delegation in &signed_delegations { if let Err(e) = verify_signed_message( @@ -278,7 +275,7 @@ where return Err(ConstraintsApiError::InvalidSignature) }; } - trace.verify_signature = get_nanos_timestamp()?; + trace.verify_signature = utcnow_ns(); // Store the delegation in the database tokio::spawn(async move { @@ -288,7 +285,7 @@ where }); // Log some final info - trace.request_finish = get_nanos_timestamp()?; + trace.request_finish = utcnow_ns(); trace!( request_id = %request_id, trace = ?trace, @@ -307,8 +304,7 @@ where req: Request, ) -> Result { let request_id = Uuid::new_v4(); - let mut trace = - ConstraintSubmissionTrace { receive: get_nanos_timestamp()?, ..Default::default() }; + let mut trace = ConstraintSubmissionTrace { receive: utcnow_ns(), ..Default::default() }; info!( request_id = %request_id, @@ -338,7 +334,7 @@ where return Err(ConstraintsApiError::InvalidRevocation) } }; - trace.decode = get_nanos_timestamp()?; + trace.decode = utcnow_ns(); for revocation in &signed_revocations { if let Err(e) = verify_signed_message( @@ -352,7 +348,7 @@ where return Err(ConstraintsApiError::InvalidSignature) }; } - trace.verify_signature = get_nanos_timestamp()?; + trace.verify_signature = utcnow_ns(); // Store the delegation in the database tokio::spawn(async move { @@ -363,7 +359,7 @@ where }); // Log some final info - trace.request_finish = get_nanos_timestamp()?; + trace.request_finish = utcnow_ns(); info!( request_id = %request_id, trace = ?trace, @@ -395,7 +391,7 @@ where .await { Ok(()) => { - trace.auctioneer_update = get_nanos_timestamp()?; + trace.auctioneer_update = utcnow_ns(); info!( request_id = %request_id, timestamp_after_auctioneer = Instant::now().elapsed().as_nanos(), @@ -517,7 +513,7 @@ pub async fn decode_constraints_submission( serde_json::from_slice(&body_bytes)? }; - trace.decode = get_nanos_timestamp()?; + trace.decode = utcnow_ns(); info!( request_id = %request_id, timestamp_after_decoding = Instant::now().elapsed().as_nanos(), @@ -527,10 +523,3 @@ pub async fn decode_constraints_submission( Ok(constraints.to_vec()) } - -fn get_nanos_timestamp() -> Result { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .map_err(|_| ConstraintsApiError::InternalError) -} diff --git a/crates/api/src/proposer/api.rs b/crates/api/src/proposer/api.rs index 43c5893..fc191ad 100644 --- a/crates/api/src/proposer/api.rs +++ b/crates/api/src/proposer/api.rs @@ -1,7 +1,4 @@ -use std::{ - sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; +use std::{sync::Arc, time::Duration}; use axum::{ body::{to_bytes, Body}, @@ -51,6 +48,7 @@ use helix_housekeeper::{ChainUpdate, SlotUpdate}; use helix_utils::{ extract_request_id, signing::{verify_signed_builder_message, verify_signed_consensus_message}, + utcnow_ms, utcnow_ns, }; use crate::{ @@ -172,8 +170,7 @@ where let request_id = extract_request_id(&headers); - let mut trace = - RegisterValidatorsTrace { receive: get_nanos_timestamp()?, ..Default::default() }; + let mut trace = RegisterValidatorsTrace { receive: utcnow_ns(), ..Default::default() }; // Get optional api key from headers let api_key = headers.get("x-api-key").and_then(|key| key.to_str().ok()); @@ -355,7 +352,7 @@ where } }); - trace.registrations_complete = get_nanos_timestamp()?; + trace.registrations_complete = utcnow_ns(); info!( request_id = %request_id, @@ -388,7 +385,7 @@ where let request_id = extract_request_id(&headers); - let mut trace = GetHeaderTrace { receive: get_nanos_timestamp()?, ..Default::default() }; + let mut trace = GetHeaderTrace { receive: utcnow_ns(), ..Default::default() }; let (head_slot, duty) = proposer_api.curr_slot_info.read().await.clone(); debug!( @@ -426,7 +423,7 @@ where return Err(err) } }; - trace.validation_complete = get_nanos_timestamp()?; + trace.validation_complete = utcnow_ns(); let user_agent = headers.get("user-agent").and_then(|v| v.to_str().ok()).map(|v| v.to_string()); @@ -436,7 +433,7 @@ where .auctioneer .get_best_bid(bid_request.slot, &bid_request.parent_hash, &bid_request.public_key) .await; - trace.best_bid_fetched = get_nanos_timestamp()?; + trace.best_bid_fetched = utcnow_ns(); info!(request_id = %request_id, trace = ?trace, "best bid fetched"); match get_best_bid_res { @@ -498,7 +495,7 @@ where Path(GetHeaderParams { slot, parent_hash, public_key }): Path, ) -> Result { let request_id = Uuid::new_v4(); - let mut trace = GetHeaderTrace { receive: get_nanos_timestamp()?, ..Default::default() }; + let mut trace = GetHeaderTrace { receive: utcnow_ns(), ..Default::default() }; let (head_slot, _) = *proposer_api.curr_slot_info.read().await; debug!( @@ -526,14 +523,14 @@ where warn!(request_id = %request_id, err = %err, "invalid bid request time"); return Err(err) } - trace.validation_complete = get_nanos_timestamp()?; + trace.validation_complete = utcnow_ns(); // Get best bid from auctioneer let get_best_bid_res = proposer_api .auctioneer .get_best_bid(bid_request.slot, &bid_request.parent_hash, &bid_request.public_key) .await; - trace.best_bid_fetched = get_nanos_timestamp()?; + trace.best_bid_fetched = utcnow_ns(); info!(request_id = %request_id, trace = ?trace, "best bid fetched"); let user_agent = @@ -650,7 +647,7 @@ where ) -> Result { let request_id = extract_request_id(&headers); - let mut trace = GetPayloadTrace { receive: get_nanos_timestamp()?, ..Default::default() }; + let mut trace = GetPayloadTrace { receive: utcnow_ns(), ..Default::default() }; let user_agent = headers.get("user-agent").and_then(|v| v.to_str().ok()).map(|v| v.to_string()); @@ -747,7 +744,7 @@ where warn!(request_id = %request_id, error = %err, "invalid proposal coordinate"); return Err(err) } - trace.proposer_index_validated = get_nanos_timestamp()?; + trace.proposer_index_validated = utcnow_ns(); let proposer_public_key = slot_duty.entry.registration.message.public_key; if let Err(err) = self.verify_signed_blinded_block_signature( @@ -759,7 +756,7 @@ where warn!(request_id = %request_id, error = %err, "invalid signature"); return Err(ProposerApiError::InvalidSignature(err)) } - trace.signature_validated = get_nanos_timestamp()?; + trace.signature_validated = utcnow_ns(); // Get execution payload from auctioneer let payload_result = self @@ -786,7 +783,7 @@ where } }; info!(request_id = %request_id, "found payload for blinded signed block"); - trace.payload_fetched = get_nanos_timestamp()?; + trace.payload_fetched = utcnow_ns(); // Check if get_payload has already been called if let Err(err) = self @@ -849,7 +846,7 @@ where return Err(err) } - trace.validation_complete = get_nanos_timestamp()?; + trace.validation_complete = utcnow_ns(); let unblinded_payload = match unblind_beacon_block(&signed_blinded_block, &versioned_payload) { @@ -899,7 +896,7 @@ where error!(request_id = %request_id_clone, error = %err, "error publishing block"); }; - trace_clone.beacon_client_broadcast = get_nanos_timestamp().unwrap_or_default(); + trace_clone.beacon_client_broadcast = utcnow_ns(); // Broadcast payload to all broadcasters self_clone.broadcast_signed_block( @@ -907,10 +904,10 @@ where Some(BroadcastValidation::Gossip), &request_id_clone, ); - trace_clone.broadcaster_block_broadcast = get_nanos_timestamp().unwrap_or_default(); + trace_clone.broadcaster_block_broadcast = utcnow_ns(); // While we wait for the block to propagate, we also store the payload information - trace_clone.on_deliver_payload = get_nanos_timestamp().unwrap_or_default(); + trace_clone.on_deliver_payload = utcnow_ns(); self_clone .save_delivered_payload_info( payload_clone, @@ -940,7 +937,7 @@ where // `TARGET_GET_PAYLOAD_PROPAGATION_DURATION_MS` to allow the block to // propagate through the network. let elapsed_since_propagate_start_ms = - (get_nanos_timestamp()?.saturating_sub(trace.beacon_client_broadcast)) / 1_000_000; + (utcnow_ns().saturating_sub(trace.beacon_client_broadcast)) / 1_000_000; let remaining_sleep_ms = self .relay_config .target_get_payload_propagation_duration_ms @@ -963,7 +960,7 @@ where }; // Return response - info!(request_id = %request_id, trace = ?trace, timestamp = get_nanos_timestamp()?, "delivering payload"); + info!(request_id = %request_id, trace = ?trace, timestamp = utcnow_ns(), "delivering payload"); Ok(get_payload_response) } } @@ -1032,7 +1029,7 @@ where /// /// Returns how many ms we are into the slot if ok. fn validate_bid_request_time(&self, bid_request: &BidRequest) -> Result { - let curr_timestamp_ms = get_millis_timestamp()? as i64; + let curr_timestamp_ms = utcnow_ms() as i64; let slot_start_timestamp = self.chain_info.genesis_time_in_secs + (bid_request.slot * self.chain_info.seconds_per_slot); let ms_into_slot = curr_timestamp_ms.saturating_sub((slot_start_timestamp * 1000) as i64); @@ -1276,10 +1273,7 @@ where if let GossipedMessage::GetPayload(payload) = msg { let api_clone = self.clone(); tokio::spawn(async move { - let mut trace = GetPayloadTrace { - receive: get_nanos_timestamp().unwrap_or_default(), - ..Default::default() - }; + let mut trace = GetPayloadTrace { receive: utcnow_ns(), ..Default::default() }; debug!(request_id = %payload.request_id, "processing gossiped payload"); match api_clone ._get_payload( @@ -1320,7 +1314,7 @@ where let mut last_error: Option = None; let mut first_try = true; // Try at least once to cover case where get_payload is called too late. - while first_try || get_millis_timestamp()? < slot_cutoff_millis { + while first_try || utcnow_ms() < slot_cutoff_millis { match self.auctioneer.get_execution_payload(slot, pub_key, block_hash).await { Ok(Some(versioned_payload)) => return Ok(versioned_payload), Ok(None) => { @@ -1511,20 +1505,6 @@ fn calculate_slot_time_info( (ms_into_slot, duration_until_slot_start) } -pub fn get_nanos_timestamp() -> Result { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .map_err(|_| ProposerApiError::InternalServerError) -} - -fn get_millis_timestamp() -> Result { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_millis() as u64) - .map_err(|_| ProposerApiError::InternalServerError) -} - fn get_consensus_version(block: &SignedBeaconBlock) -> ethereum_consensus::Fork { match block { SignedBeaconBlock::Phase0(_) => ethereum_consensus::Fork::Phase0, diff --git a/crates/api/src/proposer/tests.rs b/crates/api/src/proposer/tests.rs index 677d099..e401628 100644 --- a/crates/api/src/proposer/tests.rs +++ b/crates/api/src/proposer/tests.rs @@ -4,8 +4,8 @@ use ethereum_consensus::{ signing::compute_signing_root, }; use helix_common::chain_info::ChainInfo; +use helix_utils::utcnow_sec; use rand::thread_rng; -use std::time::{SystemTime, UNIX_EPOCH}; pub fn gen_signed_vr() -> SignedValidatorRegistration { let mut rng = thread_rng(); @@ -15,7 +15,7 @@ pub fn gen_signed_vr() -> SignedValidatorRegistration { let mut vr = ValidatorRegistration { fee_recipient: Default::default(), gas_limit: 0, - timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), + timestamp: utcnow_sec(), public_key: pk, }; @@ -33,10 +33,7 @@ mod proposer_api_tests { // +++ IMPORTS +++ use crate::{ gossiper::{mock_gossiper::MockGossiper, types::GossipedMessage}, - proposer::{ - api::{get_nanos_timestamp, ProposerApi}, - PATH_GET_PAYLOAD, PATH_PROPOSER_API, - }, + proposer::{api::ProposerApi, PATH_GET_PAYLOAD, PATH_PROPOSER_API}, test_utils::proposer_api_app, }; @@ -69,7 +66,7 @@ mod proposer_api_tests { use helix_database::MockDatabaseService; use helix_datastore::MockAuctioneer; use helix_housekeeper::{ChainUpdate, PayloadAttributesUpdate, SlotUpdate}; - use helix_utils::signing::verify_signed_consensus_message; + use helix_utils::{signing::verify_signed_consensus_message, utcnow_ns}; use serial_test::serial; use std::{sync::Arc, time::Duration}; use tokio::{ @@ -237,7 +234,7 @@ mod proposer_api_tests { fn calculate_current_slot() -> u64 { let genesis_time_in_secs: u64 = ChainInfo::for_mainnet().genesis_time_in_secs; let seconds_per_slot: u64 = ChainInfo::for_mainnet().seconds_per_slot; - let request_time_in_ns = get_nanos_timestamp().unwrap(); + let request_time_in_ns = utcnow_ns(); let current_time_in_secs = request_time_in_ns / 1_000_000_000; let time_since_genesis = current_time_in_secs.saturating_sub(genesis_time_in_secs); diff --git a/crates/common/src/validator.rs b/crates/common/src/validator.rs index bb37d74..b48cc32 100644 --- a/crates/common/src/validator.rs +++ b/crates/common/src/validator.rs @@ -1,10 +1,9 @@ -use std::time::{SystemTime, UNIX_EPOCH}; - use ethereum_consensus::{ phase0::Validator, primitives::{BlsPublicKey, Gwei, ValidatorIndex}, serde::as_str, }; +use helix_utils::utcnow_ms; use serde::{Deserialize, Serialize}; use crate::api::proposer_api::ValidatorRegistrationInfo; @@ -51,12 +50,7 @@ impl SignedValidatorRegistrationEntry { pool_name: Option, user_agent: Option, ) -> Self { - Self { - registration_info, - inserted_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64, - pool_name, - user_agent, - } + Self { registration_info, inserted_at: utcnow_ms(), pool_name, user_agent } } pub fn public_key(&self) -> &BlsPublicKey { diff --git a/crates/database/Cargo.toml b/crates/database/Cargo.toml index a7e20fa..86e7cd4 100644 --- a/crates/database/Cargo.toml +++ b/crates/database/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] helix-common.workspace = true +helix-utils.workspace = true # Async and Networking async-trait.workspace = true diff --git a/crates/database/src/postgres/postgres_db_service.rs b/crates/database/src/postgres/postgres_db_service.rs index a64dd63..b28debb 100644 --- a/crates/database/src/postgres/postgres_db_service.rs +++ b/crates/database/src/postgres/postgres_db_service.rs @@ -26,6 +26,7 @@ use helix_common::{ GossipedPayloadTrace, HeaderSubmissionTrace, ProposerInfo, RelayConfig, SignedValidatorRegistrationEntry, SubmissionTrace, ValidatorPreferences, ValidatorSummary, }; +use helix_utils::utcnow_ms; use tokio_postgres::{types::ToSql, NoTls}; use tracing::{error, info}; @@ -1330,7 +1331,7 @@ impl DatabaseService for PostgresDatabaseService { ) .await?; - let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; + let timestamp = utcnow_ms(); transaction .execute( " diff --git a/crates/database/src/postgres/postgres_db_service_tests.rs b/crates/database/src/postgres/postgres_db_service_tests.rs index 57204bc..bd96d9b 100644 --- a/crates/database/src/postgres/postgres_db_service_tests.rs +++ b/crates/database/src/postgres/postgres_db_service_tests.rs @@ -18,14 +18,9 @@ mod tests { versioned_payload::PayloadAndBlobs, Filtering, GetPayloadTrace, HeaderSubmissionTrace, SubmissionTrace, ValidatorSummary, }; + use helix_utils::utcnow_sec; use rand::{seq::SliceRandom, thread_rng, Rng}; - use std::{ - default::Default, - ops::DerefMut, - str::FromStr, - sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, - }; + use std::{default::Default, ops::DerefMut, str::FromStr, sync::Arc, time::Duration}; use tokio::time::sleep; use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod}; @@ -93,7 +88,7 @@ mod tests { fn get_randomized_signed_validator_registration() -> ValidatorRegistrationInfo { let mut rng = rand::thread_rng(); - let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + let timestamp = utcnow_sec(); let gas_limit = 0; let key = SecretKey::random(&mut rng).unwrap(); let signature = key.sign("message".as_bytes()); diff --git a/crates/housekeeper/src/chain_event_updater.rs b/crates/housekeeper/src/chain_event_updater.rs index 8e1b29c..0321c46 100644 --- a/crates/housekeeper/src/chain_event_updater.rs +++ b/crates/housekeeper/src/chain_event_updater.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashMap, - sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use ethereum_consensus::{ configs::goerli::CAPELLA_FORK_EPOCH, deneb::Withdrawal, primitives::Bytes32, @@ -20,7 +16,7 @@ use helix_common::{ chain_info::ChainInfo, }; use helix_database::DatabaseService; -use helix_utils::{get_payload_attributes_key, has_reached_fork}; +use helix_utils::{get_payload_attributes_key, has_reached_fork, utcnow_sec}; // Do not accept slots more than 60 seconds in the future const MAX_DISTANCE_FOR_FUTURE_SLOT: u64 = 60; @@ -154,13 +150,12 @@ impl ChainEventUpdater { info!(head_slot = slot, "Processing slot",); // Validate this isn't a faulty head slot - if let Ok(current_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) { - let slot_timestamp = - self.chain_info.genesis_time_in_secs + (slot * self.chain_info.seconds_per_slot); - if slot_timestamp > current_timestamp.as_secs() + MAX_DISTANCE_FOR_FUTURE_SLOT { - warn!(head_slot = slot, "slot is too far in the future",); - return - } + + let slot_timestamp = + self.chain_info.genesis_time_in_secs + (slot * self.chain_info.seconds_per_slot); + if slot_timestamp > utcnow_sec() + MAX_DISTANCE_FOR_FUTURE_SLOT { + warn!(head_slot = slot, "slot is too far in the future",); + return } // Log any missed slots diff --git a/crates/housekeeper/src/housekeeper.rs b/crates/housekeeper/src/housekeeper.rs index 23f0948..a598971 100644 --- a/crates/housekeeper/src/housekeeper.rs +++ b/crates/housekeeper/src/housekeeper.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashMap, - sync::Arc, - time::{Duration, SystemTime}, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use ethereum_consensus::primitives::BlsPublicKey; use ethers::{ @@ -11,6 +7,7 @@ use ethers::{ providers::{Http, Provider}, types::U256, }; +use helix_utils::utcnow_ms; use reth_primitives::{constants::EPOCH_SLOTS, revm_primitives::HashSet}; use std::convert::TryFrom; use tokio::{ @@ -360,8 +357,7 @@ impl /// /// DB entries are also removed if they have been waiting for over 45 seconds. async fn demote_builders_with_expired_pending_blocks(&self) -> Result<(), HousekeeperError> { - let current_time = - SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64; + let current_time = utcnow_ms(); let mut demoted_builders = HashSet::new(); diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index c050178..40b5af6 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -4,6 +4,7 @@ use std::{ io::Write, panic, path::Path, + time::{SystemTime, UNIX_EPOCH}, }; use ::serde::de; @@ -134,3 +135,22 @@ pub fn extract_request_id(headers: &HeaderMap) -> Uuid { .and_then(|v| Uuid::parse_str(v).ok()) .unwrap_or(Uuid::new_v4()) } + +////// TIME ////// + +/// Seconds +pub fn utcnow_sec() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() +} +/// Millis +pub fn utcnow_ms() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64 +} +/// Micros +pub fn utcnow_us() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as u64 +} +/// Nanos +pub fn utcnow_ns() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64 +}