Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple change to reduce memory usage #1668

Merged
merged 5 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 67 additions & 54 deletions raphtory/src/core/entities/edges/edge_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ pub struct EdgeStore {
pub(crate) eid: EID,
pub(crate) src: VID,
pub(crate) dst: VID,
pub(crate) layers: Vec<EdgeLayer>, // each layer has its own set of properties
pub(crate) additions: Vec<TimeIndex<TimeIndexEntry>>,
pub(crate) deletions: Vec<TimeIndex<TimeIndexEntry>>,
pub(crate) data: Vec<EdgeData>,
}

#[derive(Serialize, Deserialize, Debug, Default, PartialEq)]
pub struct EdgeData {
pub(crate) layer: EdgeLayer,
pub(crate) additions: TimeIndex<TimeIndexEntry>,
pub(crate) deletions: TimeIndex<TimeIndexEntry>,
}

#[derive(Serialize, Deserialize, Debug, Default, PartialEq)]
Expand Down Expand Up @@ -95,32 +100,28 @@ impl EdgeStore {
}

pub fn internal_num_layers(&self) -> usize {
self.layers
.len()
.max(self.additions.len())
.max(self.deletions.len())
self.data.len()
}

fn get_or_allocate_layer(&mut self, layer_id: usize) -> &mut EdgeLayer {
if self.layers.len() <= layer_id {
self.layers.resize_with(layer_id + 1, Default::default);
if self.data.len() <= layer_id {
self.data.resize_with(layer_id + 1, Default::default);
}
&mut self.layers[layer_id]
&mut self.data[layer_id].layer
}

pub fn has_layer_inner(&self, layer_id: usize) -> bool {
self.additions
.get(layer_id)
self.get_additions(layer_id)
.filter(|t_index| !t_index.is_empty())
.is_some()
|| self
.deletions
.get(layer_id)
.get_deletions(layer_id)
.filter(|t_index| !t_index.is_empty())
.is_some()
}

pub fn layer_iter(&self) -> impl Iterator<Item = &EdgeLayer> + '_ {
self.layers.iter()
pub fn layer_iter(&self) -> impl Iterator<Item = &EdgeData> + '_ {
self.data.iter()
}

/// Iterate over (layer_id, additions, deletions) triplets for edge
Expand Down Expand Up @@ -148,14 +149,14 @@ impl EdgeStore {
.into_dyn_boxed(),
LayerIds::One(id) => Box::new(iter::once((
*id,
self.additions.get(*id).unwrap_or(&TimeIndex::Empty),
self.deletions.get(*id).unwrap_or(&TimeIndex::Empty),
self.get_additions(*id).unwrap_or(&TimeIndex::Empty),
self.get_deletions(*id).unwrap_or(&TimeIndex::Empty),
))),
LayerIds::Multiple(ids) => Box::new(ids.iter().map(|id| {
(
*id,
self.additions.get(*id).unwrap_or(&TimeIndex::Empty),
self.deletions.get(*id).unwrap_or(&TimeIndex::Empty),
self.get_additions(*id).unwrap_or(&TimeIndex::Empty),
self.get_deletions(*id).unwrap_or(&TimeIndex::Empty),
)
})),
}
Expand All @@ -167,11 +168,11 @@ impl EdgeStore {
) -> BoxedLIter<'a, &TimeIndex<TimeIndexEntry>> {
match layers {
LayerIds::None => iter::empty().into_dyn_boxed(),
LayerIds::All => self.additions.iter().into_dyn_boxed(),
LayerIds::One(id) => self.additions.get(*id).into_iter().into_dyn_boxed(),
LayerIds::All => self.iter_additions().into_dyn_boxed(),
LayerIds::One(id) => self.get_additions(*id).into_iter().into_dyn_boxed(),
LayerIds::Multiple(ids) => ids
.iter()
.flat_map(|id| self.additions.get(*id))
.flat_map(|id| self.get_additions(*id))
.into_dyn_boxed(),
}
}
Expand All @@ -182,21 +183,20 @@ impl EdgeStore {
) -> BoxedLIter<'a, &TimeIndex<TimeIndexEntry>> {
match layers {
LayerIds::None => iter::empty().into_dyn_boxed(),
LayerIds::All => self.deletions.iter().into_dyn_boxed(),
LayerIds::One(id) => self.deletions.get(*id).into_iter().into_dyn_boxed(),
LayerIds::All => self.iter_deletions().into_dyn_boxed(),
LayerIds::One(id) => self.get_deletions(*id).into_iter().into_dyn_boxed(),
LayerIds::Multiple(ids) => ids
.iter()
.flat_map(|id| self.deletions.get(*id))
.flat_map(|id| self.get_deletions(*id))
.into_dyn_boxed(),
}
}

