From 19f01d327190d6166e54259d5c2f0629c034267a Mon Sep 17 00:00:00 2001 From: Taku Fukada Date: Thu, 14 Dec 2023 17:41:17 +0900 Subject: [PATCH] =?UTF-8?q?=E3=83=87=E3=83=BC=E3=82=BF=E3=83=91=E3=82=A4?= =?UTF-8?q?=E3=83=97=E3=83=A9=E3=82=A4=E3=83=B3=E3=81=AE=E6=A6=82=E5=BD=A2?= =?UTF-8?q?=E3=82=92=E4=BD=9C=E3=82=8B=20(#63)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See #44 データ処理パイプライン `[Source (Input)] ==> [Transform] ==> [Sink (Output)]` の概形を作りました。 - Source や Sink から Controller にフィードバック(エラー、進捗状況、etc.)を送る仕組みも実装してあります。 - パイプラインを停止する(ユーザなどがキャンセルする)仕組みも備えています。 - Source や Sink のための Configurations の仕組みのなんとなくの大枠を用意してありますが、具体的な実装はまだありません。 テストは、「パイプライン走らせて、途中でキャンセルする」ことだけしている。test coverage: Screenshot 2023-12-14 at 14 26 15 Closes: #44 --- .github/codecov.yml | 10 ++- Cargo.toml | 1 + README.md | 1 + nusamai/Cargo.toml | 12 +++ nusamai/src/configuration.rs | 30 +++++++ nusamai/src/lib.rs | 2 + nusamai/src/pipeline/feedback.rs | 74 +++++++++++++++++ nusamai/src/pipeline/mod.rs | 23 ++++++ nusamai/src/pipeline/runner.rs | 99 +++++++++++++++++++++++ nusamai/src/pipeline/sink.rs | 21 +++++ nusamai/src/pipeline/source.rs | 22 +++++ nusamai/src/pipeline/transform.rs | 5 ++ nusamai/tests/pipeline.rs | 128 ++++++++++++++++++++++++++++++ 13 files changed, 426 insertions(+), 2 deletions(-) create mode 100644 nusamai/Cargo.toml create mode 100644 nusamai/src/configuration.rs create mode 100644 nusamai/src/lib.rs create mode 100644 nusamai/src/pipeline/feedback.rs create mode 100644 nusamai/src/pipeline/mod.rs create mode 100644 nusamai/src/pipeline/runner.rs create mode 100644 nusamai/src/pipeline/sink.rs create mode 100644 nusamai/src/pipeline/source.rs create mode 100644 nusamai/src/pipeline/transform.rs create mode 100644 nusamai/tests/pipeline.rs diff --git a/.github/codecov.yml b/.github/codecov.yml index e97428649..839569035 100644 --- a/.github/codecov.yml +++ b/.github/codecov.yml @@ -16,10 +16,16 @@ comment: component_management: individual_components: - component_id: app - name: tauri-app + name: GUI paths: - app/** + + - component_id: core + name: Backend + paths: + - nusamai/** + - component_id: nusamai-* - name: nusamai-* + name: Libraries paths: - nusamai-*/** diff --git a/Cargo.toml b/Cargo.toml index c3a0e6813..e7409a9f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "nusamai-geojson", "nusamai-plateau", "nusamai-mvt", + "nusamai", "nusamai-projection", ] resolver = "2" diff --git a/README.md b/README.md index 1b0b7a2f6..d28c848ec 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ - アプリケーション: - [`app`](./app/) — Tauri による GUI アプリケーション + - [`nusamai`](./nusamai/) — アプリケーションのバックエンド (およびCLI実装?) - 基盤: - [`nusamai-geometry`](./nusamai-geometry/) — ジオメトリ型 - [`nusamai-plateau`](./nusamai-plateau/) — PLATEAU CityGML パーサ diff --git a/nusamai/Cargo.toml b/nusamai/Cargo.toml new file mode 100644 index 000000000..131ee0987 --- /dev/null +++ b/nusamai/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "nusamai" +version = "0.1.0" +edition = "2021" + +[dependencies] +indexmap = { version = "2.1.0", features = ["serde"] } +rayon = "1.8.0" +serde = { version = "1.0.193", features = ["derive"] } + +[dev-dependencies] +rand = "0.8.5" diff --git a/nusamai/src/configuration.rs b/nusamai/src/configuration.rs new file mode 100644 index 000000000..628ff9984 --- /dev/null +++ b/nusamai/src/configuration.rs @@ -0,0 +1,30 @@ +//! Configuration mechanism for Nusamai + +use indexmap::map::Entry; +use indexmap::IndexMap; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct Config { + items: IndexMap, +} + +impl Config { + pub fn add(&mut self, item: ConfigItem) { + match self.items.entry(item.key.clone()) { + Entry::Occupied(entry) => { + panic!("Configuration key={} already exists", entry.key()) + } + Entry::Vacant(entry) => { + entry.insert(item); + } + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ConfigItem { + pub key: String, + pub description: String, + // pub value: ... +} diff --git a/nusamai/src/lib.rs b/nusamai/src/lib.rs new file mode 100644 index 000000000..0b41113db --- /dev/null +++ b/nusamai/src/lib.rs @@ -0,0 +1,2 @@ +pub mod configuration; +pub mod pipeline; diff --git a/nusamai/src/pipeline/feedback.rs b/nusamai/src/pipeline/feedback.rs new file mode 100644 index 000000000..edf95a613 --- /dev/null +++ b/nusamai/src/pipeline/feedback.rs @@ -0,0 +1,74 @@ +//! Feedback messages from the pipeline components. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +const FEEDBACK_CHANNEL_BOUND: usize = 10000; + +#[derive(Debug)] +pub struct FeedbackMessage { + pub message: String, + // severity: + // progress: + // source: + // etc. +} + +#[derive(Clone)] +pub struct Feedback { + cancelled: Arc, + sender: std::sync::mpsc::SyncSender, +} + +impl Feedback { + /// Checks if the pipeline is requested to be cancelled + #[inline] + pub fn is_cancelled(&self) -> bool { + self.cancelled.load(Ordering::Relaxed) + } + + /// Send a message to the feedback channel + #[inline] + pub fn send(&self, msg: FeedbackMessage) { + // don't care if the receiver is dropped. + let _ = self.sender.send(msg); + } +} + +pub struct Watcher { + receiver: std::sync::mpsc::Receiver, +} + +impl IntoIterator for Watcher { + type Item = FeedbackMessage; + type IntoIter = std::sync::mpsc::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.receiver.into_iter() + } +} + +pub struct Canceller { + cancelled: Arc, +} + +impl Canceller { + /// Cancel the pipeline + pub fn cancel(&self) { + self.cancelled.store(true, Ordering::Relaxed); + } +} + +pub(crate) fn watcher() -> (Watcher, Feedback, Canceller) { + let cancelled = Arc::new(AtomicBool::new(false)); + let (sender, receiver) = std::sync::mpsc::sync_channel(FEEDBACK_CHANNEL_BOUND); + let watcher = Watcher { receiver }; + let canceller = Canceller { + cancelled: cancelled.clone(), + }; + let feedback = Feedback { + cancelled: cancelled.clone(), + sender, + }; + (watcher, feedback, canceller) +} diff --git a/nusamai/src/pipeline/mod.rs b/nusamai/src/pipeline/mod.rs new file mode 100644 index 000000000..a65826a17 --- /dev/null +++ b/nusamai/src/pipeline/mod.rs @@ -0,0 +1,23 @@ +pub mod feedback; +pub mod runner; +pub mod sink; +pub mod source; +pub mod transform; + +pub use feedback::*; +pub use runner::*; +pub use sink::*; +pub use source::*; +pub use transform::*; + +use std::sync::mpsc; + +pub type Sender = mpsc::SyncSender; +pub type Receiver = mpsc::Receiver; + +/// Message passing through pipeline stages +#[derive(Debug)] +pub struct Percel { + pub dummy_value: i32, + // pub cityobj: TopLevelCityObject +} diff --git a/nusamai/src/pipeline/runner.rs b/nusamai/src/pipeline/runner.rs new file mode 100644 index 000000000..8de8c963e --- /dev/null +++ b/nusamai/src/pipeline/runner.rs @@ -0,0 +1,99 @@ +use std::sync::mpsc::sync_channel; + +use rayon::prelude::*; + +use super::{ + feedback::{watcher, Feedback, Watcher}, + Canceller, +}; +use crate::pipeline::{Receiver, Sink, Source, Transformer}; + +const SOURCE_OUTPUT_CHANNEL_BOUND: usize = 10000; +const TRANSFORMER_OUTPUT_CHANNEL_BOUND: usize = 10000; + +fn start_source_thread( + mut source: Box, + feedback: Feedback, +) -> (std::thread::JoinHandle<()>, Receiver) { + let (sender, receiver) = sync_channel(SOURCE_OUTPUT_CHANNEL_BOUND); + let handle = std::thread::spawn(move || { + source.run(sender, &feedback); + }); + (handle, receiver) +} + +fn start_transformer_thread( + transformer: Box, + upstream_receiver: Receiver, + feedback: Feedback, +) -> (std::thread::JoinHandle<()>, Receiver) { + let (sender, receiver) = sync_channel(TRANSFORMER_OUTPUT_CHANNEL_BOUND); + let handle = std::thread::spawn(move || { + let _ = upstream_receiver + .into_iter() + .par_bridge() + .try_for_each(|mut obj| { + transformer.transform(&mut obj, &feedback); + if sender.send(obj).is_err() || feedback.is_cancelled() { + Err(()) + } else { + Ok(()) + } + }); + }); + (handle, receiver) +} + +fn start_sink_thread( + mut sink: Box, + receiver: Receiver, + mut feedback: Feedback, +) -> std::thread::JoinHandle<()> { + std::thread::spawn(move || { + for obj in receiver { + if feedback.is_cancelled() { + break; + } + sink.feed(obj, &mut feedback); + } + }) +} + +pub struct PipelineHandle { + source_thread: std::thread::JoinHandle<()>, + transformer_thread: std::thread::JoinHandle<()>, + sink_thread: std::thread::JoinHandle<()>, +} + +impl PipelineHandle { + // Wait for the pipeline to finish + pub fn join(self) { + self.source_thread.join().unwrap(); + self.transformer_thread.join().unwrap(); + self.sink_thread.join().unwrap(); + } +} + +/// Run the pipeline +/// +/// `[Source] ==> [Transformer] ==> [Sink]` +pub fn run( + source: Box, + transformer: Box, + sink: Box, +) -> (PipelineHandle, Watcher, Canceller) { + let (watcher, feedback, canceller) = watcher(); + + // Start the pipeline + let (source_thread, source_receiver) = start_source_thread(source, feedback.clone()); + let (transformer_thread, transformer_receiver) = + start_transformer_thread(transformer, source_receiver, feedback.clone()); + let sink_thread = start_sink_thread(sink, transformer_receiver, feedback.clone()); + + let handle = PipelineHandle { + source_thread, + transformer_thread, + sink_thread, + }; + (handle, watcher, canceller) +} diff --git a/nusamai/src/pipeline/sink.rs b/nusamai/src/pipeline/sink.rs new file mode 100644 index 000000000..9a530edc8 --- /dev/null +++ b/nusamai/src/pipeline/sink.rs @@ -0,0 +1,21 @@ +use crate::configuration::Config; +use crate::pipeline::{Feedback, Percel}; + +pub struct SinkInfo { + pub name: String, +} + +pub trait SinkProvider { + /// Creates a sink instance. + fn create(&self, config: &Config) -> Box; + + /// Gets basic information about the sink. + fn info(&self) -> SinkInfo; + + /// Gets the configurable parameters of the sink. + fn config(&self) -> Config; +} + +pub trait Sink: Send { + fn feed(&mut self, obj: Percel, feedback: &mut Feedback); +} diff --git a/nusamai/src/pipeline/source.rs b/nusamai/src/pipeline/source.rs new file mode 100644 index 000000000..c6de72c47 --- /dev/null +++ b/nusamai/src/pipeline/source.rs @@ -0,0 +1,22 @@ +use crate::configuration::Config; +use crate::pipeline::{Feedback, Sender}; + +pub struct SourceInfo { + pub name: String, + // ... +} + +pub trait SourceProvider { + /// Creates a source instance. + fn create(&self, config: &Config) -> Box; + + /// Gets basic information about the sink. + fn info(&self) -> SourceInfo; + + /// Gets the configurable parameters of the source. + fn config(&self) -> Config; +} + +pub trait Source: Send { + fn run(&mut self, sink: Sender, feedback: &Feedback); +} diff --git a/nusamai/src/pipeline/transform.rs b/nusamai/src/pipeline/transform.rs new file mode 100644 index 000000000..b760bccb0 --- /dev/null +++ b/nusamai/src/pipeline/transform.rs @@ -0,0 +1,5 @@ +use super::{feedback, Percel}; + +pub trait Transformer: Send + Sync { + fn transform(&self, obj: &mut Percel, feedback: &feedback::Feedback); +} diff --git a/nusamai/tests/pipeline.rs b/nusamai/tests/pipeline.rs new file mode 100644 index 000000000..3686e5c17 --- /dev/null +++ b/nusamai/tests/pipeline.rs @@ -0,0 +1,128 @@ +use nusamai::configuration::Config; +use nusamai::pipeline; +use nusamai::pipeline::{ + feedback, Feedback, FeedbackMessage, Percel, Sender, Sink, SinkInfo, SinkProvider, Source, + SourceInfo, SourceProvider, Transformer, +}; +use rand::prelude::*; + +pub struct DummySourceProvider {} + +impl SourceProvider for DummySourceProvider { + fn create(&self, _config: &Config) -> Box { + Box::new(DummySource {}) + } + + fn info(&self) -> SourceInfo { + SourceInfo { + name: "Dummy Source".to_string(), + } + } + + fn config(&self) -> Config { + Config::default() + } +} + +pub struct DummySource {} + +impl Source for DummySource { + fn run(&mut self, sink: Sender, feedback: &Feedback) { + for i in 0..100 { + if feedback.is_cancelled() { + break; + } + std::thread::sleep(std::time::Duration::from_millis( + (5.0 + random::() * 10.0) as u64, + )); + let obj = Percel { dummy_value: i }; + feedback.send(FeedbackMessage { + message: format!("generating: {:?}", obj), + }); + sink.send(obj).unwrap(); + } + } +} + +struct DummyTransformer {} + +impl Transformer for DummyTransformer { + fn transform(&self, obj: &mut Percel, feedback: &feedback::Feedback) { + std::thread::sleep(std::time::Duration::from_millis( + (5.0 + random::() * 10.0) as u64, + )); + obj.dummy_value *= 5; + feedback.send(FeedbackMessage { + message: format!("transformed object: {:?}", obj), + }) + } +} + +struct DummySinkProvider {} + +impl SinkProvider for DummySinkProvider { + fn create(&self, _config: &Config) -> Box { + Box::new(DummySink {}) + } + + fn info(&self) -> SinkInfo { + SinkInfo { + name: "Dummy Sink".to_string(), + } + } + + fn config(&self) -> Config { + Config::default() + } +} + +struct DummySink {} + +impl Sink for DummySink { + fn feed(&mut self, percel: Percel, feedback: &mut Feedback) { + std::thread::sleep(std::time::Duration::from_millis( + (5.0 + random::() * 20.0) as u64, + )); + feedback.send(FeedbackMessage { + message: format!("dummy sink received: {:?}", percel), + }) + } +} + +#[test] +fn test_run_pipeline() { + let input_driver_factory: Box = Box::new(DummySourceProvider {}); + let output_driver_factory: Box = Box::new(DummySinkProvider {}); + + let input_driver = input_driver_factory.create(&input_driver_factory.config()); + let transformer = Box::new(DummyTransformer {}); + let output_driver = output_driver_factory.create(&input_driver_factory.config()); + + // start the pipeline + let (handle, watcher, canceller) = pipeline::run(input_driver, transformer, output_driver); + + std::thread::scope(|scope| { + // cancel the pipeline + scope.spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(300)); + canceller.cancel(); + }); + // log watcher + let watcher_handle = scope.spawn(move || { + let mut sink_counter = 0; + for msg in watcher { + println!("Feedback message from the pipeline: {:?}", msg); + if msg.message.contains("dummy sink") { + sink_counter += 1; + } + } + sink_counter + }); + let sink_counter = watcher_handle.join().unwrap(); + assert!(sink_counter > 10); // sink should receive more than 10 objects + assert!(sink_counter < 80); // pipeline should be cancelled before 50 + }); + + // wait for the pipeline to finish + handle.join(); +}