-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: state primitives for the amplifier component (#26)
* feat: trait for amplifier storage * feat: memmap storage implementation * feat: storage path gets derived from config * feat: fix an issue where we can skip tasks if relayer is shut down mid-processing
- Loading branch information
1 parent
ad23e6a
commit 0463033
Showing
20 changed files
with
361 additions
and
59 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,3 +10,4 @@ target/ | |
|
||
*.key | ||
config.toml | ||
store |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
storage_path = "./storage" | ||
[amplifier_component] | ||
# pem format cert | ||
identity = ''' | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
[package] | ||
name = "file-based-storage" | ||
version.workspace = true | ||
authors.workspace = true | ||
repository.workspace = true | ||
homepage.workspace = true | ||
license.workspace = true | ||
edition.workspace = true | ||
|
||
[dependencies] | ||
relayer-amplifier-state.workspace = true | ||
amplifier-api.workspace = true | ||
memmap2.workspace = true | ||
tracing.workspace = true | ||
bytemuck.workspace = true | ||
|
||
[lints] | ||
workspace = true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
#![expect(clippy::allow_attributes_without_reason)] | ||
#![expect(clippy::allow_attributes)] | ||
|
||
//! simple memory storage implementation using memory maps | ||
use std::fs::OpenOptions; | ||
use std::io::{self, Seek as _, SeekFrom, Write as _}; | ||
use std::path::Path; | ||
use std::sync::{Arc, Mutex}; | ||
|
||
use amplifier_api::types::{uuid, TaskItemId}; | ||
use bytemuck::{Pod, Zeroable}; | ||
use memmap2::MmapMut; | ||
|
||
/// Memory map wrapper that implements the state to successfully store and retrieve latest task item | ||
/// id | ||
#[derive(Debug, Clone)] | ||
pub struct MemmapState { | ||
mmap: Arc<Mutex<MmapMut>>, | ||
} | ||
|
||
#[repr(C)] | ||
#[derive(Default, Debug, Copy, Clone, Pod, Zeroable)] | ||
struct InternalState { | ||
latest_queried_task_item_id: u128, | ||
latest_processed_task_item_id: u128, | ||
} | ||
|
||
#[expect( | ||
clippy::expect_used, | ||
clippy::unwrap_in_result, | ||
reason = "irrecoverable error" | ||
)] | ||
impl MemmapState { | ||
/// Creates a new [`MemmapState`] with the memory-mapped file at the given path. | ||
/// | ||
/// # Errors | ||
/// If the file cannot be created / opened | ||
/// | ||
/// # Panics | ||
/// If the expected state of the [`InternalState`] will be larger than `u64` | ||
pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> { | ||
// Open or create the file with read and write permissions | ||
let mut file = OpenOptions::new() | ||
.read(true) | ||
.write(true) | ||
.create(true) | ||
.truncate(false) | ||
.open(path)?; | ||
|
||
// Ensure the file is at least the size of InternalState | ||
let default_state = InternalState::default(); | ||
let default_state_bytes = bytemuck::bytes_of(&default_state); | ||
let expected_len = default_state_bytes | ||
.len() | ||
.try_into() | ||
.expect("the size of default state must fit in a u64"); | ||
if file.metadata()?.len() < expected_len { | ||
file.set_len(expected_len)?; | ||
file.seek(SeekFrom::Start(0))?; | ||
file.write_all(default_state_bytes)?; | ||
} | ||
|
||
// Create a mutable memory map of the file | ||
// SAFETY: | ||
// we ensured that the size is large enough | ||
let mmap = unsafe { MmapMut::map_mut(&file)? }; | ||
mmap.flush_async()?; | ||
|
||
Ok(Self { | ||
mmap: Arc::new(Mutex::new(mmap)), | ||
}) | ||
} | ||
|
||
// Generic helper function for getting a TaskItemId | ||
fn get_task_item_id<F>(&self, field_accessor: F) -> Option<TaskItemId> | ||
where | ||
F: Fn(&InternalState) -> u128, | ||
{ | ||
let mmap = self.mmap.lock().expect("lock should not be poisoned"); | ||
let data = bytemuck::from_bytes::<InternalState>(&mmap[..]); | ||
let task_item_id = field_accessor(data); | ||
drop(mmap); | ||
|
||
if task_item_id == 0 { | ||
None | ||
} else { | ||
Some(TaskItemId(uuid::Uuid::from_u128(task_item_id))) | ||
} | ||
} | ||
|
||
// Generic helper function for setting a TaskItemId | ||
fn set_task_item_id<F>( | ||
&self, | ||
task_item_id: &TaskItemId, | ||
field_mutator: F, | ||
) -> Result<(), io::Error> | ||
where | ||
F: Fn(&mut InternalState, u128), | ||
{ | ||
let mut mmap = self.mmap.lock().expect("lock should not be poisoned"); | ||
let raw_u128 = task_item_id.0.as_u128(); | ||
let data = bytemuck::from_bytes_mut::<InternalState>(&mut mmap[..]); | ||
field_mutator(data, raw_u128); | ||
mmap.flush_async()?; | ||
drop(mmap); | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl relayer_amplifier_state::State for MemmapState { | ||
type Err = io::Error; | ||
|
||
#[tracing::instrument(skip(self), level = "trace", ret)] | ||
fn latest_queried_task_id(&self) -> Option<TaskItemId> { | ||
tracing::trace!("getting latest queried task item id"); | ||
self.get_task_item_id(|data| data.latest_queried_task_item_id) | ||
} | ||
|
||
#[tracing::instrument(skip(self), err)] | ||
fn set_latest_queried_task_id(&self, task_item_id: TaskItemId) -> Result<(), Self::Err> { | ||
tracing::info!("updating latest queried task item id"); | ||
self.set_task_item_id(&task_item_id, |data, value| { | ||
data.latest_queried_task_item_id = value; | ||
}) | ||
} | ||
|
||
#[tracing::instrument(skip(self), level = "trace", ret)] | ||
fn latest_processed_task_id(&self) -> Option<TaskItemId> { | ||
tracing::trace!("getting latest processed task item id"); | ||
self.get_task_item_id(|data| data.latest_processed_task_item_id) | ||
} | ||
|
||
#[tracing::instrument(skip(self), err)] | ||
fn set_latest_processed_task_id(&self, task_item_id: TaskItemId) -> Result<(), Self::Err> { | ||
tracing::info!("updating latest processed task item id"); | ||
self.set_task_item_id(&task_item_id, |data, value| { | ||
data.latest_processed_task_item_id = value; | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.