pub fn layer_ids_window_iter(&self, w: Range<i64>) -> impl Iterator<Item = usize> + '_ {
let layer_ids = self
.additions
.iter()
.iter_additions()
.enumerate()
.zip_longest(self.deletions.iter().enumerate())
.zip_longest(self.iter_deletions().enumerate())
.flat_map(move |e| match e {
EitherOrBoth::Both((i, t1), (_, t2)) => {
if t1.contains(w.clone()) || t2.contains(w.clone()) {
Expand Down Expand Up @@ -229,27 +229,23 @@ impl EdgeStore {
eid: 0.into(),
src,
dst,
layers: Vec::with_capacity(1),
additions: Vec::with_capacity(1),
deletions: Vec::with_capacity(1),
data: Vec::with_capacity(1),
}
}

pub fn layer(&self, layer_id: usize) -> Option<&EdgeLayer> {
self.layers.get(layer_id)
self.data.get(layer_id).map(|data| &data.layer)
}

/// an edge is active in a window if it has an addition event in any of the layers
pub fn active(&self, layer_ids: &LayerIds, w: Range<i64>) -> bool {
match layer_ids {
LayerIds::None => false,
LayerIds::All => self
.additions
.iter()
.iter_additions()
.any(|t_index| t_index.contains(w.clone())),
LayerIds::One(l_id) => self
.additions
.get(*l_id)
.get_additions(*l_id)
.map(|t_index| t_index.contains(w))
.unwrap_or(false),
LayerIds::Multiple(layers) => layers
Expand All @@ -261,72 +257,89 @@ impl EdgeStore {
pub fn last_deletion(&self, layer_ids: &LayerIds) -> Option<TimeIndexEntry> {
match layer_ids {
LayerIds::None => None,
LayerIds::All => self.deletions.iter().flat_map(|d| d.last()).max(),
LayerIds::One(id) => self.deletions.get(*id).and_then(|t| t.last()),
LayerIds::All => self.iter_deletions().flat_map(|d| d.last()).max(),
LayerIds::One(id) => self.get_deletions(*id).and_then(|t| t.last()),
LayerIds::Multiple(ids) => ids
.iter()
.flat_map(|id| self.deletions.get(*id).and_then(|t| t.last()))
.flat_map(|id| self.get_deletions(*id).and_then(|t| t.last()))
.max(),
}
}

pub fn last_addition(&self, layer_ids: &LayerIds) -> Option<TimeIndexEntry> {
match layer_ids {
LayerIds::None => None,
LayerIds::All => self.additions.iter().flat_map(|d| d.last()).max(),
LayerIds::One(id) => self.additions.get(*id).and_then(|t| t.last()),
LayerIds::All => self.iter_additions().flat_map(|d| d.last()).max(),
LayerIds::One(id) => self.get_additions(*id).and_then(|t| t.last()),
LayerIds::Multiple(ids) => ids
.iter()
.flat_map(|id| self.additions.get(*id).and_then(|t| t.last()))
.flat_map(|id| self.get_additions(*id).and_then(|t| t.last()))
.max(),
}
}

pub fn temporal_prop_layer_inner(&self, layer_id: usize, prop_id: usize) -> Option<&TProp> {
self.layers
self.data
.get(layer_id)
.and_then(|layer| layer.temporal_property(prop_id))
.and_then(|layer| layer.layer.temporal_property(prop_id))
}

pub fn layer_mut(&mut self, layer_id: usize) -> impl DerefMut<Target = EdgeLayer> + '_ {
self.get_or_allocate_layer(layer_id)
}

pub fn deletions_mut(&mut self, layer_id: usize) -> &mut TimeIndex<TimeIndexEntry> {
if self.deletions.len() <= layer_id {
self.deletions.resize_with(layer_id + 1, Default::default);
if self.data.len() <= layer_id {
self.data.resize_with(layer_id + 1, Default::default);
}
&mut self.deletions[layer_id]
&mut self.data[layer_id].deletions
}

pub fn additions_mut(&mut self, layer_id: usize) -> &mut TimeIndex<TimeIndexEntry> {
if self.additions.len() <= layer_id {
self.additions.resize_with(layer_id + 1, Default::default);
if self.data.len() <= layer_id {
self.data.resize_with(layer_id + 1, Default::default);
}
&mut self.additions[layer_id]
&mut self.data[layer_id].additions
}

pub(crate) fn temp_prop_ids(
&self,
layer_id: Option<usize>,
) -> Box<dyn Iterator<Item = usize> + '_> {
if let Some(layer_id) = layer_id {
Box::new(self.layers.get(layer_id).into_iter().flat_map(|layer| {
Box::new(self.data.get(layer_id).into_iter().flat_map(|layer| {
layer
.layer
.props()
.into_iter()
.flat_map(|props| props.temporal_prop_ids())
}))
} else {
Box::new(
self.layers
self.data
.iter()
.flat_map(|layer| layer.props().map(|prop| prop.temporal_prop_ids()))
.flat_map(|layer| layer.layer.props().map(|prop| prop.temporal_prop_ids()))
.kmerge()
.dedup(),
)
}
}

pub fn get_additions(&self, layer_id: usize) -> Option<&TimeIndex<TimeIndexEntry>> {
self.data.get(layer_id).map(|data| &data.additions)
}

pub fn get_deletions(&self, layer_id: usize) -> Option<&TimeIndex<TimeIndexEntry>> {
self.data.get(layer_id).map(|data| &data.deletions)
}

pub fn iter_additions(&self) -> impl Iterator<Item = &TimeIndex<TimeIndexEntry>> + '_ {
self.data.iter().map(|data| &data.additions)
}

pub fn iter_deletions(&self) -> impl Iterator<Item = &TimeIndex<TimeIndexEntry>> + '_ {
self.data.iter().map(|data| &data.deletions)
}
}

impl EdgeStorageIntoOps for ArcEntry<EdgeStore> {
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/db/api/storage/edges/edge_storage_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,11 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore {
}

fn additions(self, layer_id: usize) -> TimeIndexRef<'a> {
TimeIndexRef::Ref(self.additions.get(layer_id).unwrap_or(&TimeIndex::Empty))
TimeIndexRef::Ref(self.get_additions(layer_id).unwrap_or(&TimeIndex::Empty))
}

fn deletions(self, layer_id: usize) -> TimeIndexRef<'a> {
TimeIndexRef::Ref(self.deletions.get(layer_id).unwrap_or(&TimeIndex::Empty))
TimeIndexRef::Ref(self.get_deletions(layer_id).unwrap_or(&TimeIndex::Empty))
}

fn temporal_prop_layer(self, layer_id: usize, prop_id: usize) -> impl TPropOps<'a> + 'a {
Expand Down
8 changes: 4 additions & 4 deletions raphtory/src/db/internal/core_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ impl CoreGraphOps for InternalGraph {
entry
.layer_iter()
.next()
.and_then(|layer| layer.const_prop(prop_id).cloned())
.and_then(|data| data.layer.const_prop(prop_id).cloned())
} else {
let prop_map: HashMap<_, _> = entry
.layer_iter()
.enumerate()
.flat_map(|(id, layer)| {
layer
.flat_map(|(id, data)| {
data.layer
.const_prop(prop_id)
.map(|p| (self.inner().get_layer_name(id), p.clone()))
})
Expand Down Expand Up @@ -214,7 +214,7 @@ impl CoreGraphOps for InternalGraph {
LayerIds::None => vec![],
LayerIds::All => entry
.layer_iter()
.map(|l| l.const_prop_ids())
.map(|data| data.layer.const_prop_ids())
.kmerge()
.dedup()
.collect(),
Expand Down
Loading