Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use intmap for slot map #33

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ serde_derive = { workspace = true }
log = { workspace = true }
anyhow = { workspace = true }
smallvec = "1.13.2"
intmap = "2.0.0"

itertools = { workspace = true }

Expand Down
53 changes: 24 additions & 29 deletions connector/src/chain_data.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use intmap::IntMap;

Check warning on line 1 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs

Check warning on line 1 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs
use crate::chain_data::SlotVectorEffect::*;
use log::trace;
use smallvec::{smallvec, SmallVec};
use solana_sdk::clock::Slot;
use warp::trace;
use {
solana_sdk::account::{AccountSharedData, ReadableAccount},
solana_sdk::pubkey::Pubkey,
Expand Down Expand Up @@ -50,8 +52,8 @@
/// - use account() to retrieve the current best data for an account.
/// - update_from_snapshot() and update_from_websocket() update the state for new messages
pub struct ChainData {
/// only slots >= newest_rooted_slot are retained
slots: HashMap<u64, SlotData>,
/// only slots >= newest_rooted_slot are retained; some 33
slots: IntMap<SlotData>,
/// writes to accounts, only the latest rooted write an newer are retained
/// size distribution on startup: total:1105, size1:315, size2:146
accounts: HashMap<Pubkey, SmallVec<[AccountData; 2]>>,
Expand All @@ -65,7 +67,7 @@
impl ChainData {
pub fn new() -> Self {
Self {
slots: HashMap::new(),
slots: IntMap::new(),
accounts: HashMap::new(),
newest_rooted_slot: 0,
newest_processed_slot: 0,
Expand All @@ -82,15 +84,10 @@
}
}

impl ChainData {

Check warning on line 87 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs

Check warning on line 87 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs
#[tracing::instrument(skip_all, level = "trace")]
pub fn update_slot(&mut self, new_slotdata: SlotData) {
let SlotData {
slot: new_slot,
parent: new_parent,
status: new_status,
..
} = new_slotdata;
let SlotData { slot: new_slot, parent: new_parent, status: new_status, .. } = new_slotdata;

trace!("update_slot from newslot {:?}", new_slot);
let new_processed_head = new_slot > self.newest_processed_slot;
Expand All @@ -116,22 +113,18 @@

let mut parent_update = false;

use std::collections::hash_map::Entry;
use intmap::Entry;
match self.slots.entry(new_slot) {
Entry::Vacant(v) => {
v.insert(new_slotdata);
trace!("inserted new slot {:?}", new_slot);
}
Entry::Occupied(o) => {
let v = o.into_mut();

Check warning on line 123 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs

Check warning on line 123 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs
parent_update = v.parent != new_parent && new_parent.is_some();
if parent_update {
trace!(
"update parent of slot {}: {}->{}",
new_slot,
v.parent.unwrap_or(0),
new_parent.unwrap_or(0)
);
trace!("update parent of slot {}: {}->{}",
new_slot, v.parent.unwrap_or(0), new_parent.unwrap_or(0));
}
v.parent = v.parent.or(new_parent);
// Never decrease the slot status
Expand All @@ -152,7 +145,7 @@
// update the "chain" field down to the first rooted slot
let mut slot = self.best_chain_slot;
loop {
if let Some(data) = self.slots.get_mut(&slot) {
if let Some(data) = self.slots.get_mut(slot) {
data.chain = self.best_chain_slot;
if data.status == SlotStatus::Rooted {
break;
Expand All @@ -178,7 +171,7 @@

// now it's fine to drop any slots before the new rooted head
// as account writes for non-rooted slots before it have been dropped
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
self.slots.retain(|s, _| s >= self.newest_rooted_slot);
}
}

Expand All @@ -188,14 +181,15 @@
let newest_rooted_write_slot = Self::newest_rooted_write(
writes,
self.newest_rooted_slot,
self.best_chain_slot,

Check warning on line 184 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs

Check warning on line 184 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs
&self.slots,
)
.map(|w| w.slot)
// no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway
.unwrap_or(self.newest_rooted_slot + 1);
writes
.retain(|w| w.slot == newest_rooted_write_slot || w.slot > self.newest_rooted_slot);
.map(|w| w.slot)
// no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway
.unwrap_or(self.newest_rooted_slot + 1);
writes.retain(|w| {
w.slot == newest_rooted_write_slot || w.slot > self.newest_rooted_slot
});
self.account_versions_stored += writes.len();
self.account_bytes_stored +=
writes.iter().map(|w| w.account.data().len()).sum::<usize>()
Expand Down Expand Up @@ -247,7 +241,7 @@

fn is_account_write_live(&self, write: &AccountData) -> bool {
self.slots
.get(&write.slot)
.get(write.slot)
// either the slot is rooted or in the current chain
.map(|s| {
s.status == SlotStatus::Rooted
Expand All @@ -263,12 +257,12 @@
writes: &'a [AccountData],
newest_rooted_slot: u64,
best_chain_slot: u64,
slots: &HashMap<u64, SlotData>,
slots: &IntMap<SlotData>,
) -> Option<&'a AccountData> {
writes.iter().rev().find(|w| {
w.slot <= newest_rooted_slot
&& slots
.get(&w.slot)
.get(w.slot)
.map(|s| {
// sometimes we seem not to get notifications about slots
// getting rooted, hence assume non-uncle slots < newest_rooted_slot
Expand Down Expand Up @@ -348,13 +342,10 @@
self.newest_rooted_slot
}

pub fn newest_processed_slot(&self) -> u64 {

Check warning on line 345 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs

Check warning on line 345 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs
self.newest_processed_slot
}

pub fn raw_slot_data(&self) -> &HashMap<u64, SlotData> {
&self.slots
}
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -444,15 +435,18 @@
});
}
}

Check warning on line 438 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs

Check warning on line 438 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use crate::chain_data::{update_slotvec_logic, SlotVectorEffect::*};
use crate::chain_data::{AccountData, ChainData, SlotData, SlotStatus};
use solana_sdk::account::{AccountSharedData, ReadableAccount};
use solana_sdk::clock::Slot;
use solana_sdk::pubkey::Pubkey;
use std::str::FromStr;
use csv::ReaderBuilder;
use solana_sdk::commitment_config::CommitmentLevel;

#[test]
pub fn test_loosing_account_write() {
Expand Down Expand Up @@ -736,7 +730,8 @@
slot: 50,
write_version: 10050,
account: dummy_account_data.clone(),
},

Check warning on line 733 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs

Check warning on line 733 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/connector/src/chain_data.rs
]
}

}
Loading