Skip to content

Commit

Permalink
Expose agent arguments in sessions.
Browse files Browse the repository at this point in the history
  • Loading branch information
panhania committed Dec 11, 2024
1 parent f90afe4 commit c838321
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
2 changes: 1 addition & 1 deletion crates/rrg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn init(args: &crate::args::Args) {
pub fn listen(args: &crate::args::Args) {
loop {
let request = Request::receive(args.heartbeat_rate);
session::FleetspeakSession::dispatch(args.heartbeat_rate, request);
session::FleetspeakSession::dispatch(args, request);
}
}

Expand Down
4 changes: 4 additions & 0 deletions crates/rrg/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ pub type Result<T> = std::result::Result<T, Error>;

/// Abstraction for various kinds of sessions.
pub trait Session {

/// Provides the arguments passed to the agent.
fn args(&self) -> &crate::args::Args;

/// Sends a reply to the flow that call the action.
fn reply<I>(&mut self, item: I) -> Result<()>
where I: crate::response::Item + 'static;
Expand Down
18 changes: 17 additions & 1 deletion crates/rrg/src/session/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,27 @@ use crate::Sink;
/// Instead, one can use a `Fake` session. It simply accumulates responses
/// that the action sends and lets the creator inspect them later.
pub struct FakeSession {
args: crate::args::Args,
replies: Vec<Box<dyn Any>>,
parcels: std::collections::HashMap<Sink, Vec<Box<dyn Any>>>,
}

impl FakeSession {

/// Constructs a new fake session.
/// Constructs a new fake session with test default agent arguments.
pub fn new() -> FakeSession {
FakeSession::with_args(crate::args::Args {
heartbeat_rate: std::time::Duration::from_secs(0),
verbosity: log::LevelFilter::Debug,
log_to_stdout: false,
log_to_file: None,
})
}

/// Constructs a new fake session with the given agent arguments.
pub fn with_args(args: crate::args::Args) -> FakeSession {
FakeSession {
args,
replies: Vec::new(),
parcels: std::collections::HashMap::new(),
}
Expand Down Expand Up @@ -111,6 +123,10 @@ impl FakeSession {

impl crate::session::Session for FakeSession {

fn args(&self) -> &crate::args::Args {
&self.args
}

fn reply<I>(&mut self, item: I) -> crate::session::Result<()>
where
I: crate::response::Item + 'static,
Expand Down
24 changes: 14 additions & 10 deletions crates/rrg/src/session/fleetspeak.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use log::{error, info};
/// This is a normal session type that that is associated with some flow on the
/// server. It keeps track of the responses it sends and collects statistics
/// about network and runtime utilization to kill the action if it is needed.
pub struct FleetspeakSession {
pub struct FleetspeakSession<'a> {
/// Arguments passed to the agent.
args: &'a crate::args::Args,
/// A builder for responses sent through Fleetspeak to the GRR server.
response_builder: crate::ResponseBuilder,
/// Maximum frequency of heartbeat messages to send to Fleetspeak.
heartbeat_rate: std::time::Duration,
/// Number of bytes sent since the session was created.
network_bytes_sent: u64,
/// Number of bytes we are allowed to send within the session.
Expand All @@ -20,7 +20,7 @@ pub struct FleetspeakSession {
real_time_limit: Option<std::time::Duration>,
}

impl FleetspeakSession {
impl<'a> FleetspeakSession<'a> {

/// Dispatches the given `request` to an appropriate action handler.
///
Expand All @@ -34,9 +34,9 @@ impl FleetspeakSession {
///
/// Long-running actions spawned by requests that need to send heartbeat
/// signal to Fleetspeak will do so with frequency not greater than the one
/// specified `heartbeat_rate`.
/// specified the arguments passed to the agent.
pub fn dispatch(
heartbeat_rate: std::time::Duration,
args: &'a crate::args::Args,
request: Result<crate::Request, crate::ParseRequestError>,
) {
let request_id = match &request {
Expand All @@ -58,8 +58,8 @@ impl FleetspeakSession {
Ok(mut request) => {
let filters = request.take_filters();
let mut session = FleetspeakSession {
args,
response_builder: response_builder.with_filters(filters),
heartbeat_rate,
network_bytes_sent: 0,
network_bytes_limit: request.network_bytes_limit(),
real_time_start: std::time::Instant::now(),
Expand All @@ -81,7 +81,7 @@ impl FleetspeakSession {
}
}

impl FleetspeakSession {
impl<'a> FleetspeakSession<'a> {

/// Checks whether the network bytes limit was crossed.
///
Expand Down Expand Up @@ -121,7 +121,11 @@ impl FleetspeakSession {
}
}

impl crate::session::Session for FleetspeakSession {
impl<'a> crate::session::Session for FleetspeakSession<'a> {

fn args(&self) -> &crate::args::Args {
&self.args
}

fn reply<I>(&mut self, item: I) -> crate::session::Result<()>
where
Expand Down Expand Up @@ -161,6 +165,6 @@ impl crate::session::Session for FleetspeakSession {
}

fn heartbeat(&mut self) {
fleetspeak::heartbeat_with_throttle(self.heartbeat_rate);
fleetspeak::heartbeat_with_throttle(self.args.heartbeat_rate);
}
}

0 comments on commit c838321

Please sign in to comment.