Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Commit

Permalink
node: Type-safe timestamps
Browse files Browse the repository at this point in the history
Use a struct to represent timestamps, to improve type safety.
  • Loading branch information
cloudhead committed Apr 24, 2024
1 parent 0fcfbb3 commit 2b77192
Show file tree
Hide file tree
Showing 16 changed files with 229 additions and 99 deletions.
6 changes: 3 additions & 3 deletions radicle-cli/tests/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,7 @@ fn rad_clone_partial_fail() {
node::Features::SEED,
Alias::new("carol"),
0,
localtime::LocalTime::now().as_secs(),
localtime::LocalTime::now().into(),
[node::KnownAddress::new(
// Eve will fail to connect to this address.
node::Address::from(net::SocketAddr::from(([0, 0, 0, 0], 19873))),
Expand All @@ -1231,7 +1231,7 @@ fn rad_clone_partial_fail() {
.unwrap();
eve.db
.routing_mut()
.insert([&acme], carol, localtime::LocalTime::now().as_secs())
.insert([&acme], carol, localtime::LocalTime::now().into())
.unwrap();
eve.config.peers = node::config::PeerConfig::Static;

Expand Down Expand Up @@ -1263,7 +1263,7 @@ fn rad_clone_connect() {
let bob = environment.node(Config::test(Alias::new("bob")));
let mut eve = environment.node(Config::test(Alias::new("eve")));
let acme = RepoId::from_str("z42hL2jL4XNk6K8oHQaSWfMgCL7ji").unwrap();
let now = localtime::LocalTime::now().as_secs();
let now = localtime::LocalTime::now().into();

fixtures::repository(working.join("acme"));

Expand Down
8 changes: 3 additions & 5 deletions radicle-node/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@ impl Runtime {
// If our announcement was made some time ago, the timestamp on it will be old,
// and it might not get gossiped to new nodes since it will be purged from caches.
// Therefore, we make sure it's never too old.
if clock.as_millis() - ann.timestamp
<= config.limits.gossip_max_age.as_millis() as u64
{
if clock - ann.timestamp.to_local_time() <= config.limits.gossip_max_age {
Some(ann)
} else {
None
Expand All @@ -202,7 +200,7 @@ impl Runtime {
);
ann
} else {
service::gossip::node(&config, clock.as_millis())
service::gossip::node(&config, clock.into())
.solve(Default::default())
.expect("Runtime::init: unable to solve proof-of-work puzzle")
};
Expand All @@ -218,7 +216,7 @@ impl Runtime {
radicle::node::Features::SEED,
alias,
0,
clock.as_millis(),
clock.into(),
[node::KnownAddress::new(addr, address::Source::Bootstrap)],
)?;
}
Expand Down
41 changes: 21 additions & 20 deletions radicle-node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,10 @@ pub struct Service<D, S, G> {
last_sync: LocalTime,
/// Last time the service routing table was pruned.
last_prune: LocalTime,
/// Last time the service announced something.
last_timestamp: LocalTime,
/// Last time the inventory was announced.
last_announce: LocalTime,
/// Last timestamp used for announcements.
last_timestamp: Timestamp,
/// Time when the service was initialized, or `None` if it wasn't initialized.
started_at: Option<LocalTime>,
/// Publishes events to subscribers.
Expand Down Expand Up @@ -457,7 +457,7 @@ where
last_idle: LocalTime::default(),
last_sync: LocalTime::default(),
last_prune: LocalTime::default(),
last_timestamp: LocalTime::default(),
last_timestamp: Timestamp::MIN,
last_announce: LocalTime::default(),
started_at: None,
emitter,
Expand Down Expand Up @@ -601,7 +601,7 @@ where
// all of it. It can happen that inventory is not properly seeded if for eg. the
// user creates a new repository while the node is stopped.
let rids = self.storage.inventory()?;
self.db.routing_mut().insert(&rids, nid, time.as_millis())?;
self.db.routing_mut().insert(&rids, nid, time.into())?;

let announced = self
.db
Expand Down Expand Up @@ -632,7 +632,7 @@ where
&rid,
&nid,
updated_at.oid,
updated_at.timestamp.as_millis(),
updated_at.timestamp.into(),
)? {
debug!(target: "service", "Saved local sync status for {rid}..");
}
Expand Down Expand Up @@ -722,7 +722,7 @@ where
if let Err(err) = self
.db
.gossip_mut()
.prune((now - self.config.limits.gossip_max_age).as_millis())
.prune((now - self.config.limits.gossip_max_age).into())
{
error!(target: "service", "Error pruning gossip entries: {err}");
}
Expand Down Expand Up @@ -782,7 +782,7 @@ where

// Let all our peers know that we're interested in this repo from now on.
self.outbox.broadcast(
Message::subscribe(self.filter(), self.clock.as_millis(), Timestamp::MAX),
Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
self.sessions.connected().map(|(_, s)| s),
);
}
Expand Down Expand Up @@ -1047,7 +1047,7 @@ where
info!(target: "service", "Fetched {rid} from {remote} successfully");
// Update our routing table in case this fetch was user-initiated and doesn't
// come from an announcement.
self.seed_discovered(rid, remote, self.clock.as_millis());
self.seed_discovered(rid, remote, self.clock.into());

for update in &updated {
if update.is_skipped() {
Expand Down Expand Up @@ -1181,7 +1181,7 @@ where
if let Err(e) =
self.db
.addresses_mut()
.connected(&remote, &peer.addr, self.clock.as_millis())
.connected(&remote, &peer.addr, self.clock.into())
{
error!(target: "service", "Error updating address book with connection: {e}");
}
Expand Down Expand Up @@ -1782,8 +1782,8 @@ where
// If this is our first connection to the network, we just ask for a fixed backlog
// of messages to get us started.
let since = match self.db.gossip().last() {
Ok(Some(last)) => last - MAX_TIME_DELTA.as_millis() as Timestamp,
Ok(None) => (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).as_millis() as Timestamp,
Ok(Some(last)) => Timestamp::from(last.to_local_time() - MAX_TIME_DELTA),
Ok(None) => (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into(),
Err(e) => {
error!(target: "service", "Error getting the lastest gossip message from storage: {e}");
return vec![];
Expand Down Expand Up @@ -1811,7 +1811,7 @@ where
/// Update our routing table with our local node's inventory.
fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
let inventory = self.storage.inventory()?;
let result = self.sync_routing(inventory, self.node_id(), self.clock.as_millis())?;
let result = self.sync_routing(inventory, self.node_id(), self.clock.into())?;

Ok(result)
}
Expand Down Expand Up @@ -1914,7 +1914,7 @@ where
&r.remote,
&SIGREFS_BRANCH,
r.at,
LocalTime::from_millis(timestamp as u128),
timestamp.to_local_time(),
) {
error!(
target: "service",
Expand Down Expand Up @@ -2013,7 +2013,7 @@ where
return false;
}
let persistent = self.config.is_persistent(&nid);
let timestamp = self.clock().as_millis();
let timestamp: Timestamp = self.clock.into();

if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
error!(target: "service", "Error updating address book with connection attempt: {e}");
Expand Down Expand Up @@ -2091,12 +2091,13 @@ where
/// Get a timestamp for using in announcements.
/// Never returns the same timestamp twice.
fn timestamp(&mut self) -> Timestamp {
if self.clock > self.last_timestamp {
self.last_timestamp = self.clock;
let now = Timestamp::from(self.clock);
if *now > *self.last_timestamp {
self.last_timestamp = now;
} else {
self.last_timestamp = self.last_timestamp + LocalDuration::from_millis(1);
self.last_timestamp = self.last_timestamp + 1;
}
self.last_timestamp.as_millis()
self.last_timestamp
}

////////////////////////////////////////////////////////////////////////////
Expand All @@ -2113,7 +2114,7 @@ where
self.sessions.connected().map(|(_, p)| p),
self.db.gossip_mut(),
);
self.last_announce = LocalTime::from_millis(time as u128);
self.last_announce = time.to_local_time();

Ok(())
}
Expand All @@ -2126,7 +2127,7 @@ where

let delta = count - self.config.limits.routing_max_size;
self.db.routing_mut().prune(
(*now - self.config.limits.routing_max_age).as_millis(),
(*now - self.config.limits.routing_max_age).into(),
Some(delta),
)?;
Ok(())
Expand Down
25 changes: 15 additions & 10 deletions radicle-node/src/service/gossip/store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::num::TryFromIntError;
use std::{fmt, io};

use radicle::crypto::Signature;
Expand All @@ -17,6 +18,9 @@ pub enum Error {
/// An Internal error.
#[error("internal error: {0}")]
Internal(#[from] sql::Error),
/// Unit overflow.
#[error("unit overflow:: {0}")]
UnitOverflow(#[from] TryFromIntError),
}

/// A database that has access to historical gossip messages.
Expand Down Expand Up @@ -54,7 +58,7 @@ impl Store for Database {
.db
.prepare("DELETE FROM `announcements` WHERE timestamp < ?1")?;

stmt.bind((1, cutoff.try_into().unwrap_or(i64::MAX)))?;
stmt.bind((1, &cutoff))?;
stmt.next()?;

Ok(self.db.change_count())
Expand All @@ -66,9 +70,10 @@ impl Store for Database {
.prepare("SELECT MAX(timestamp) AS latest FROM `announcements`")?;

if let Some(Ok(row)) = stmt.into_iter().next() {
let latest = row.try_read::<Option<i64>, _>(0)?;

return Ok(latest.map(|l| l as Timestamp));
return match row.try_read::<Option<i64>, _>(0)? {
Some(i) => Ok(Some(Timestamp::from(u64::try_from(i)?))),
None => Ok(None),
};
}
Ok(None)
}
Expand Down Expand Up @@ -101,7 +106,7 @@ impl Store for Database {
}
}
stmt.bind((5, &ann.signature))?;
stmt.bind((6, ann.message.timestamp().try_into().unwrap_or(i64::MAX)))?;
stmt.bind((6, &ann.message.timestamp()))?;
stmt.next()?;

Ok(self.db.change_count() > 0)
Expand All @@ -119,10 +124,10 @@ impl Store for Database {
WHERE timestamp >= ?1 and timestamp < ?2
ORDER BY timestamp, node, type",
)?;
assert!(from <= to);
assert!(*from <= *to);

stmt.bind((1, i64::try_from(from).unwrap_or(i64::MAX)))?;
stmt.bind((2, i64::try_from(to).unwrap_or(i64::MAX)))?;
stmt.bind((1, &from))?;
stmt.bind((2, &to))?;

Ok(Box::new(
stmt.into_iter()
Expand All @@ -145,9 +150,9 @@ impl Store for Database {
}
};
let signature = row.read::<Signature, _>("signature");
let timestamp = row.read::<i64, _>("timestamp");
let timestamp = row.read::<Timestamp, _>("timestamp");

debug_assert_eq!(timestamp, message.timestamp() as i64);
debug_assert_eq!(timestamp, message.timestamp());

Ok(Announcement {
node,
Expand Down
8 changes: 4 additions & 4 deletions radicle-node/src/service/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ mod tests {
let msg: Message = AnnouncementMessage::from(RefsAnnouncement {
rid: arbitrary::gen(1),
refs,
timestamp: LocalTime::now().as_millis(),
timestamp: LocalTime::now().into(),
})
.signed(&MockSigner::default())
.into();
Expand All @@ -621,7 +621,7 @@ mod tests {
inventory: arbitrary::vec(INVENTORY_LIMIT)
.try_into()
.expect("size within bounds limit"),
timestamp: LocalTime::now().as_millis(),
timestamp: LocalTime::now().into(),
},
&MockSigner::default(),
);
Expand All @@ -646,7 +646,7 @@ mod tests {
#[quickcheck]
fn prop_refs_announcement_signing(rid: RepoId) {
let signer = MockSigner::new(&mut fastrand::Rng::new());
let timestamp = 0;
let timestamp = Timestamp::EPOCH;
let at = raw::Oid::zero().into();
let refs = BoundedVec::collect_from(
&mut [RefsAt {
Expand All @@ -669,7 +669,7 @@ mod tests {
fn test_node_announcement_validate() {
let ann = NodeAnnouncement {
features: node::Features::SEED,
timestamp: 42491841,
timestamp: Timestamp::from(42491841),
alias: Alias::new("alice"),
addresses: BoundedVec::new(),
nonce: 0,
Expand Down
4 changes: 2 additions & 2 deletions radicle-node/src/service/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::node::Severity;
use crate::service::message;
use crate::service::message::Message;
use crate::service::{Address, LocalTime, NodeId, Outbox, RepoId, Rng};
use crate::Link;
use crate::{Link, Timestamp};

pub use crate::node::{PingState, State};

Expand All @@ -15,7 +15,7 @@ pub enum Error {
/// The remote peer sent an invalid announcement timestamp,
/// for eg. a timestamp far in the future.
#[error("invalid announcement timestamp: {0}")]
InvalidTimestamp(u64),
InvalidTimestamp(Timestamp),
/// The remote peer sent git protocol messages while we were expecting
/// gossip messages. Or vice-versa.
#[error("protocol mismatch")]
Expand Down
4 changes: 2 additions & 2 deletions radicle-node/src/test/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn messages(count: usize, now: LocalTime, delta: LocalDuration) -> Vec<Messa
msgs.push(Message::node(
NodeAnnouncement {
features: node::Features::SEED,
timestamp: time.as_millis(),
timestamp: time.into(),
alias: node::Alias::new(gen::string(5)),
addresses: None.into(),
nonce: 0,
Expand All @@ -41,7 +41,7 @@ pub fn messages(count: usize, now: LocalTime, delta: LocalDuration) -> Vec<Messa
msgs.push(Message::inventory(
InventoryAnnouncement {
inventory: arbitrary::vec(3).try_into().unwrap(),
timestamp: time.as_millis(),
timestamp: time.into(),
},
&signer,
));
Expand Down
4 changes: 2 additions & 2 deletions radicle-node/src/test/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ where
for rid in storage.inventory().unwrap() {
policies.seed(&rid, Scope::Followed).unwrap();
}
let announcement = service::gossip::node(&config.config, config.local_time.as_secs());
let announcement = service::gossip::node(&config.config, config.local_time.into());
let emitter: Emitter<Event> = Default::default();
let service = Service::new(
config.config,
Expand Down Expand Up @@ -249,7 +249,7 @@ where
}

pub fn timestamp(&self) -> Timestamp {
self.clock().as_millis()
(*self.clock()).into()
}

pub fn inventory(&self) -> Inventory {
Expand Down
Loading

0 comments on commit 2b77192

Please sign in to comment.