Skip to content

Commit

Permalink
データパイプラインの概形を作る (#63)
Browse files Browse the repository at this point in the history
See #44 

データ処理パイプライン `[Source (Input)] ==> [Transform] ==> [Sink (Output)]`
の概形を作りました。

- Source や Sink から Controller にフィードバック(エラー、進捗状況、etc.)を送る仕組みも実装してあります。
- パイプラインを停止する(ユーザなどがキャンセルする)仕組みも備えています。
- Source や Sink のための Configurations
の仕組みのなんとなくの大枠を用意してありますが、具体的な実装はまだありません。

テストは、「パイプライン走らせて、途中でキャンセルする」ことだけしている。test coverage:

<img width="356" alt="Screenshot 2023-12-14 at 14 26 15"
src="https://github.com/MIERUNE/nusamai/assets/5351911/e9ab9324-e770-4803-af32-25e529afa7ee">

Closes: #44
  • Loading branch information
ciscorn committed Dec 27, 2023
1 parent 3821410 commit 19f01d3
Show file tree
Hide file tree
Showing 13 changed files with 426 additions and 2 deletions.
10 changes: 8 additions & 2 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ comment:
component_management:
individual_components:
- component_id: app
name: tauri-app
name: GUI
paths:
- app/**

- component_id: core
name: Backend
paths:
- nusamai/**

- component_id: nusamai-*
name: nusamai-*
name: Libraries
paths:
- nusamai-*/**
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"nusamai-geojson",
"nusamai-plateau",
"nusamai-mvt",
"nusamai",
"nusamai-projection",
]
resolver = "2"
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

- アプリケーション:
- [`app`](./app/) &mdash; Tauri による GUI アプリケーション
- [`nusamai`](./nusamai/) &mdash; アプリケーションのバックエンド (およびCLI実装?)
- 基盤:
- [`nusamai-geometry`](./nusamai-geometry/) &mdash; ジオメトリ型
- [`nusamai-plateau`](./nusamai-plateau/) &mdash; PLATEAU CityGML パーサ
Expand Down
12 changes: 12 additions & 0 deletions nusamai/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "nusamai"
version = "0.1.0"
edition = "2021"

[dependencies]
indexmap = { version = "2.1.0", features = ["serde"] }
rayon = "1.8.0"
serde = { version = "1.0.193", features = ["derive"] }

[dev-dependencies]
rand = "0.8.5"
30 changes: 30 additions & 0 deletions nusamai/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//! Configuration mechanism for Nusamai
use indexmap::map::Entry;
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Config {
items: IndexMap<String, ConfigItem>,
}

impl Config {
pub fn add(&mut self, item: ConfigItem) {
match self.items.entry(item.key.clone()) {
Entry::Occupied(entry) => {
panic!("Configuration key={} already exists", entry.key())
}
Entry::Vacant(entry) => {
entry.insert(item);
}
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ConfigItem {
pub key: String,
pub description: String,
// pub value: ...
}
2 changes: 2 additions & 0 deletions nusamai/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod configuration;
pub mod pipeline;
74 changes: 74 additions & 0 deletions nusamai/src/pipeline/feedback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//! Feedback messages from the pipeline components.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

const FEEDBACK_CHANNEL_BOUND: usize = 10000;

#[derive(Debug)]
pub struct FeedbackMessage {
pub message: String,
// severity:
// progress:
// source:
// etc.
}

#[derive(Clone)]
pub struct Feedback {
cancelled: Arc<AtomicBool>,
sender: std::sync::mpsc::SyncSender<FeedbackMessage>,
}

impl Feedback {
/// Checks if the pipeline is requested to be cancelled
#[inline]
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}

/// Send a message to the feedback channel
#[inline]
pub fn send(&self, msg: FeedbackMessage) {
// don't care if the receiver is dropped.
let _ = self.sender.send(msg);
}
}

pub struct Watcher {
receiver: std::sync::mpsc::Receiver<FeedbackMessage>,
}

impl IntoIterator for Watcher {
type Item = FeedbackMessage;
type IntoIter = std::sync::mpsc::IntoIter<FeedbackMessage>;

fn into_iter(self) -> Self::IntoIter {
self.receiver.into_iter()
}
}

pub struct Canceller {
cancelled: Arc<AtomicBool>,
}

impl Canceller {
/// Cancel the pipeline
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::Relaxed);
}
}

pub(crate) fn watcher() -> (Watcher, Feedback, Canceller) {
let cancelled = Arc::new(AtomicBool::new(false));
let (sender, receiver) = std::sync::mpsc::sync_channel(FEEDBACK_CHANNEL_BOUND);
let watcher = Watcher { receiver };
let canceller = Canceller {
cancelled: cancelled.clone(),
};
let feedback = Feedback {
cancelled: cancelled.clone(),
sender,
};
(watcher, feedback, canceller)
}
23 changes: 23 additions & 0 deletions nusamai/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
pub mod feedback;
pub mod runner;
pub mod sink;
pub mod source;
pub mod transform;

