Skip to content

Commit

Permalink
Add real heartbeat support for sessions.
Browse files Browse the repository at this point in the history
  • Loading branch information
panhania authored Nov 19, 2024
1 parent 879a06c commit 04e65ca
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 5 deletions.
2 changes: 1 addition & 1 deletion crates/rrg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn init(args: &crate::args::Args) {
pub fn listen(args: &crate::args::Args) {
loop {
let request = Request::receive(args.heartbeat_rate);
session::FleetspeakSession::dispatch(request);
session::FleetspeakSession::dispatch(args.heartbeat_rate, request);
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/rrg/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ impl TryFrom<rrg_proto::rrg::Request> for Request {

let proto_cpu_time_limit = proto.take_cpu_time_limit();
let cpu_time_limit = match try_from_duration(proto_cpu_time_limit) {
// TODO(@panhania): We should always require time limit to be set.
Ok(limit) if limit.is_zero() => None,
Ok(limit) => Some(limit),
Err(error) => return Err(ParseRequestError {
Expand All @@ -323,6 +324,7 @@ impl TryFrom<rrg_proto::rrg::Request> for Request {

let proto_real_time_limit = proto.take_real_time_limit();
let real_time_limit = match try_from_duration(proto_real_time_limit) {
// TODO(@panhania): We should always require time limit to be set.
Ok(limit) if limit.is_zero() => None,
Ok(limit) => Some(limit),
Err(error) => return Err(ParseRequestError {
Expand Down
4 changes: 1 addition & 3 deletions crates/rrg/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ pub trait Session {
where I: crate::response::Item + 'static;

/// Sends a heartbeat signal to the Fleetspeak process.
fn heartbeat(&mut self) {
// TODO: Create a real implementation.
}
fn heartbeat(&mut self);
}

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions crates/rrg/src/session/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,7 @@ impl crate::session::Session for FakeSession {

Ok(())
}

fn heartbeat(&mut self) {
}
}
16 changes: 15 additions & 1 deletion crates/rrg/src/session/fleetspeak.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use log::{error, info};
pub struct FleetspeakSession {
/// A builder for responses sent through Fleetspeak to the GRR server.
response_builder: crate::ResponseBuilder,
/// Maximum frequency of heartbeat messages to send to Fleetspeak.
heartbeat_rate: std::time::Duration,
/// Number of bytes sent since the session was created.
network_bytes_sent: u64,
/// Number of bytes we are allowed to send within the session.
Expand All @@ -29,7 +31,14 @@ impl FleetspeakSession {
/// send the error (in case on occurred) back to the server. But this we can
/// do only within a sesssion, so we have to create a session from a perhaps
/// invalid request.
pub fn dispatch(request: Result<crate::Request, crate::ParseRequestError>) {
///
/// Long-running actions spawned by requests that need to send heartbeat
/// signal to Fleetspeak will do so with frequency not greater than the one
/// specified `heartbeat_rate`.
pub fn dispatch(
heartbeat_rate: std::time::Duration,
request: Result<crate::Request, crate::ParseRequestError>,
) {
let request_id = match &request {
Ok(request) => request.id(),
Err(error) => match error.request_id() {
Expand All @@ -50,6 +59,7 @@ impl FleetspeakSession {
let filters = request.take_filters();
let mut session = FleetspeakSession {
response_builder: response_builder.with_filters(filters),
heartbeat_rate,
network_bytes_sent: 0,
network_bytes_limit: request.network_bytes_limit(),
real_time_start: std::time::Instant::now(),
Expand Down Expand Up @@ -149,4 +159,8 @@ impl crate::session::Session for FleetspeakSession {

Ok(())
}

fn heartbeat(&mut self) {
fleetspeak::heartbeat_with_throttle(self.heartbeat_rate);
}
}

0 comments on commit 04e65ca

Please sign in to comment.