Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix off-by-one error in slot_vector inside update account #26

Merged
merged 9 commits into from
Aug 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 317 additions & 16 deletions connector/src/chain_data.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use crate::chain_data::SlotVectorEffect::*;
use log::trace;
use solana_sdk::clock::Slot;
use {
solana_sdk::account::{AccountSharedData, ReadableAccount},
solana_sdk::pubkey::Pubkey,
Expand All @@ -18,16 +21,22 @@
pub slot: u64,
pub parent: Option<u64>,
pub status: SlotStatus,
pub chain: u64, // the top slot that this is in a chain with. uncles will have values < tip
// the top slot that this is in a chain with. uncles will have values < tip
// this field gets progressively rewritten as new successor slots are added
pub chain: u64,
}

#[derive(Clone, Debug)]
pub struct AccountData {
pub slot: u64,
/// - caution: write_versions sourced from different Validator Node must not be compared
/// - account data from gMA/gPA RPC "snapshot" do not contain write_version
/// - from docs: "A single global atomic `AccountsDb::write_version` - tracks the number of commits to the entire data store."
pub write_version: u64,
pub account: AccountSharedData,
}

// caution: this is brittle if data comes from multiple sources
impl AccountData {
pub fn is_newer_than(&self, slot: u64, write_version: u64) -> bool {
(self.slot > slot) || (self.slot == slot && self.write_version > write_version)
Expand Down Expand Up @@ -157,31 +166,41 @@
}

pub fn update_account(&mut self, pubkey: Pubkey, account: AccountData) {
if account.write_version == 0 {
// some upstream components provide write_version=0 for snapshot accounts from gMA/gPA
// this maybe defies the intended effect that the snapshot account data should overwrite data from grpc, etc.
// would recommend to provide a very high write_version instead
// disclaimer(groovie, 2024/08): this logic is controversial, and I'm flexible to remove this completely
trace!("account {} has write_version 0 - not recommended", pubkey);
}

use std::collections::hash_map::Entry;
match self.accounts.entry(pubkey) {
Entry::Vacant(v) => {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(vec![account]);
v.insert(vec![account]); // capacity = 1
}
Entry::Occupied(o) => {
let v_effect = update_slotvec_logic(o.get(), account.slot, account.write_version);

let v = o.into_mut();
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let rev_pos = v
.iter()
.rev()
.position(|d| d.slot <= account.slot)
.unwrap_or(v.len());
let pos = v.len() - rev_pos;
if pos < v.len() && v[pos].slot == account.slot {
if v[pos].write_version <= account.write_version {

match v_effect {
Overwrite(pos) => {
v[pos] = account;
}
} else {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(pos, account);
Prepend => {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(0, account);
}
InsertAfter(pos) => {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(pos, account);
}
DoNothing => {}
}
}
};
Expand Down Expand Up @@ -301,6 +320,46 @@
}
}

#[derive(Debug, PartialEq)]
pub enum SlotVectorEffect {
Overwrite(usize),
Prepend,
InsertAfter(usize),
DoNothing,
}

#[inline]
pub fn update_slotvec_logic(
v: &Vec<AccountData>,
update_slot: Slot,
update_write_version: u64,
) -> SlotVectorEffect {
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let pos = v
.iter()
.rev()
.position(|d| d.slot <= update_slot)
.map(|rev_pos| v.len() - 1 - rev_pos);

match pos {
Some(pos) => {
if v[pos].slot == update_slot {
if v[pos].write_version <= update_write_version {
// note: applies last-wins-strategy if write_version is equal
Overwrite(pos)
} else {
DoNothing
}
} else {
assert!(v[pos].slot < update_slot);
InsertAfter(pos)
}
}
None => Prepend,
}
}

pub struct ChainDataMetrics {
slots_stored: MetricU64,
accounts_stored: MetricU64,
Expand Down Expand Up @@ -348,3 +407,245 @@
});
}
}

