From c838321b5af1903b4802d81d61b1007071c473ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Hanuszczak?= Date: Wed, 11 Dec 2024 16:11:09 +0100 Subject: [PATCH] Expose agent arguments in sessions. --- crates/rrg/src/lib.rs | 2 +- crates/rrg/src/session.rs | 4 ++++ crates/rrg/src/session/fake.rs | 18 +++++++++++++++++- crates/rrg/src/session/fleetspeak.rs | 24 ++++++++++++++---------- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/crates/rrg/src/lib.rs b/crates/rrg/src/lib.rs index a2f3bbfc..0c4776ba 100644 --- a/crates/rrg/src/lib.rs +++ b/crates/rrg/src/lib.rs @@ -51,7 +51,7 @@ pub fn init(args: &crate::args::Args) { pub fn listen(args: &crate::args::Args) { loop { let request = Request::receive(args.heartbeat_rate); - session::FleetspeakSession::dispatch(args.heartbeat_rate, request); + session::FleetspeakSession::dispatch(args, request); } } diff --git a/crates/rrg/src/session.rs b/crates/rrg/src/session.rs index 70ed0ff0..17e79acc 100644 --- a/crates/rrg/src/session.rs +++ b/crates/rrg/src/session.rs @@ -31,6 +31,10 @@ pub type Result = std::result::Result; /// Abstraction for various kinds of sessions. pub trait Session { + + /// Provides the arguments passed to the agent. + fn args(&self) -> &crate::args::Args; + /// Sends a reply to the flow that call the action. fn reply(&mut self, item: I) -> Result<()> where I: crate::response::Item + 'static; diff --git a/crates/rrg/src/session/fake.rs b/crates/rrg/src/session/fake.rs index 3550b154..0b617c0d 100644 --- a/crates/rrg/src/session/fake.rs +++ b/crates/rrg/src/session/fake.rs @@ -12,15 +12,27 @@ use crate::Sink; /// Instead, one can use a `Fake` session. It simply accumulates responses /// that the action sends and lets the creator inspect them later. pub struct FakeSession { + args: crate::args::Args, replies: Vec>, parcels: std::collections::HashMap>>, } impl FakeSession { - /// Constructs a new fake session. + /// Constructs a new fake session with test default agent arguments. pub fn new() -> FakeSession { + FakeSession::with_args(crate::args::Args { + heartbeat_rate: std::time::Duration::from_secs(0), + verbosity: log::LevelFilter::Debug, + log_to_stdout: false, + log_to_file: None, + }) + } + + /// Constructs a new fake session with the given agent arguments. + pub fn with_args(args: crate::args::Args) -> FakeSession { FakeSession { + args, replies: Vec::new(), parcels: std::collections::HashMap::new(), } @@ -111,6 +123,10 @@ impl FakeSession { impl crate::session::Session for FakeSession { + fn args(&self) -> &crate::args::Args { + &self.args + } + fn reply(&mut self, item: I) -> crate::session::Result<()> where I: crate::response::Item + 'static, diff --git a/crates/rrg/src/session/fleetspeak.rs b/crates/rrg/src/session/fleetspeak.rs index 10039df6..2ce1306e 100644 --- a/crates/rrg/src/session/fleetspeak.rs +++ b/crates/rrg/src/session/fleetspeak.rs @@ -5,11 +5,11 @@ use log::{error, info}; /// This is a normal session type that that is associated with some flow on the /// server. It keeps track of the responses it sends and collects statistics /// about network and runtime utilization to kill the action if it is needed. -pub struct FleetspeakSession { +pub struct FleetspeakSession<'a> { + /// Arguments passed to the agent. + args: &'a crate::args::Args, /// A builder for responses sent through Fleetspeak to the GRR server. response_builder: crate::ResponseBuilder, - /// Maximum frequency of heartbeat messages to send to Fleetspeak. - heartbeat_rate: std::time::Duration, /// Number of bytes sent since the session was created. network_bytes_sent: u64, /// Number of bytes we are allowed to send within the session. @@ -20,7 +20,7 @@ pub struct FleetspeakSession { real_time_limit: Option, } -impl FleetspeakSession { +impl<'a> FleetspeakSession<'a> { /// Dispatches the given `request` to an appropriate action handler. /// @@ -34,9 +34,9 @@ impl FleetspeakSession { /// /// Long-running actions spawned by requests that need to send heartbeat /// signal to Fleetspeak will do so with frequency not greater than the one - /// specified `heartbeat_rate`. + /// specified the arguments passed to the agent. pub fn dispatch( - heartbeat_rate: std::time::Duration, + args: &'a crate::args::Args, request: Result, ) { let request_id = match &request { @@ -58,8 +58,8 @@ impl FleetspeakSession { Ok(mut request) => { let filters = request.take_filters(); let mut session = FleetspeakSession { + args, response_builder: response_builder.with_filters(filters), - heartbeat_rate, network_bytes_sent: 0, network_bytes_limit: request.network_bytes_limit(), real_time_start: std::time::Instant::now(), @@ -81,7 +81,7 @@ impl FleetspeakSession { } } -impl FleetspeakSession { +impl<'a> FleetspeakSession<'a> { /// Checks whether the network bytes limit was crossed. /// @@ -121,7 +121,11 @@ impl FleetspeakSession { } } -impl crate::session::Session for FleetspeakSession { +impl<'a> crate::session::Session for FleetspeakSession<'a> { + + fn args(&self) -> &crate::args::Args { + &self.args + } fn reply(&mut self, item: I) -> crate::session::Result<()> where @@ -161,6 +165,6 @@ impl crate::session::Session for FleetspeakSession { } fn heartbeat(&mut self) { - fleetspeak::heartbeat_with_throttle(self.heartbeat_rate); + fleetspeak::heartbeat_with_throttle(self.args.heartbeat_rate); } }