diff --git a/Cargo.lock b/Cargo.lock index b0d45e6..3991cac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2193,6 +2193,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "intmap" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee87fd093563344074bacf24faa0bb0227fb6969fb223e922db798516de924d6" + [[package]] name = "iovec" version = "0.1.4" @@ -2516,6 +2522,7 @@ dependencies = [ "criterion", "csv", "futures 0.3.30", + "intmap", "itertools 0.10.5", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", diff --git a/connector/Cargo.toml b/connector/Cargo.toml index 91ad11e..fe4e26d 100644 --- a/connector/Cargo.toml +++ b/connector/Cargo.toml @@ -42,6 +42,7 @@ serde_derive = { workspace = true } log = { workspace = true } anyhow = { workspace = true } smallvec = "1.13.2" +intmap = "2.0.0" itertools = { workspace = true } diff --git a/connector/src/chain_data.rs b/connector/src/chain_data.rs index 3be449a..1a4ead7 100644 --- a/connector/src/chain_data.rs +++ b/connector/src/chain_data.rs @@ -1,7 +1,9 @@ +use intmap::IntMap; use crate::chain_data::SlotVectorEffect::*; use log::trace; use smallvec::{smallvec, SmallVec}; use solana_sdk::clock::Slot; +use warp::trace; use { solana_sdk::account::{AccountSharedData, ReadableAccount}, solana_sdk::pubkey::Pubkey, @@ -50,8 +52,8 @@ impl AccountData { /// - use account() to retrieve the current best data for an account. /// - update_from_snapshot() and update_from_websocket() update the state for new messages pub struct ChainData { - /// only slots >= newest_rooted_slot are retained - slots: HashMap, + /// only slots >= newest_rooted_slot are retained; some 33 + slots: IntMap, /// writes to accounts, only the latest rooted write an newer are retained /// size distribution on startup: total:1105, size1:315, size2:146 accounts: HashMap>, @@ -65,7 +67,7 @@ pub struct ChainData { impl ChainData { pub fn new() -> Self { Self { - slots: HashMap::new(), + slots: IntMap::new(), accounts: HashMap::new(), newest_rooted_slot: 0, newest_processed_slot: 0, @@ -85,12 +87,7 @@ impl Default for ChainData { impl ChainData { #[tracing::instrument(skip_all, level = "trace")] pub fn update_slot(&mut self, new_slotdata: SlotData) { - let SlotData { - slot: new_slot, - parent: new_parent, - status: new_status, - .. - } = new_slotdata; + let SlotData { slot: new_slot, parent: new_parent, status: new_status, .. } = new_slotdata; trace!("update_slot from newslot {:?}", new_slot); let new_processed_head = new_slot > self.newest_processed_slot; @@ -116,7 +113,7 @@ impl ChainData { let mut parent_update = false; - use std::collections::hash_map::Entry; + use intmap::Entry; match self.slots.entry(new_slot) { Entry::Vacant(v) => { v.insert(new_slotdata); @@ -126,12 +123,8 @@ impl ChainData { let v = o.into_mut(); parent_update = v.parent != new_parent && new_parent.is_some(); if parent_update { - trace!( - "update parent of slot {}: {}->{}", - new_slot, - v.parent.unwrap_or(0), - new_parent.unwrap_or(0) - ); + trace!("update parent of slot {}: {}->{}", + new_slot, v.parent.unwrap_or(0), new_parent.unwrap_or(0)); } v.parent = v.parent.or(new_parent); // Never decrease the slot status @@ -152,7 +145,7 @@ impl ChainData { // update the "chain" field down to the first rooted slot let mut slot = self.best_chain_slot; loop { - if let Some(data) = self.slots.get_mut(&slot) { + if let Some(data) = self.slots.get_mut(slot) { data.chain = self.best_chain_slot; if data.status == SlotStatus::Rooted { break; @@ -178,7 +171,7 @@ impl ChainData { // now it's fine to drop any slots before the new rooted head // as account writes for non-rooted slots before it have been dropped - self.slots.retain(|s, _| *s >= self.newest_rooted_slot); + self.slots.retain(|s, _| s >= self.newest_rooted_slot); } } @@ -191,11 +184,12 @@ impl ChainData { self.best_chain_slot, &self.slots, ) - .map(|w| w.slot) - // no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway - .unwrap_or(self.newest_rooted_slot + 1); - writes - .retain(|w| w.slot == newest_rooted_write_slot || w.slot > self.newest_rooted_slot); + .map(|w| w.slot) + // no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway + .unwrap_or(self.newest_rooted_slot + 1); + writes.retain(|w| { + w.slot == newest_rooted_write_slot || w.slot > self.newest_rooted_slot + }); self.account_versions_stored += writes.len(); self.account_bytes_stored += writes.iter().map(|w| w.account.data().len()).sum::() @@ -247,7 +241,7 @@ impl ChainData { fn is_account_write_live(&self, write: &AccountData) -> bool { self.slots - .get(&write.slot) + .get(write.slot) // either the slot is rooted or in the current chain .map(|s| { s.status == SlotStatus::Rooted @@ -263,12 +257,12 @@ impl ChainData { writes: &'a [AccountData], newest_rooted_slot: u64, best_chain_slot: u64, - slots: &HashMap, + slots: &IntMap, ) -> Option<&'a AccountData> { writes.iter().rev().find(|w| { w.slot <= newest_rooted_slot && slots - .get(&w.slot) + .get(w.slot) .map(|s| { // sometimes we seem not to get notifications about slots // getting rooted, hence assume non-uncle slots < newest_rooted_slot @@ -352,9 +346,6 @@ impl ChainData { self.newest_processed_slot } - pub fn raw_slot_data(&self) -> &HashMap { - &self.slots - } } #[derive(Debug, PartialEq)] @@ -447,12 +438,15 @@ impl ChainDataMetrics { #[cfg(test)] mod tests { + use std::path::PathBuf; use crate::chain_data::{update_slotvec_logic, SlotVectorEffect::*}; use crate::chain_data::{AccountData, ChainData, SlotData, SlotStatus}; use solana_sdk::account::{AccountSharedData, ReadableAccount}; use solana_sdk::clock::Slot; use solana_sdk::pubkey::Pubkey; use std::str::FromStr; + use csv::ReaderBuilder; + use solana_sdk::commitment_config::CommitmentLevel; #[test] pub fn test_loosing_account_write() { @@ -739,4 +733,5 @@ mod tests { }, ] } + }