Skip to content

Commit

Permalink
Fix WAL location and deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaskrause committed Dec 30, 2024
1 parent c77bf4e commit df42d67
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `UpdateEvent` now implements `PartialEq` to make it possible to compare changes.

### Fixed

- Deserializing a write-ahead log failed because it was located in the wrong
sub-directory and the deserialization routine for the map had a bug.

## [3.5.1] - 2024-09-25

### Fixed
Expand Down
14 changes: 8 additions & 6 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,36 @@ bincode = "1.2"
clru = "0.6.1"
itertools = "0.10"
lazy_static = "1.4"
toml = "0.8"
log = "0.4"
memmap2 = "0.9"
normpath = "1.1.1"
num-traits = "0.2"
percent-encoding = "2.1"
quick-xml = "0.28"
rand = { version = "0.8", features = ["small_rng"] }
rayon = { version = "1.3", default-features = false }
rand = {version = "0.8", features = ["small_rng"]}
rayon = {version = "1.3", default-features = false}
regex = "1"
regex-syntax = "0.8"
rustc-hash = "1.0"
serde = { version = "1.0", features = ["rc"] }
serde = {version = "1.0", features = ["rc"]}
serde_bytes = "0.11"
serde_derive = "1.0"
smallvec = "1.6"
smartstring = { version = "1", features = ["serde"] }
smartstring = {version = "1", features = ["serde"]}
sstable = "0.11"
strum = "0.21"
strum_macros = "0.21"
tempfile = "3.1"
thiserror = "1"
toml = "0.8"
transient-btree-index = "0.5"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["heapapi"] }
winapi = {version = "0.3", features = ["heapapi"]}

