Skip to content

Commit

Permalink
Originator is actually not optional how we use it (#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
DavisVaughan authored Oct 10, 2024
1 parent 990109f commit b867136
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 26 deletions.
2 changes: 1 addition & 1 deletion crates/amalthea/src/language/shell_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub trait ShellHandler: Send {
/// Docs: https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute
async fn handle_execute_request(
&mut self,
originator: Option<Originator>,
originator: Originator,
req: &ExecuteRequest,
) -> Result<ExecuteReply, ExecuteReplyException>;

Expand Down
2 changes: 1 addition & 1 deletion crates/amalthea/src/socket/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl Shell {
) -> Result<(), Error> {
log::info!("Received execution request {req:?}");
let originator = Originator::from(&req);
match block_on(handler.handle_execute_request(Some(originator), &req.content)) {
match block_on(handler.handle_execute_request(originator, &req.content)) {
Ok(reply) => {
log::info!("Got execution reply, delivering to frontend: {reply:?}");
let r = req.send_reply(reply, &self.socket);
Expand Down
4 changes: 2 additions & 2 deletions crates/amalthea/src/wire/input_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct InputRequest {
/// An input request originating from a Shell handler
pub struct ShellInputRequest {
/// The identity of the Shell that sent the request
pub originator: Option<Originator>,
pub originator: Originator,

/// The input request itself
pub request: InputRequest,
Expand All @@ -46,7 +46,7 @@ impl MessageType for InputRequest {
pub struct UiCommFrontendRequest {
/// The identity of the currently active `execute_request` that caused this
/// comm request
pub originator: Option<Originator>,
pub originator: Originator,

/// The response channel for the request
pub response_tx: Sender<StdInRpcReply>,
Expand Down
11 changes: 4 additions & 7 deletions crates/amalthea/src/wire/jupyter_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct JupyterMessage<T> {
pub header: JupyterHeader,

/// The header of the message from which this message originated. Optional;
/// not all messages have an originator.
/// not all messages have a parent.
pub parent_header: Option<JupyterHeader>,

/// The body (payload) of the message
Expand Down Expand Up @@ -339,14 +339,11 @@ where

/// Create a new Jupyter message with a specific ZeroMQ identity.
pub fn create_with_identity(
orig: Option<Originator>,
originator: Originator,
content: T,
session: &Session,
) -> JupyterMessage<T> {
let (id, parent_header) = match orig {
Some(orig) => (orig.zmq_id, Some(orig.header)),
None => (Vec::new(), None),
};
let (id, parent_header) = (originator.zmq_id, originator.header);

JupyterMessage::<T> {
zmq_identities: vec![id],
Expand All @@ -355,7 +352,7 @@ where
session.session_id.clone(),
session.username.clone(),
),
parent_header,
parent_header: Some(parent_header),
content,
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/amalthea/tests/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ impl Shell {
}

// Simluates an input request
fn prompt_for_input(&self, originator: Option<Originator>) {
fn prompt_for_input(&self, originator: Originator) {
if let Err(err) = self
.stdin_request_tx
.send(StdInRequest::Input(ShellInputRequest {
originator: originator.clone(),
originator,
request: InputRequest {
prompt: String::from("Amalthea Echo> "),
password: false,
Expand Down Expand Up @@ -137,7 +137,7 @@ impl ShellHandler for Shell {
/// Handles an ExecuteRequest; "executes" the code by echoing it.
async fn handle_execute_request(
&mut self,
originator: Option<Originator>,
originator: Originator,
req: &ExecuteRequest,
) -> Result<ExecuteReply, ExecuteReplyException> {
// Increment counter if we are storing this execution in history
Expand Down
18 changes: 9 additions & 9 deletions crates/ark/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ pub struct RMain {
struct ActiveReadConsoleRequest {
exec_count: u32,
request: ExecuteRequest,
orig: Option<Originator>,
originator: Originator,
response_tx: Sender<ExecuteResponse>,
}

Expand Down Expand Up @@ -860,7 +860,7 @@ impl RMain {
// Send request to frontend. We'll wait for an `input_reply`
// from the frontend in the event loop in `read_console()`.
// The active request remains active.
self.request_input(req.orig.clone(), info.input_prompt.to_string());
self.request_input(req.originator.clone(), info.input_prompt.to_string());
return None;
} else {
// Invalid input request, propagate error to R
Expand Down Expand Up @@ -923,15 +923,15 @@ impl RMain {
}

let input = match req {
RRequest::ExecuteCode(exec_req, orig, response_tx) => {
RRequest::ExecuteCode(exec_req, originator, response_tx) => {
// Extract input from request
let (input, exec_count) = { self.init_execute_request(&exec_req) };

// Save `ExecuteCode` request so we can respond to it at next prompt
self.active_request = Some(ActiveReadConsoleRequest {
exec_count,
request: exec_req,
orig,
originator,
response_tx,
});

Expand Down Expand Up @@ -1429,7 +1429,7 @@ impl RMain {

/// Request input from frontend in case code like `readline()` is
/// waiting for input
fn request_input(&self, orig: Option<Originator>, prompt: String) {
fn request_input(&self, originator: Originator, prompt: String) {
// TODO: We really should not have to wait on IOPub to be cleared, but
// if an IOPub `'stream'` message arrives on the frontend while an input
// request is being handled, it currently breaks the Console. We should
Expand All @@ -1448,7 +1448,7 @@ impl RMain {
unwrap!(
self.stdin_request_tx
.send(StdInRequest::Input(ShellInputRequest {
originator: orig,
originator,
request: InputRequest {
prompt,
password: false,
Expand Down Expand Up @@ -1687,12 +1687,12 @@ impl RMain {
log::trace!("Calling frontend method '{request:?}'");
let (response_tx, response_rx) = bounded(1);

let originator = if let Some(req) = &self.active_request {
req.orig.clone()
} else {
let Some(req) = &self.active_request else {
anyhow::bail!("Error: No active request");
};

let originator = req.originator.clone();

let comm_request = UiCommFrontendRequest {
originator,
response_tx,
Expand Down
2 changes: 1 addition & 1 deletion crates/ark/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::ui::UiCommMessage;
pub enum RRequest {
/// Fulfill an execution request from the frontend, producing either a
/// Reply or an Exception
ExecuteCode(ExecuteRequest, Option<Originator>, Sender<ExecuteResponse>),
ExecuteCode(ExecuteRequest, Originator, Sender<ExecuteResponse>),

/// Shut down the R execution thread
Shutdown(bool),
Expand Down
2 changes: 1 addition & 1 deletion crates/ark/src/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl ShellHandler for Shell {
/// for processing.
async fn handle_execute_request(
&mut self,
originator: Option<Originator>,
originator: Originator,
req: &ExecuteRequest,
) -> Result<ExecuteReply, ExecuteReplyException> {
let (response_tx, response_rx) = unbounded::<ExecuteResponse>();
Expand Down
2 changes: 1 addition & 1 deletion crates/echo/src/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl ShellHandler for Shell {
/// Handles an ExecuteRequest; "executes" the code by echoing it.
async fn handle_execute_request(
&mut self,
_originator: Option<Originator>,
_originator: Originator,
req: &ExecuteRequest,
) -> Result<ExecuteReply, ExecuteReplyException> {
// Increment counter if we are storing this execution in history
Expand Down

0 comments on commit b867136

Please sign in to comment.