pub use feedback::*;
pub use runner::*;
pub use sink::*;
pub use source::*;
pub use transform::*;

use std::sync::mpsc;

pub type Sender = mpsc::SyncSender<Percel>;
pub type Receiver = mpsc::Receiver<Percel>;

/// Message passing through pipeline stages
#[derive(Debug)]
pub struct Percel {
pub dummy_value: i32,
// pub cityobj: TopLevelCityObject
}
99 changes: 99 additions & 0 deletions nusamai/src/pipeline/runner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::sync::mpsc::sync_channel;

use rayon::prelude::*;

use super::{
feedback::{watcher, Feedback, Watcher},
Canceller,
};
use crate::pipeline::{Receiver, Sink, Source, Transformer};

const SOURCE_OUTPUT_CHANNEL_BOUND: usize = 10000;
const TRANSFORMER_OUTPUT_CHANNEL_BOUND: usize = 10000;

fn start_source_thread(
mut source: Box<dyn Source>,
feedback: Feedback,
) -> (std::thread::JoinHandle<()>, Receiver) {
let (sender, receiver) = sync_channel(SOURCE_OUTPUT_CHANNEL_BOUND);
let handle = std::thread::spawn(move || {
source.run(sender, &feedback);
});
(handle, receiver)
}

fn start_transformer_thread(
transformer: Box<dyn Transformer>,
upstream_receiver: Receiver,
feedback: Feedback,
) -> (std::thread::JoinHandle<()>, Receiver) {
let (sender, receiver) = sync_channel(TRANSFORMER_OUTPUT_CHANNEL_BOUND);
let handle = std::thread::spawn(move || {
let _ = upstream_receiver
.into_iter()
.par_bridge()
.try_for_each(|mut obj| {
transformer.transform(&mut obj, &feedback);
if sender.send(obj).is_err() || feedback.is_cancelled() {
Err(())
} else {
Ok(())
}
});
});
(handle, receiver)
}

fn start_sink_thread(
mut sink: Box<dyn Sink>,
receiver: Receiver,
mut feedback: Feedback,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
for obj in receiver {
if feedback.is_cancelled() {
break;
}
sink.feed(obj, &mut feedback);
}
})
}

pub struct PipelineHandle {
source_thread: std::thread::JoinHandle<()>,
transformer_thread: std::thread::JoinHandle<()>,
sink_thread: std::thread::JoinHandle<()>,
}

impl PipelineHandle {
// Wait for the pipeline to finish
pub fn join(self) {
self.source_thread.join().unwrap();
self.transformer_thread.join().unwrap();
self.sink_thread.join().unwrap();
}
}

/// Run the pipeline
///
/// `[Source] ==> [Transformer] ==> [Sink]`
pub fn run(
source: Box<dyn Source>,
transformer: Box<dyn Transformer>,
sink: Box<dyn Sink>,
) -> (PipelineHandle, Watcher, Canceller) {
let (watcher, feedback, canceller) = watcher();

// Start the pipeline
let (source_thread, source_receiver) = start_source_thread(source, feedback.clone());
let (transformer_thread, transformer_receiver) =
start_transformer_thread(transformer, source_receiver, feedback.clone());
let sink_thread = start_sink_thread(sink, transformer_receiver, feedback.clone());

let handle = PipelineHandle {
source_thread,
transformer_thread,
sink_thread,
};
(handle, watcher, canceller)
}
21 changes: 21 additions & 0 deletions nusamai/src/pipeline/sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::configuration::Config;
use crate::pipeline::{Feedback, Percel};

pub struct SinkInfo {
pub name: String,
}

pub trait SinkProvider {
/// Creates a sink instance.
fn create(&self, config: &Config) -> Box<dyn Sink>;

/// Gets basic information about the sink.
fn info(&self) -> SinkInfo;

/// Gets the configurable parameters of the sink.
fn config(&self) -> Config;
}

pub trait Sink: Send {
fn feed(&mut self, obj: Percel, feedback: &mut Feedback);
}
22 changes: 22 additions & 0 deletions nusamai/src/pipeline/source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::configuration::Config;
use crate::pipeline::{Feedback, Sender};

pub struct SourceInfo {
pub name: String,
// ...
}

pub trait SourceProvider {
/// Creates a source instance.
fn create(&self, config: &Config) -> Box<dyn Source>;

/// Gets basic information about the sink.
fn info(&self) -> SourceInfo;

/// Gets the configurable parameters of the source.
fn config(&self) -> Config;
}

pub trait Source: Send {
fn run(&mut self, sink: Sender, feedback: &Feedback);
}
5 changes: 5 additions & 0 deletions nusamai/src/pipeline/transform.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use super::{feedback, Percel};

pub trait Transformer: Send + Sync {
fn transform(&self, obj: &mut Percel, feedback: &feedback::Feedback);
}
Loading

0 comments on commit 19f01d3

Please sign in to comment.