diff --git a/taskchampion/taskchampion/src/server/cloud/cloudstorage.rs b/taskchampion/taskchampion/src/server/cloud/cloudstorage.rs
new file mode 100644
index 000000000..94174b9cb
--- /dev/null
+++ b/taskchampion/taskchampion/src/server/cloud/cloudstorage.rs
@@ -0,0 +1,30 @@
+use crate::errors::Result;
+
+/// An abstraction of a cloud-storage service.
+///
+/// The underlying cloud storage is assumed to be a map from object names to object values,
+/// similar to a HashMap, with the addition of a compare-and-swap operation. Object names
+/// are always simple strings from the character set `[a-zA-Z0-9-]`, no more than 100 characters
+/// in length.
+pub(in crate::server::cloud) trait CloudStorage {
+    /// Put an object into cloud storage. If the object exists, it is overwritten.
+    fn put(&mut self, name: &[u8], value: &[u8]) -> Result<()>;
+
+    /// Get an object from cloud storage, or None if the object does not exist.
+    fn get(&mut self, name: &[u8]) -> Result<Option<Vec<u8>>>;
+
+    /// Delete an object. Does nothing if the object does not exist.
+    fn del(&mut self, name: &[u8]) -> Result<()>;
+
+    /// Enumerate the names of the objects whose names begin with the given prefix.
+    fn list<'a>(&'a mut self, prefix: &'a [u8]) -> Result<Box<dyn Iterator<Item = &'a [u8]> + 'a>>;
+
+    /// Replace the object with `new_value` only if its current value equals `existing_value`
+    /// (`None` meaning the object does not exist). Returns true if the replacement occurred.
+    fn compare_and_swap(
+        &mut self,
+        name: &[u8],
+        existing_value: Option<Vec<u8>>,
+        new_value: Vec<u8>,
+    ) -> Result<bool>;
+}
diff --git a/taskchampion/taskchampion/src/server/cloud/mod.rs b/taskchampion/taskchampion/src/server/cloud/mod.rs
new file mode 100644
index 000000000..73e39dce3
--- /dev/null
+++ b/taskchampion/taskchampion/src/server/cloud/mod.rs
@@ -0,0 +1,11 @@
+/*!
+* Support for cloud-storage-backed sync.
+*
+* All of these operate using a similar approach, with specific patterns of object names. 
The
+* process of adding a new version requires a compare-and-swap operation that sets a new version
+* as the "latest" only if the existing "latest" has the expected value. This ensures a continuous
+* chain of versions, even if multiple replicas attempt to sync at the same time.
+*/
+
+mod cloudstorage;
+mod server;
diff --git a/taskchampion/taskchampion/src/server/cloud/server.rs b/taskchampion/taskchampion/src/server/cloud/server.rs
new file mode 100644
index 000000000..0d8d9a366
--- /dev/null
+++ b/taskchampion/taskchampion/src/server/cloud/server.rs
@@ -0,0 +1,548 @@
+#![allow(unused_variables, dead_code)]
+use super::cloudstorage::CloudStorage;
+use crate::errors::{Error, Result};
+use crate::server::{
+    AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
+    VersionId,
+};
+use std::collections::HashMap;
+use uuid::Uuid;
+
+/*
+Proposal:
+ v-PARENT-VERSION:
+ latest: latest uuid
+
+no conflict between v-PARENT-VERSION objects
+get_child_version(parent): search for `v-PARENT-*` - but might be multiple!
+add_version(parent, data): read "latest", check equal to parent, invent VERSION, create v-PARENT-VERSION, CAS "latest"
+
+Can we combine "create" and "CAS"? Or deal with the multiples?
+Maybe read the whole chain of versions (just need to list objects) to build in-memory map..
+
+
+# CAS
+
+compare_and_swap(name, old, new)
+
+S3: impossible
+
+Azure:
+ - read(name)
+ - if content != old, fail
+ - if not found, write new value unconditionally, start over with compare_and_swap(name, new, new)
+ - write with condition on etag from first step
+
+GCP:
+ - read(name)
+ - if content != old, fail
+ - write with condition on generation, or zero if not found
+*/
+
+/// Implement the Server trait for a given cloud-storage backend.
+///
+/// ## Object Organization
+///
+/// UUIDs embedded in names appear in their "simple" form: lower-case hexadecimal with no hyphens. 
+///
+/// Versions are stored as objects with name `v-PARENT-VERSION` where `VERSION` is the version's
+/// UUID and `PARENT` is the parent version's UUID. The object value is the raw history segment.
+/// These objects are created with simple `put` requests, as the name uniquely identifies the
+/// content.
+///
+/// The latest version is stored as an object with name "latest", containing the UUID of the latest
+/// version. This object is updated with `compare_and_swap`. After a successful update of this
+/// object, the version is considered committed.
+///
+/// Since there are no strong constraints on creation of version objects, it is possible
+/// to have multiple such objects with the same `PARENT`. However, only one such object will be
+/// contained in the chain of parent-child relationships beginning with the value in "latest".
+/// All other objects are invalid and not visible outside this type.
+///
+/// Snapshots are stored as objects with name `s-VERSION` where `VERSION` is the version at which
+/// the snapshot was made. These objects are created with simple `put` requests, as any snapshot
+/// for a given version is interchangeable with any other.
+struct CloudServer<CS: CloudStorage> {
+    backend: CS,
+}
+
+/// Name of the object that holds the UUID of the latest version.
+const LATEST: &[u8] = b"latest";
+
+impl<CS: CloudStorage> CloudServer<CS> {
+    /// Create a server that keeps its objects in the given storage backend.
+    pub(in crate::server) fn new(backend: CS) -> Self {
+        Self { backend }
+    }
+
+    /// Generate an object name for the given parent and child versions. 
+    fn version_name(parent_version_id: &VersionId, child_version_id: &VersionId) -> Vec<u8> {
+        format!(
+            "v-{}-{}",
+            parent_version_id.as_simple(),
+            child_version_id.as_simple()
+        )
+        .into_bytes()
+    }
+
+    /// Parse an object name of the form `v-PARENT-VERSION` into `(parent, child)` version IDs.
+    ///
+    /// Returns None if the name does not have exactly that shape or either UUID fails to parse.
+    fn parse_version_name(name: &[u8]) -> Option<(VersionId, VersionId)> {
+        // Layout: 2-byte "v-" prefix, 32-byte parent UUID, '-' separator, 32-byte child UUID.
+        if name.len() != 2 + 32 + 1 + 32 || !name.starts_with(b"v-") || name[2 + 32] != b'-' {
+            return None;
+        }
+        let parent_version_id = VersionId::try_parse_ascii(&name[2..2 + 32]).ok()?;
+        let child_version_id = VersionId::try_parse_ascii(&name[2 + 32 + 1..]).ok()?;
+        Some((parent_version_id, child_version_id))
+    }
+
+    /// Get the version from "latest", or None if the object does not exist. This always fetches a
+    /// fresh value from storage, and fails if the object's content is not a valid UUID.
+    fn get_latest(&mut self) -> Result<Option<VersionId>> {
+        let Some(latest) = self.backend.get(LATEST)? else {
+            return Ok(None);
+        };
+        let latest = VersionId::try_parse_ascii(&latest)
+            .map_err(|_| Error::Server("'latest' object contains invalid data".into()))?;
+        Ok(Some(latest))
+    }
+
+    /// Get the possible child versions of the given parent version, based only on the object
+    /// names. Listed names that do not parse as version names are skipped.
+    fn get_child_versions(&mut self, parent_version_id: &VersionId) -> Result<Vec<VersionId>> {
+        Ok(self
+            .backend
+            .list(format!("v-{}-", parent_version_id.as_simple()).as_bytes())?
+            .filter_map(|name| Self::parse_version_name(name).map(|(_, child)| child))
+            .collect())
+    }
+
+    /// Get the chain of versions starting at `latest` and proceeding as far as
+    /// versions exist in the backend.
+    #[allow(dead_code)] // This code may be useful for implementing cleanup.
+    fn get_chain(&mut self, latest: VersionId) -> Result<HashMap<VersionId, VersionId>> {
+        // Construct a vector containing all (child, parent) pairs
+        let mut pairs: Vec<_> = self
+            .backend
+            .list(b"v-")? 
+            .filter_map(|name| {
+                let Some((p, c)) = Self::parse_version_name(name) else {
+                    return None;
+                };
+                Some((c, p))
+            })
+            .collect();
+        pairs.sort();
+
+        // Function to find the parent of a given version in `pairs`
+        let parent_of = |c| match pairs.binary_search_by_key(&c, |pair| pair.0) {
+            Ok(idx) => Some(pairs[idx].1),
+            Err(_) => None,
+        };
+
+        // Walk from `latest` towards the root, recording parent -> child links.
+        let mut chain = HashMap::new();
+        let mut v = latest;
+        while let Some(p) = parent_of(v) {
+            // On a well-formed chain each parent is reached once. Reaching one again means the
+            // version objects form a cycle, which would otherwise make this loop run forever.
+            if chain.insert(p, v).is_some() {
+                return Err(Error::Server("cycle detected in version chain".into()));
+            }
+            v = p;
+        }
+
+        Ok(chain)
+    }
+}
+
+impl<CS: CloudStorage> Server for CloudServer<CS> {
+    /// Add a version by uploading its data under a freshly generated version ID and then moving
+    /// "latest" to that ID with a compare-and-swap.
+    ///
+    /// Returns `ExpectedParentVersion` carrying the current latest version when "latest" exists
+    /// and differs from `parent_version_id`, or when the compare-and-swap fails. When "latest"
+    /// does not exist, any `parent_version_id` is accepted. The snapshot urgency is always
+    /// `SnapshotUrgency::None`.
+    fn add_version(
+        &mut self,
+        parent_version_id: VersionId,
+        history_segment: HistorySegment,
+    ) -> crate::errors::Result<(AddVersionResult, SnapshotUrgency)> {
+        let latest = self.get_latest()?;
+        if let Some(l) = latest {
+            if l != parent_version_id {
+                return Ok((
+                    AddVersionResult::ExpectedParentVersion(l),
+                    SnapshotUrgency::None,
+                ));
+            }
+        }
+
+        // Invent a new version ID and upload the version data.
+        let version_id = VersionId::new_v4();
+        self.backend.put(
+            &Self::version_name(&parent_version_id, &version_id),
+            &history_segment,
+        )?;
+
+        // Try to compare-and-swap this value into LATEST
+        let old_value = latest.map(|l| l.as_simple().to_string().into_bytes());
+        let new_value = version_id.as_simple().to_string().into_bytes();
+        if !self
+            .backend
+            .compare_and_swap(LATEST, old_value, new_value)?
+        {
+            // "latest" should exist now, since compare_and_swap failed. If it does not, report a
+            // server error instead of panicking.
+            let latest = self.get_latest()?.ok_or_else(|| {
+                Error::Server("'latest' object is missing after a failed compare-and-swap".into())
+            })?;
+            return Ok((
+                AddVersionResult::ExpectedParentVersion(latest),
+                SnapshotUrgency::None,
+            ));
+        }
+
+        Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None))
+    }
+
+    /// Get the version whose parent is `parent_version_id`, or `NoSuchVersion` if none is found.
+    fn get_child_version(
+        &mut self,
+        parent_version_id: VersionId,
+    ) -> crate::errors::Result<GetVersionResult> {
+        // The `get_child_versions` function will usually return only one child version for a
+        // parent, in which case the work is easy. 
Otherwise, if there are several possible
+        // children, only one of those will lead to `latest`, and importantly the others will not
+        // have their own children. So we can detect the "true" child as the one that is equal to
+        // "latest" or has children.
+        //
+        // NOTE(review): a lone child is accepted without checking that it is committed (equal to
+        // "latest" or itself a parent). `add_version` uploads the version object before its
+        // compare-and-swap, so an uncommitted object could be returned here. Confirm whether the
+        // single-child case needs the same validation as the multi-child case.
+        let version_id = match &(self.get_child_versions(&parent_version_id)?)[..] {
+            [] => return Ok(GetVersionResult::NoSuchVersion),
+            [child] => *child,
+            children => {
+                // Prefer the child that is "latest"; this needs only one read.
+                let latest = self.get_latest()?;
+                let mut true_child = children.iter().copied().find(|c| Some(*c) == latest);
+                if true_child.is_none() {
+                    // Otherwise take the child that has children of its own, stopping at the
+                    // first match so that no further list requests are made.
+                    for child in children {
+                        if !self.get_child_versions(child)?.is_empty() {
+                            true_child = Some(*child);
+                            break;
+                        }
+                    }
+                }
+                match true_child {
+                    Some(true_child) => true_child,
+                    None => return Ok(GetVersionResult::NoSuchVersion),
+                }
+            }
+        };
+
+        let Some(history_segment) = self
+            .backend
+            .get(&Self::version_name(&parent_version_id, &version_id))?
+        else {
+            // This really shouldn't happen, since the chain was derived from object names, but
+            // perhaps the object was deleted. 
+            return Ok(GetVersionResult::NoSuchVersion);
+        };
+        Ok(GetVersionResult::Version {
+            version_id,
+            parent_version_id,
+            history_segment,
+        })
+    }
+
+    /// Not implemented yet: calling this panics via `todo!()`.
+    fn add_snapshot(
+        &mut self,
+        version_id: VersionId,
+        snapshot: Snapshot,
+    ) -> crate::errors::Result<()> {
+        todo!()
+    }
+
+    /// Not implemented yet: calling this panics via `todo!()`.
+    fn get_snapshot(&mut self) -> crate::errors::Result<Option<(VersionId, Snapshot)>> {
+        todo!()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::server::NIL_VERSION_ID;
+
+    /// An in-memory `CloudStorage` backed by a map from object name to object value.
+    #[derive(Default)]
+    struct MockStorage(HashMap<Vec<u8>, Vec<u8>>);
+
+    impl MockStorage {
+        /// Insert a version object for `parent` -> `child` directly, without touching "latest".
+        fn add_version(&mut self, parent: VersionId, child: VersionId, data: &[u8]) {
+            let name = CloudServer::<MockStorage>::version_name(&parent, &child);
+            self.0.insert(name, data.to_vec());
+        }
+
+        /// Set the "latest" object to the simple-format UUID of `latest`.
+        fn set_latest(&mut self, latest: VersionId) {
+            let latest = latest.as_simple().to_string().into_bytes();
+            self.0.insert(LATEST.to_vec(), latest);
+        }
+    }
+
+    impl CloudStorage for MockStorage {
+        fn put(&mut self, name: &[u8], value: &[u8]) -> Result<()> {
+            self.0.insert(name.to_vec(), value.to_vec());
+            Ok(())
+        }
+
+        fn get(&mut self, name: &[u8]) -> Result<Option<Vec<u8>>> {
+            Ok(self.0.get(name).cloned())
+        }
+
+        fn del(&mut self, name: &[u8]) -> Result<()> {
+            self.0.remove(name);
+            Ok(())
+        }
+
+        fn compare_and_swap(
+            &mut self,
+            name: &[u8],
+            existing_value: Option<Vec<u8>>,
+            new_value: Vec<u8>,
+        ) -> Result<bool> {
+            // A missing object compares equal to an `existing_value` of None.
+            if self.0.get(name) == existing_value.as_ref() {
+                self.0.insert(name.to_vec(), new_value);
+                return Ok(true);
+            }
+            Ok(false)
+        }
+
+        fn list<'a>(
+            &'a mut self,
+            prefix: &'a [u8],
+        ) -> Result<Box<dyn Iterator<Item = &'a [u8]> + 'a>> {
+            Ok(Box::new(
+                self.0
+                    .iter()
+                    .filter(move |(k, _)| k.starts_with(prefix))
+                    .map(|(k, _)| k.as_ref()),
+            ))
+        }
+    }
+
+    #[test]
+    fn version_name() {
+        let p = Uuid::parse_str("a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8").unwrap();
+        let c = Uuid::parse_str("adcf4e350fa54e4aaf9d3f20f3ba5a32").unwrap();
+        assert_eq!(
+            CloudServer::<MockStorage>::version_name(&p, &c),
+            b"v-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8-adcf4e350fa54e4aaf9d3f20f3ba5a32"
+        );
+    }
+
+    #[test]
+    fn version_name_round_trip() { 
+        let p = Uuid::new_v4();
+        let c = Uuid::new_v4();
+        assert_eq!(
+            CloudServer::<MockStorage>::parse_version_name(
+                &CloudServer::<MockStorage>::version_name(&p, &c)
+            ),
+            Some((p, c))
+        );
+    }
+
+    #[test]
+    fn parse_version_name_bad_prefix() {
+        // Correct length and separator, but the name starts with "X-" instead of "v-".
+        assert_eq!(
+            CloudServer::<MockStorage>::parse_version_name(
+                b"X-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8-adcf4e350fa54e4aaf9d3f20f3ba5a32"
+            ),
+            None
+        );
+    }
+
+    #[test]
+    fn parse_version_name_bad_separator() {
+        // Correct length and prefix, but 'x' sits where the '-' separator belongs.
+        assert_eq!(
+            CloudServer::<MockStorage>::parse_version_name(
+                b"v-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8xadcf4e350fa54e4aaf9d3f20f3ba5a32"
+            ),
+            None
+        );
+    }
+
+    #[test]
+    fn parse_version_name_too_short() {
+        assert_eq!(
+            CloudServer::<MockStorage>::parse_version_name(
+                b"v-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8-adcf4e350fa54e4aaf9d3f20f3ba5a3"
+            ),
+            None
+        );
+    }
+
+    #[test]
+    fn parse_version_name_too_long() {
+        assert_eq!(
+            CloudServer::<MockStorage>::parse_version_name(
+                b"v-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8-adcf4e350fa54e4aaf9d3f20f3ba5a320"
+            ),
+            None
+        );
+    }
+
+    #[test]
+    fn get_latest_empty() {
+        let mock = MockStorage::default();
+        let mut server = CloudServer::new(mock);
+        assert_eq!(server.get_latest().unwrap(), None);
+    }
+
+    #[test]
+    fn get_latest_exists() {
+        let latest = Uuid::new_v4();
+        let mut mock = MockStorage::default();
+        mock.set_latest(latest);
+        let mut server = CloudServer::new(mock);
+        assert_eq!(server.get_latest().unwrap(), Some(latest));
+    }
+
+    #[test]
+    fn get_latest_invalid() {
+        // A "latest" object whose content is not a UUID is reported as an error.
+        let mut mock = MockStorage::default();
+        mock.0.insert(LATEST.to_vec(), b"not-a-uuid".to_vec());
+        let mut server = CloudServer::new(mock);
+        assert!(server.get_latest().is_err());
+    }
+
+    #[test]
+    fn get_child_versions_empty() {
+        let mock = MockStorage::default();
+        let mut server = CloudServer::new(mock);
+        assert_eq!(server.get_child_versions(&Uuid::new_v4()).unwrap(), vec![]);
+    }
+
+    #[test]
+    fn get_child_versions_single() {
+        let mut mock = MockStorage::default();
+        let (v1, v2) = (Uuid::new_v4(), Uuid::new_v4());
+        mock.add_version(v2, v1, b"first");
+        let mut server = 
CloudServer::new(mock);
+        assert_eq!(server.get_child_versions(&v1).unwrap(), vec![]);
+        assert_eq!(server.get_child_versions(&v2).unwrap(), vec![v1]);
+    }
+
+    #[test]
+    fn get_child_versions_multiple() {
+        let mut mock = MockStorage::default();
+        let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
+        mock.add_version(v3, v1, b"first");
+        mock.add_version(v3, v2, b"second");
+        let mut server = CloudServer::new(mock);
+        assert_eq!(server.get_child_versions(&v1).unwrap(), vec![]);
+        assert_eq!(server.get_child_versions(&v2).unwrap(), vec![]);
+        // The mock lists from a HashMap, so either order of the two children is accepted.
+        let versions = server.get_child_versions(&v3).unwrap();
+        assert!(versions == vec![v1, v2] || versions == vec![v2, v1]);
+    }
+
+    #[test]
+    fn get_child_version_empty() {
+        let mock = MockStorage::default();
+        let mut server = CloudServer::new(mock);
+        assert_eq!(
+            server.get_child_version(Uuid::new_v4()).unwrap(),
+            GetVersionResult::NoSuchVersion
+        );
+    }
+
+    #[test]
+    fn get_child_version_single() {
+        // Note that no "latest" object is set: a lone child object is returned as-is.
+        let mut mock = MockStorage::default();
+        let (v1, v2) = (Uuid::new_v4(), Uuid::new_v4());
+        mock.add_version(v2, v1, b"first");
+        let mut server = CloudServer::new(mock);
+        assert_eq!(
+            server.get_child_version(v1).unwrap(),
+            GetVersionResult::NoSuchVersion
+        );
+        assert_eq!(
+            server.get_child_version(v2).unwrap(),
+            GetVersionResult::Version {
+                version_id: v1,
+                parent_version_id: v2,
+                history_segment: b"first".to_vec(),
+            }
+        );
+    }
+
+    #[test]
+    fn get_child_version_multiple() {
+        // True chain is v1 -> v2 -> v3 (latest); vx, vy and vz are childless "false starts".
+        let mut mock = MockStorage::default();
+        let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
+        let (vx, vy, vz) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
+        mock.add_version(v1, v2, b"second");
+        mock.add_version(v1, vx, b"false start x");
+        mock.add_version(v1, vy, b"false start y");
+        mock.add_version(v2, v3, b"third");
+        mock.add_version(v2, vz, b"false start z");
+        mock.set_latest(v3);
+        let mut server = CloudServer::new(mock);
+        assert_eq!(
+            server.get_child_version(v1).unwrap(),
+            
GetVersionResult::Version {
+                version_id: v2,
+                parent_version_id: v1,
+                history_segment: b"second".to_vec(),
+            }
+        );
+        assert_eq!(
+            server.get_child_version(v2).unwrap(),
+            GetVersionResult::Version {
+                version_id: v3,
+                parent_version_id: v2,
+                history_segment: b"third".to_vec(),
+            }
+        );
+        assert_eq!(
+            server.get_child_version(v3).unwrap(),
+            GetVersionResult::NoSuchVersion
+        );
+    }
+
+    #[test]
+    fn get_chain_empty() {
+        let mock = MockStorage::default();
+        let mut server = CloudServer::new(mock);
+        let chain = server.get_chain(Uuid::new_v4()).unwrap();
+        assert_eq!(chain, HashMap::new());
+    }
+
+    #[test]
+    fn get_chain_linear() {
+        // The resulting map is keyed by parent, with the child as the value.
+        let mut mock = MockStorage::default();
+        let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
+        mock.add_version(NIL_VERSION_ID, v1, b"first");
+        mock.add_version(v1, v2, b"second");
+        mock.add_version(v2, v3, b"third");
+        mock.set_latest(v3);
+        let mut server = CloudServer::new(mock);
+        let chain = server.get_chain(v3).unwrap();
+        assert_eq!(
+            chain,
+            HashMap::from([(NIL_VERSION_ID, v1), (v1, v2), (v2, v3)])
+        );
+    }
+
+    #[test]
+    fn get_chain_extra_branches() {
+        // Version objects that are not ancestors of `latest` must not appear in the chain.
+        let mut mock = MockStorage::default();
+        let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
+        let (vx, vy) = (Uuid::new_v4(), Uuid::new_v4());
+        mock.add_version(v1, v2, b"second");
+        mock.add_version(v1, vx, b"false start x");
+        mock.add_version(v2, v3, b"third");
+        mock.add_version(v2, vy, b"false start y");
+        mock.set_latest(v3);
+        let mut server = CloudServer::new(mock);
+        let chain = server.get_chain(v3).unwrap();
+        assert_eq!(chain, HashMap::from([(v1, v2), (v2, v3)]));
+    }
+}
diff --git a/taskchampion/taskchampion/src/server/config.rs b/taskchampion/taskchampion/src/server/config.rs
index 5b15a6d63..90a094d31 100644
--- a/taskchampion/taskchampion/src/server/config.rs
+++ b/taskchampion/taskchampion/src/server/config.rs
@@ -1,6 +1,7 @@
 use super::types::Server;
