Skip to content

Commit

Permalink
Merge branch 'main' into dependabot-reviewers
Browse files Browse the repository at this point in the history
  • Loading branch information
SirCipher authored Dec 11, 2024
2 parents 49f4f4d + 8a58251 commit d91db87
Show file tree
Hide file tree
Showing 17 changed files with 501 additions and 173 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ num = "0.4"
smol_str = "0.3.1"
http-body-util = "0.1.2"
hyper-util = "0.1.5"
rdkafka = "0.36"
rdkafka = "0.37"
apache-avro = "0.17.0"
time = "0.3.36"
rumqttc = "0.24.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl LocationTracker {
/// it completes. Not that this is cannot be used as a stand-alone decoder as it has no concept of
/// a separator between frames. It needs to be incorporated into another decoder that can determine
/// where one record ends and another begins.
#[derive(Debug)]
pub struct RecognizerDecoder<R> {
parser: IncrementalReconParser,
recognizer: R,
Expand Down
37 changes: 30 additions & 7 deletions server/swimos_agent/src/agent_model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,28 @@ pub trait AgentDescription {
/// although it will not provided any lifecycle events for the agent or its lanes.
pub trait AgentSpec: AgentDescription + Sized + Send {
/// The type of handler to run when a command is received for a value lane.
type ValCommandHandler: HandlerAction<Self, Completion = ()> + Send + 'static;
type ValCommandHandler<'a>: HandlerAction<Self, Completion = ()> + Send + 'a
where
Self: 'a;

/// The type of handler to run when a command is received for a map lane.
type MapCommandHandler: HandlerAction<Self, Completion = ()> + Send + 'static;
type MapCommandHandler<'a>: HandlerAction<Self, Completion = ()> + Send + 'a
where
Self: 'a;

/// The type of handler to run when a request is received to sync with a lane.
type OnSyncHandler: HandlerAction<Self, Completion = ()> + Send + 'static;

/// The type of the handler to run when an HTTP request is received for a lane.
type HttpRequestHandler: HandlerAction<Self, Completion = ()> + Send + 'static;

/// A store of persistent deserializers that may be used by [`AgentSpec::on_value_command`]
/// and [`AgentSpec::on_map_command`].
type Deserializers: Send + 'static;

/// Create th store of deserializers that can be reused any number of times.
fn initialize_deserializers(&self) -> Self::Deserializers;

/// The names and flags of all items (lanes and stores) in the agent.
fn item_specs() -> HashMap<&'static str, ItemSpec>;

Expand All @@ -223,9 +234,15 @@ pub trait AgentSpec: AgentDescription + Sized + Send {
/// accept commands.
///
/// # Arguments
/// * `deserializers` - The store of persistent deserializers for the agent.
/// * `lane` - The name of the lane.
/// * `body` - The content of the command.
fn on_value_command(&self, lane: &str, body: BytesMut) -> Option<Self::ValCommandHandler>;
fn on_value_command<'a>(
&self,
deserializers: &'a mut Self::Deserializers,
lane: &str,
body: BytesMut,
) -> Option<Self::ValCommandHandler<'a>>;

/// Create an initializer that will consume the state of a value-like item, as reported by the runtime.
///
Expand All @@ -247,13 +264,15 @@ pub trait AgentSpec: AgentDescription + Sized + Send {
/// for a map lane. There will be no handler if the lane does not exist or does not
/// accept commands.
/// # Arguments
/// * `deserializers` - The store of persistent deserializers for the agent.
/// * `lane` - The name of the lane.
/// * `body` - The content of the command.
fn on_map_command(
fn on_map_command<'a>(
&self,
deserializers: &'a mut Self::Deserializers,
lane: &str,
body: MapMessage<BytesMut, BytesMut>,
) -> Option<Self::MapCommandHandler>;
) -> Option<Self::MapCommandHandler<'a>>;

/// Create a handler that will update the state of an agent when a request is made to
/// sync with a lane. There will be no handler if the lane does not exist.
Expand Down Expand Up @@ -1217,6 +1236,7 @@ where
let add_commander = |address: Address<Text>| cmd_ids.borrow_mut().get_request(&address);
let add_link = (add_downlink, add_commander);
let add_lane = NoDynLanes;
let mut deserializers = item_model.initialize_deserializers();

// Calling run_handler is very verbose so is pulled out into this macro to make the code easier to read.
macro_rules! exec_handler {
Expand Down Expand Up @@ -1399,7 +1419,8 @@ where
match request {
LaneRequest::Command(body) => {
trace!(name = %name, "Received a command for a value-like lane.");
if let Some(handler) = item_model.on_value_command(name.as_str(), body)
if let Some(handler) =
item_model.on_value_command(&mut deserializers, name.as_str(), body)
{
let result = run_handler(
&mut ActionContext::new(
Expand Down Expand Up @@ -1453,7 +1474,9 @@ where
match request {
LaneRequest::Command(body) => {
trace!(name = %name, "Received a command for a map-like lane.");
if let Some(handler) = item_model.on_map_command(name.as_str(), body) {
if let Some(handler) =
item_model.on_map_command(&mut deserializers, name.as_str(), body)
{
let result = run_handler(
&mut ActionContext::new(
&suspended,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,32 @@ impl AgentDescription for EmptyAgent {
}

impl AgentSpec for EmptyAgent {
type ValCommandHandler = UnitHandler;
type ValCommandHandler<'a> = UnitHandler
where
Self: 'a;

type MapCommandHandler = UnitHandler;
type MapCommandHandler<'a> = UnitHandler
where
Self: 'a;

type OnSyncHandler = UnitHandler;

type HttpRequestHandler = UnitHandler;

type Deserializers = ();

fn initialize_deserializers(&self) -> Self::Deserializers {}

fn item_specs() -> HashMap<&'static str, ItemSpec> {
HashMap::new()
}

fn on_value_command(&self, _lane: &str, _body: BytesMut) -> Option<Self::ValCommandHandler> {
fn on_value_command(
&self,
_: &mut (),
_lane: &str,
_body: BytesMut,
) -> Option<Self::ValCommandHandler<'_>> {
None
}

Expand All @@ -67,11 +80,12 @@ impl AgentSpec for EmptyAgent {
None
}

fn on_map_command(
fn on_map_command<'a>(
&self,
_: &'a mut (),
_lane: &str,
_body: MapMessage<BytesMut, BytesMut>,
) -> Option<Self::MapCommandHandler> {
) -> Option<Self::MapCommandHandler<'a>> {
None
}

Expand Down
24 changes: 19 additions & 5 deletions server/swimos_agent/src/agent_model/tests/fake_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,13 @@ impl AgentDescription for TestAgent {
}

impl AgentSpec for TestAgent {
type ValCommandHandler = TestHandler;
type ValCommandHandler<'a> = TestHandler
where
Self: 'a;

type MapCommandHandler = TestHandler;
type MapCommandHandler<'a> = TestHandler
where
Self: 'a;

type OnSyncHandler = TestHandler;

Expand Down Expand Up @@ -219,7 +223,12 @@ impl AgentSpec for TestAgent {
lanes
}

fn on_value_command(&self, lane: &str, body: BytesMut) -> Option<Self::ValCommandHandler> {
fn on_value_command<'a>(
&self,
_: &'a mut (),
lane: &str,
body: BytesMut,
) -> Option<Self::ValCommandHandler<'a>> {
match lane {
VAL_LANE => Some(
TestEvent::Value {
Expand Down Expand Up @@ -264,11 +273,12 @@ impl AgentSpec for TestAgent {
None
}

fn on_map_command(
fn on_map_command<'a>(
&self,
_: &'a mut (),
lane: &str,
body: MapMessage<BytesMut, BytesMut>,
) -> Option<Self::MapCommandHandler> {
) -> Option<Self::MapCommandHandler<'a>> {
match lane {
MAP_LANE => Some(
TestEvent::Map {
Expand Down Expand Up @@ -426,6 +436,10 @@ impl AgentSpec for TestAgent {
Err(DynamicRegistrationError::DuplicateName(name.to_string()))
}
}

type Deserializers = ();

fn initialize_deserializers(&self) -> Self::Deserializers {}
}

impl HandlerAction<TestAgent> for TestHandler {
Expand Down
34 changes: 15 additions & 19 deletions server/swimos_agent/src/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use swimos_api::{
};
use swimos_form::{read::RecognizerReadable, write::StructuralWritable};
use swimos_model::Text;
use swimos_recon::parser::{AsyncParseError, RecognizerDecoder};
use swimos_recon::parser::AsyncParseError;
use swimos_utilities::{never::Never, routing::RouteUri};
use thiserror::Error;
use tokio::time::Instant;
Expand All @@ -45,6 +45,7 @@ use crate::{
},
lanes::JoinLaneKind,
meta::AgentMetadata,
ReconDecoder,
};

use bitflags::bitflags;
Expand Down Expand Up @@ -1283,23 +1284,21 @@ impl<Context, S: AsRef<str>> HandlerAction<Context> for GetParameter<S> {

/// An event handler that will attempt to decode a [readable](`swimos_form::read::StructuralReadable`) type
/// from a buffer, immediately returning the result or an error.
pub struct Decode<T> {
_target_type: PhantomData<fn() -> T>,
pub struct Decode<'a, T: RecognizerReadable> {
decoder: Option<&'a mut ReconDecoder<T>>,
buffer: BytesMut,
complete: bool,
}

impl<T> Decode<T> {
pub fn new(buffer: BytesMut) -> Self {
impl<'a, T: RecognizerReadable> Decode<'a, T> {
pub fn new(decoder: &'a mut ReconDecoder<T>, buffer: BytesMut) -> Self {
Decode {
_target_type: PhantomData,
decoder: Some(decoder),
buffer,
complete: false,
}
}
}

impl<Context, T: RecognizerReadable> HandlerAction<Context> for Decode<T> {
impl<'a, T: RecognizerReadable, Context> HandlerAction<Context> for Decode<'a, T> {
type Completion = T;

fn step(
Expand All @@ -1308,27 +1307,24 @@ impl<Context, T: RecognizerReadable> HandlerAction<Context> for Decode<T> {
_meta: AgentMetadata,
_context: &Context,
) -> StepResult<Self::Completion> {
let Decode {
buffer, complete, ..
} = self;
if *complete {
StepResult::after_done()
} else {
let mut decoder = RecognizerDecoder::new(T::make_recognizer());
*complete = true;
let Decode { decoder, buffer } = self;
if let Some(decoder) = decoder.take() {
decoder.reset();
match decoder.decode_eof(buffer) {
Ok(Some(value)) => StepResult::done(value),
Ok(_) => StepResult::Fail(EventHandlerError::IncompleteCommand),
Err(e) => StepResult::Fail(EventHandlerError::BadCommand(e)),
}
} else {
StepResult::after_done()
}
}

fn describe(&self, _context: &Context, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
let Decode {
buffer, complete, ..
buffer, decoder, ..
} = self;
let content = if *complete {
let content = if decoder.is_none() {
CONSUMED
} else {
std::str::from_utf8(buffer.as_ref()).unwrap_or("<<BAD UTF8>>")
Expand Down
11 changes: 7 additions & 4 deletions server/swimos_agent/src/event_handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use tokio::time::Instant;

use crate::agent_model::AgentDescription;
use crate::event_handler::check_step::{check_is_complete, check_is_continue};
use crate::event_handler::{GetParameter, ModificationFlags, WithParameters};
use crate::event_handler::{Decode, GetParameter, ModificationFlags, WithParameters};

use crate::test_context::{NO_DOWNLINKS, NO_DYN_LANES};
use crate::ReconDecoder;
use crate::{
event_handler::{
ConstHandler, EventHandlerError, GetAgentUri, HandlerActionExt, Sequentially, SideEffects,
Expand All @@ -36,7 +37,7 @@ use crate::{
};

use super::{
join, ActionContext, Decode, HandlerAction, HandlerFuture, Modification, ScheduleTimerEvent,
join, ActionContext, HandlerAction, HandlerFuture, Modification, ScheduleTimerEvent,
SideEffect, Spawner, StepResult,
};

Expand Down Expand Up @@ -526,14 +527,15 @@ fn followed_by_handler() {

#[test]
fn decoding_handler_success() {
let mut decoder = ReconDecoder::<i32>::default();
let uri = make_uri();
let route_params = HashMap::new();
let meta = make_meta(&uri, &route_params);

let mut buffer = BytesMut::new();
write!(buffer, "56").expect("Write failed.");

let mut handler = Decode::<i32>::new(buffer);
let mut handler = Decode::<i32>::new(&mut decoder, buffer);

let result = handler.step(
&mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()),
Expand Down Expand Up @@ -561,14 +563,15 @@ fn decoding_handler_success() {

#[test]
fn decoding_handler_failure() {
let mut decoder = ReconDecoder::<i32>::default();
let uri = make_uri();
let route_params = HashMap::new();
let meta = make_meta(&uri, &route_params);

let mut buffer = BytesMut::new();
write!(buffer, "boom").expect("Write failed.");

let mut handler = Decode::<i32>::new(buffer);
let mut handler = Decode::<i32>::new(&mut decoder, buffer);

let result = handler.step(
&mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()),
Expand Down
8 changes: 5 additions & 3 deletions server/swimos_agent/src/lanes/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
},
item::AgentItem,
meta::AgentMetadata,
ReconDecoder,
};

use super::{LaneItem, ProjTransform};
Expand Down Expand Up @@ -157,15 +158,16 @@ impl<C, T> HandlerTrans<T> for ProjTransform<C, CommandLane<T>> {
}
}

pub type DecodeAndCommand<C, T> =
AndThen<Decode<T>, DoCommand<C, T>, ProjTransform<C, CommandLane<T>>>;
pub type DecodeAndCommand<'a, C, T> =
AndThen<Decode<'a, T>, DoCommand<C, T>, ProjTransform<C, CommandLane<T>>>;

/// Create an event handler that will decode an incoming command and apply it to a command lane.
pub fn decode_and_command<C: AgentDescription, T: RecognizerReadable>(
decoder: &mut ReconDecoder<T>,
buffer: BytesMut,
projection: fn(&C) -> &CommandLane<T>,
) -> DecodeAndCommand<C, T> {
let decode: Decode<T> = Decode::new(buffer);
let decode: Decode<T> = Decode::new(decoder, buffer);
decode.and_then(ProjTransform::new(projection))
}

Expand Down
Loading

0 comments on commit d91db87

Please sign in to comment.