[dev-dependencies]
env_logger = "0.9"
fake = "2.2"
insta = {version = "1.38.0", features = ["json"]}
pretty_assertions = "1.3"
serde_json = "1.0"
60 changes: 59 additions & 1 deletion core/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl<CT: ComponentType> Graph<CT> {
std::fs::create_dir_all(&current_path)?;

// If successful, write the log
let log_path = location.join("update_log.bin");
let log_path = current_path.join("update_log.bin");

// Create a temporary directory in the same file system as the output
let temporary_dir = tempfile::tempdir_in(&current_path)?;
Expand Down Expand Up @@ -1155,4 +1155,62 @@ mod tests {
db.ensure_loaded_parallel(&[component]).unwrap();
assert_eq!(0, db.components.len());
}

#[test]
fn load_with_wal_file() {
    // Regression test: an update applied after persisting the graph must
    // still be visible when the graph is loaded again from the same location.
    let root_id = 0;
    let mut graph = Graph::<DefaultComponentType>::new(false).unwrap();

    // Give the example node a type ("corpus") and a name ("root"), so the
    // update event below can refer to it by name.
    let initial_annos = [
        (NODE_TYPE_KEY.as_ref().clone(), "corpus"),
        (NODE_NAME_KEY.as_ref().clone(), "root"),
    ];
    for (key, val) in initial_annos {
        graph
            .node_annos
            .insert(
                root_id,
                Annotation {
                    key,
                    val: val.into(),
                },
            )
            .unwrap();
    }

    // Persist the graph first and let it remember the location, so that the
    // following update is recorded in a WAL file.
    let storage = tempfile::tempdir().unwrap();
    graph.persist_to(storage.path()).unwrap();

    // Add a node annotation via apply_update
    let mut update = GraphUpdate::new();
    update
        .add_event(UpdateEvent::AddNodeLabel {
            node_name: "root".into(),
            anno_ns: "example".into(),
            anno_name: "anno-name".into(),
            anno_value: "anno-value".into(),
        })
        .unwrap();
    graph.apply_update(&mut update, |_| {}).unwrap();

    // Release the first graph instance before the location is opened again.
    drop(graph);

    // A freshly loaded graph must contain the annotation added by the update.
    let mut reloaded = Graph::<DefaultComponentType>::new(false).unwrap();
    reloaded.load_from(storage.path(), true).unwrap();

    let added_key = AnnoKey {
        name: "anno-name".into(),
        ns: "example".into(),
    };
    let anno_value = reloaded
        .node_annos
        .get_value_for_item(&root_id, &added_key)
        .unwrap()
        .unwrap();
    assert_eq!("anno-value", anno_value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
source: core/src/graph/update.rs
expression: seralized_string
---
{
"1": {
"AddNode": {
"node_name": "parent",
"node_type": "corpus"
}
},
"2": {
"AddNode": {
"node_name": "child",
"node_type": "corpus"
}
},
"3": {
"AddEdge": {
"source_node": "child",
"target_node": "parent",
"layer": "annis",
"component_type": "PartOf",
"component_name": ""
}
}
}
116 changes: 112 additions & 4 deletions core/src/graph/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,7 @@ impl<'de> Visitor<'de> for GraphUpdateVisitor {

let mut event_counter = 0;

while let Some((id, event)) = access
.next_entry::<u64, GraphUpdate>()
.map_err(M::Error::custom)?
{
while let Some((id, event)) = access.next_entry::<u64, UpdateEvent>()? {
event_counter = id;
let key = id.create_key();
let value = serialization.serialize(&event).map_err(M::Error::custom)?;
Expand Down Expand Up @@ -338,3 +335,114 @@ impl<'de> Deserialize<'de> for GraphUpdate {
deserializer.deserialize_map(GraphUpdateVisitor {})
}
}

#[cfg(test)]
mod tests {

    use insta::assert_snapshot;

    use super::*;

    /// Returns the event list shared by all tests in this module: two
    /// `corpus` nodes ("parent" and "child") and a `PartOf` edge in the
    /// `annis` layer that points from the child to the parent.
    fn example_events() -> Vec<UpdateEvent> {
        vec![
            UpdateEvent::AddNode {
                node_name: "parent".into(),
                node_type: "corpus".into(),
            },
            UpdateEvent::AddNode {
                node_name: "child".into(),
                node_type: "corpus".into(),
            },
            UpdateEvent::AddEdge {
                source_node: "child".into(),
                target_node: "parent".into(),
                layer: "annis".into(),
                component_type: "PartOf".into(),
                component_name: "".into(),
            },
        ]
    }

    /// Creates a new `GraphUpdate` and adds a clone of each of the given
    /// events to it, in slice order.
    fn update_from(events: &[UpdateEvent]) -> GraphUpdate {
        let mut updates = GraphUpdate::new();
        for event in events {
            updates.add_event(event.clone()).unwrap();
        }
        updates
    }

    /// Collects the events of an update list in iteration order, dropping
    /// the ID that is paired with each event.
    fn collect_events(update: &GraphUpdate) -> Vec<UpdateEvent> {
        update.iter().unwrap().map(|entry| entry.unwrap().1).collect()
    }

    #[test]
    fn serialize_deserialize_bincode() {
        let example_updates = example_events();
        let updates = update_from(&example_updates);

        // Round trip through the binary representation.
        let serialized_bytes: Vec<u8> = bincode::serialize(&updates).unwrap();
        let deserialized_update: GraphUpdate = bincode::deserialize(&serialized_bytes).unwrap();

        assert_eq!(3, deserialized_update.len().unwrap());
        assert_eq!(example_updates, collect_events(&deserialized_update));
    }

    #[test]
    fn serialize_json() {
        let updates = update_from(&example_events());

        // The misspelled binding name is kept on purpose: the stored snapshot
        // for this test records `expression: seralized_string` in its header.
        let seralized_string = serde_json::to_string_pretty(&updates).unwrap();
        assert_snapshot!(seralized_string);
    }

    #[test]
    fn serialize_deserialize_json() {
        let example_updates = example_events();
        let updates = update_from(&example_updates);

        // Round trip through the JSON representation.
        let serialized_string = serde_json::to_string_pretty(&updates).unwrap();
        let deserialized_update: GraphUpdate = serde_json::from_str(&serialized_string).unwrap();

        assert_eq!(3, deserialized_update.len().unwrap());
        assert_eq!(example_updates, collect_events(&deserialized_update));
    }
}

0 comments on commit df42d67

Please sign in to comment.