-use super::{LocalServer, RemoteServer};
 use crate::errors::Result;
+use crate::server::local::LocalServer; +use crate::server::remote::RemoteServer; use std::path::PathBuf; use uuid::Uuid; diff --git a/taskchampion/taskchampion/src/server/mod.rs b/taskchampion/taskchampion/src/server/mod.rs index f97b9181b..d80af5a49 100644 --- a/taskchampion/taskchampion/src/server/mod.rs +++ b/taskchampion/taskchampion/src/server/mod.rs @@ -11,6 +11,7 @@ However, users who wish to implement their own server interfaces can implement t #[cfg(test)] pub(crate) mod test; +mod cloud; mod config; mod crypto; mod local; @@ -19,8 +20,6 @@ mod remote; mod types; pub use config::ServerConfig; -pub use local::LocalServer; -pub use remote::RemoteServer; pub use types::*; pub(crate) use op::SyncOp; diff --git a/taskchampion/taskchampion/src/server/types.rs b/taskchampion/taskchampion/src/server/types.rs index 5588a720e..8929ae23b 100644 --- a/taskchampion/taskchampion/src/server/types.rs +++ b/taskchampion/taskchampion/src/server/types.rs @@ -1,7 +1,7 @@ use crate::errors::Result; use uuid::Uuid; -/// Versions are referred to with sha2 hashes. +/// Versions are referred to with UUIDs. pub type VersionId = Uuid; /// The distinguished value for "no version" @@ -52,6 +52,11 @@ pub enum GetVersionResult { /// A value implementing this trait can act as a server against which a replica can sync. pub trait Server { /// Add a new version. + /// + /// This must ensure that the new version is the only version with the given + /// `parent_version_id`, and that all versions form a single parent-child chain. Inductively, + /// this means that if there are any versions on the server, then `parent_version_id` must be + /// the only version that does not already have a child. fn add_version( &mut self, parent_version_id: VersionId,