#[cfg(test)]
mod tests {
use crate::chain_data::{update_slotvec_logic, SlotVectorEffect::*};
use crate::chain_data::{AccountData, ChainData, SlotData, SlotStatus};
use solana_sdk::account::{AccountSharedData, ReadableAccount};
use solana_sdk::clock::Slot;
use solana_sdk::pubkey::Pubkey;
use std::str::FromStr;

#[test]
pub fn test_move_slot_to_finalized() {
const SLOT: Slot = 42_000_000;
const SOME_LAMPORTS: u64 = 99000;

let owner = Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap();
let my_account = Pubkey::new_unique();
let mut chain_data = ChainData::new();

chain_data.update_account(
my_account,
AccountData {
slot: SLOT,
write_version: 2000,
account: AccountSharedData::new(SOME_LAMPORTS, 100 /*space*/, &owner),
},
);

// note: this is initial state
assert_eq!(chain_data.newest_rooted_slot(), 0);

// assume no rooted slot yet
assert_eq!(chain_data.iter_accounts_rooted().count(), 0);

chain_data.update_slot(SlotData {
slot: SLOT,
parent: None,
status: SlotStatus::Processed,
chain: 0,
});

assert_eq!(chain_data.newest_rooted_slot(), 0);

chain_data.update_slot(SlotData {
slot: SLOT - 2,
parent: None,
status: SlotStatus::Rooted, // =finalized
chain: 0,
});

assert_eq!(chain_data.newest_rooted_slot(), SLOT - 2);

assert_eq!(chain_data.iter_accounts_rooted().count(), 0);

// GIVEN: finalized slot SLOT
chain_data.update_slot(SlotData {
slot: SLOT,
parent: None,
status: SlotStatus::Rooted, // =finalized
chain: 0,
});

assert_eq!(chain_data.iter_accounts_rooted().count(), 1);
}

#[test]
pub fn test_must_not_overwrite_with_older_by_slot() {
const SLOT: Slot = 42_000_000;

let owner = Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap();
let my_account = Pubkey::new_unique();
let mut chain_data = ChainData::new();

chain_data.update_slot(SlotData {
slot: SLOT - 2,
parent: None,
status: SlotStatus::Rooted, // =finalized
chain: 0,
});

chain_data.update_account(
my_account,
AccountData {
slot: SLOT - 2,
write_version: 2000,
account: AccountSharedData::new(300, 100 /*space*/, &owner),
},
);

assert_eq!(chain_data.iter_accounts_rooted().count(), 1);
assert_eq!(
chain_data.account(&my_account).unwrap().account.lamports(),
300
);

// WHEN: update with older data according to slot
chain_data.update_account(
my_account,
AccountData {
slot: SLOT - 20,
write_version: 2000,
account: AccountSharedData::new(350, 100 /*space*/, &owner),
},
);
// THEN: should not overwrite
assert_eq!(
chain_data.account(&my_account).unwrap().account.lamports(),
300,
"should not overwrite if slot is older"
);
}

#[test]
pub fn test_overwrite_with_older_by_write_version() {
const SLOT: Slot = 42_000_000;

let owner = Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap();
let my_account = Pubkey::new_unique();
let mut chain_data = ChainData::new();

chain_data.update_slot(SlotData {
slot: SLOT - 2,
parent: None,
status: SlotStatus::Rooted, // =finalized
chain: 0,
});

chain_data.update_account(
my_account,
AccountData {
slot: SLOT - 2,
write_version: 2000,
account: AccountSharedData::new(300, 100 /*space*/, &owner),
},
);

assert_eq!(chain_data.iter_accounts_rooted().count(), 1);
assert_eq!(
chain_data.account(&my_account).unwrap().account.lamports(),
300
);

// WHEN: update with older data according to write_version
chain_data.update_account(
my_account,
AccountData {
slot: SLOT - 2,
write_version: 1980,
account: AccountSharedData::new(400, 100 /*space*/, &owner),
},
);
assert_eq!(
chain_data.account(&my_account).unwrap().account.lamports(),
300,
"should not overwrite if write_version is older"
);
}

#[test]
fn slotvec_overwrite_newer_write_version() {
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let dummy_account_data = AccountSharedData::new(99999999, 999999, &Pubkey::new_unique());
// 10 - 20 - 30 - 50
let v = given_v1235(dummy_account_data);

assert_eq!(update_slotvec_logic(&v, 20, 20000), Overwrite(1));
}

#[test]
fn slotvec_overwrite_older_write_version() {
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let dummy_account_data = AccountSharedData::new(99999999, 999999, &Pubkey::new_unique());
// 10 - 20 - 30 - 50
let v = given_v1235(dummy_account_data);

assert_eq!(update_slotvec_logic(&v, 20, 999), DoNothing);
}

#[test]
fn slotvec_insert_hole() {
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let dummy_account_data = AccountSharedData::new(99999999, 999999, &Pubkey::new_unique());
// 10 - 20 - 30 - 50
let v = given_v1235(dummy_account_data);

assert_eq!(update_slotvec_logic(&v, 40, 10040), InsertAfter(2));
}

#[test]
fn slotvec_insert_left() {
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let dummy_account_data = AccountSharedData::new(99999999, 999999, &Pubkey::new_unique());
// 10 - 20 - 30 - 50
let v = given_v1235(dummy_account_data);

// insert before first slot (10)
assert_eq!(update_slotvec_logic(&v, 5, 500), Prepend); // OK
}

// this should be the most common case
#[test]
fn slotvec_append() {
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let dummy_account_data = AccountSharedData::new(99999999, 999999, &Pubkey::new_unique());
// 10 - 20 - 30 - 50
let v = given_v1235(dummy_account_data);

assert_eq!(update_slotvec_logic(&v, 90, 50000), InsertAfter(3));
}

// 10 - 20 - 30 - 50
fn given_v1235(dummy_account_data: AccountSharedData) -> Vec<AccountData> {
vec![
AccountData {
slot: 10,
write_version: 10010,
account: dummy_account_data.clone(),
},
AccountData {
slot: 20,
write_version: 10020,
account: dummy_account_data.clone(),
},
AccountData {
slot: 30,
write_version: 10030,
account: dummy_account_data.clone(),
},
// no 40
AccountData {
slot: 50,
write_version: 10050,
account: dummy_account_data.clone(),

Check warning on line 647 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

redundant clone

Check warning on line 647 in connector/src/chain_data.rs

View workflow job for this annotation

GitHub Actions / test

redundant clone
},
]
}
}
Loading