diff --git a/crates/rrg/src/lib.rs b/crates/rrg/src/lib.rs index cb5facec..a2f3bbfc 100644 --- a/crates/rrg/src/lib.rs +++ b/crates/rrg/src/lib.rs @@ -51,7 +51,7 @@ pub fn init(args: &crate::args::Args) { pub fn listen(args: &crate::args::Args) { loop { let request = Request::receive(args.heartbeat_rate); - session::FleetspeakSession::dispatch(request); + session::FleetspeakSession::dispatch(args.heartbeat_rate, request); } } diff --git a/crates/rrg/src/request.rs b/crates/rrg/src/request.rs index 83e86881..b86dfc7f 100644 --- a/crates/rrg/src/request.rs +++ b/crates/rrg/src/request.rs @@ -312,6 +312,7 @@ impl TryFrom for Request { let proto_cpu_time_limit = proto.take_cpu_time_limit(); let cpu_time_limit = match try_from_duration(proto_cpu_time_limit) { + // TODO(@panhania): We should always require time limit to be set. Ok(limit) if limit.is_zero() => None, Ok(limit) => Some(limit), Err(error) => return Err(ParseRequestError { @@ -323,6 +324,7 @@ impl TryFrom for Request { let proto_real_time_limit = proto.take_real_time_limit(); let real_time_limit = match try_from_duration(proto_real_time_limit) { + // TODO(@panhania): We should always require time limit to be set. Ok(limit) if limit.is_zero() => None, Ok(limit) => Some(limit), Err(error) => return Err(ParseRequestError { diff --git a/crates/rrg/src/session.rs b/crates/rrg/src/session.rs index 15a09870..70ed0ff0 100644 --- a/crates/rrg/src/session.rs +++ b/crates/rrg/src/session.rs @@ -40,9 +40,7 @@ pub trait Session { where I: crate::response::Item + 'static; /// Sends a heartbeat signal to the Fleetspeak process. - fn heartbeat(&mut self) { - // TODO: Create a real implementation. - } + fn heartbeat(&mut self); } #[cfg(test)] diff --git a/crates/rrg/src/session/fake.rs b/crates/rrg/src/session/fake.rs index 28918975..3550b154 100644 --- a/crates/rrg/src/session/fake.rs +++ b/crates/rrg/src/session/fake.rs @@ -129,4 +129,7 @@ impl crate::session::Session for FakeSession { Ok(()) } + + fn heartbeat(&mut self) { + } } diff --git a/crates/rrg/src/session/fleetspeak.rs b/crates/rrg/src/session/fleetspeak.rs index 0bb1926f..10039df6 100644 --- a/crates/rrg/src/session/fleetspeak.rs +++ b/crates/rrg/src/session/fleetspeak.rs @@ -8,6 +8,8 @@ use log::{error, info}; pub struct FleetspeakSession { /// A builder for responses sent through Fleetspeak to the GRR server. response_builder: crate::ResponseBuilder, + /// Maximum frequency of heartbeat messages to send to Fleetspeak. + heartbeat_rate: std::time::Duration, /// Number of bytes sent since the session was created. network_bytes_sent: u64, /// Number of bytes we are allowed to send within the session. @@ -29,7 +31,14 @@ impl FleetspeakSession { /// send the error (in case on occurred) back to the server. But this we can /// do only within a sesssion, so we have to create a session from a perhaps /// invalid request. - pub fn dispatch(request: Result) { + /// + /// Long-running actions spawned by requests that need to send heartbeat + /// signal to Fleetspeak will do so with frequency not greater than the one + /// specified `heartbeat_rate`. + pub fn dispatch( + heartbeat_rate: std::time::Duration, + request: Result, + ) { let request_id = match &request { Ok(request) => request.id(), Err(error) => match error.request_id() { @@ -50,6 +59,7 @@ impl FleetspeakSession { let filters = request.take_filters(); let mut session = FleetspeakSession { response_builder: response_builder.with_filters(filters), + heartbeat_rate, network_bytes_sent: 0, network_bytes_limit: request.network_bytes_limit(), real_time_start: std::time::Instant::now(), @@ -149,4 +159,8 @@ impl crate::session::Session for FleetspeakSession { Ok(()) } + + fn heartbeat(&mut self) { + fleetspeak::heartbeat_with_throttle(self.heartbeat_rate); + } }