Skip to content

Commit

Permalink
(#85) Network viewer - auto-subscribe for event monitoring on publish
Browse files Browse the repository at this point in the history
  • Loading branch information
mario4tier committed Nov 20, 2023
1 parent 21b90ea commit e51cc55
Show file tree
Hide file tree
Showing 19 changed files with 1,487 additions and 242 deletions.
252 changes: 244 additions & 8 deletions rust/demo-app/Cargo.lock

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions rust/helper-uniffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions rust/helper/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 6 additions & 25 deletions rust/suibase/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion rust/suibase/crates/suibase-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition.workspace = true

[dependencies]
log = "0.4.0"
pretty_env_logger = "0.4"
env_logger = "0.10"
home = "0.5.5"
twox-hash = "1.6.1"

Expand Down Expand Up @@ -51,3 +51,7 @@ thiserror.workspace = true

tower.workspace = true
tower-http.workspace = true

[dev-dependencies]
env_logger = "0.10"
log = "0.4"
89 changes: 79 additions & 10 deletions rust/suibase/crates/suibase-daemon/src/admin_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::workdirs_watcher::WorkdirsWatcher;
use crate::workers::ShellWorker;
use crate::workers::{EventsWriterWorker, EventsWriterWorkerParams};

use anyhow::Result;
use anyhow::{anyhow, Result};

use tokio_graceful_shutdown::{FutureExt, NestedSubsystem, SubsystemBuilder, SubsystemHandle};

Expand Down Expand Up @@ -51,10 +51,10 @@ pub type AdminControllerRx = tokio::sync::mpsc::Receiver<AdminControllerMsg>;
struct WorkdirTracking {
last_read_config: Option<WorkdirProxyConfig>,

shell_worker_tx: Option<AdminControllerTx>,
shell_worker_tx: Option<GenericTx>,
shell_worker_handle: Option<NestedSubsystem<Box<dyn Error + Send + Sync>>>, // Set when the shell_worker is started.

events_writer_worker_tx: Option<AdminControllerTx>,
events_writer_worker_tx: Option<GenericTx>,
events_writer_worker_handle: Option<NestedSubsystem<Box<dyn Error + Send + Sync>>>, // Set when the events_writer_worker is started.
}

Expand Down Expand Up @@ -116,9 +116,10 @@ impl std::fmt::Debug for AdminControllerMsg {

// Events ID
pub type AdminControllerEventID = u8;
pub const EVENT_NOTIF_CONFIG_FILE_CHANGE: u8 = 1;
pub const EVENT_DEBUG_PRINT: u8 = 2;
pub const EVENT_SHELL_EXEC: u8 = 3;
pub const EVENT_NOTIF_CONFIG_FILE_CHANGE: u8 = 128;
pub const EVENT_DEBUG_PRINT: u8 = 129;
pub const EVENT_SHELL_EXEC: u8 = 130;
pub const EVENT_POST_PUBLISH: u8 = 131;

impl AdminController {
pub fn new(
Expand All @@ -138,6 +139,68 @@ impl AdminController {
}
}

pub async fn send_event_audit(tx_channel: &AdminControllerTx) -> Result<()> {
let mut msg = AdminControllerMsg::new();
msg.event_id = EVENT_AUDIT;
tx_channel.send(msg).await.map_err(|e| {
log::debug!("failed {}", e);
anyhow!("failed {}", e)
})
}

async fn process_audit_msg(&mut self, msg: AdminControllerMsg) {
if msg.event_id != EVENT_AUDIT {
log::error!("Unexpected event_id {:?}", msg.event_id);
// Do nothing. Consume the message.
return;
}

// Forward an audit message to every events writer.
for (workdir_idx, wd_tracking) in self.wd_tracking.iter_mut() {
if wd_tracking.events_writer_worker_tx.is_none() {
continue;
}
let worker_tx = wd_tracking.events_writer_worker_tx.as_ref().unwrap();

let mut worker_msg = GenericChannelMsg::new();
worker_msg.event_id = EVENT_AUDIT;
worker_msg.workdir_idx = Some(workdir_idx);
worker_tx.send(worker_msg).await.unwrap();
}
}

pub async fn send_event_post_publish(tx_channel: &AdminControllerTx) -> Result<()> {
let mut msg = AdminControllerMsg::new();
msg.event_id = EVENT_POST_PUBLISH;
tx_channel.send(msg).await.map_err(|e| {
log::debug!("failed {}", e);
anyhow!("failed {}", e)
})
}

async fn process_post_publish_msg(&mut self, msg: AdminControllerMsg) {
if msg.event_id != EVENT_POST_PUBLISH {
log::error!("Unexpected event_id {:?}", msg.event_id);
return;
}

if msg.workdir_idx.is_none() {
log::error!("EVENT_POST_PUBLISH missing workdir_idx");
return;
}

let msg_workdir_idx = msg.workdir_idx.unwrap();
// Forward an update message to the related workdir events writer.
let wd_tracking = self.wd_tracking.get_mut(msg_workdir_idx);

if let Some(worker_tx) = wd_tracking.events_writer_worker_tx.as_ref() {
let mut worker_msg = GenericChannelMsg::new();
worker_msg.event_id = EVENT_UPDATE;
worker_msg.workdir_idx = Some(msg_workdir_idx);
worker_tx.send(worker_msg).await.unwrap();
}
}

async fn process_shell_exec_msg(&mut self, msg: AdminControllerMsg, subsys: &SubsystemHandle) {
// Simply forward to the proper ShellWorker (one worker per workdir).
if msg.event_id != EVENT_SHELL_EXEC {
Expand Down Expand Up @@ -174,8 +237,12 @@ impl AdminController {
let shell_worker_tx = wd_tracking.shell_worker_tx.as_ref().unwrap();

// Forward the message to the ShellWorker.
// TODO Error handling
shell_worker_tx.send(msg).await.unwrap();
let mut worker_msg = GenericChannelMsg::new();
worker_msg.event_id = EVENT_EXEC;
worker_msg.data_string = msg.data_string;
worker_msg.workdir_idx = Some(123); //msg.workdir_idx;
worker_msg.resp_channel = msg.resp_channel;
shell_worker_tx.send(worker_msg).await.unwrap();
}

async fn process_debug_print_msg(&mut self, msg: AdminControllerMsg) {
Expand Down Expand Up @@ -402,14 +469,13 @@ impl AdminController {
let events_writer_worker_params = EventsWriterWorkerParams::new(
self.globals.clone(),
events_writer_worker_rx,
Some(workdir_idx),
workdir_idx,
);
let events_writer_worker = EventsWriterWorker::new(events_writer_worker_params);
let nested = subsys.start(SubsystemBuilder::new("events-writer-worker", |a| {
events_writer_worker.run(a)
}));
wd_tracking.events_writer_worker_handle = Some(nested);
// TODO Detect "events" related config change and inform the worker about it.
}

// Remember the changes that were applied.
Expand All @@ -421,6 +487,9 @@ impl AdminController {
// Wait for a message.
if let Some(msg) = self.admctrl_rx.recv().await {
match msg.event_id {
EVENT_AUDIT => {
self.process_audit_msg(msg).await;
}
EVENT_DEBUG_PRINT => {
self.process_debug_print_msg(msg).await;
}
Expand Down
5 changes: 5 additions & 0 deletions rust/suibase/crates/suibase-daemon/src/api/def_methods.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashMap;

use crate::workers::PackageTrackingState;

// Defines the JSON-RPC API.
//
// Design:
Expand Down Expand Up @@ -331,6 +333,8 @@ pub struct MoveConfig {
// Useful for tracking older package id for debug browsing.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub older_packages: Vec<PackageInstance>,

pub tracking_state: u32, // Helpful for debugging.
}

impl MoveConfig {
Expand All @@ -339,6 +343,7 @@ impl MoveConfig {
path: None,
latest_package: None,
older_packages: Vec::new(),
tracking_state: PackageTrackingState::new().into(),
}
}
}
Expand Down
Loading

0 comments on commit e51cc55

Please sign in to comment.