Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

データパイプラインの概形を作る #63

Merged
merged 6 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ comment:
component_management:
individual_components:
- component_id: app
name: tauri-app
name: GUI
paths:
- app/**

- component_id: core
name: Backend
paths:
- nusamai/**

- component_id: nusamai-*
name: nusamai-*
name: Libraries
paths:
- nusamai-*/**
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"nusamai-geojson",
"nusamai-plateau",
"nusamai-mvt",
"nusamai",
"nusamai-projection",
]
resolver = "2"
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Notion: [BRIDGE 都市デジタルツイン・GISコンバータの開発](https

- アプリケーション:
- [`app`](./app/) — Tauri による GUI アプリケーション
- [`nusamai`](./nusamai/) — アプリケーションのバックエンド (およびCLI実装?)
- 基盤:
- [`nusamai-geometry`](./nusamai-geometry/) — ジオメトリ型
- [`nusamai-plateau`](./nusamai-plateau/) — PLATEAU CityGML パーサ
Expand Down
12 changes: 12 additions & 0 deletions nusamai/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "nusamai"
version = "0.1.0"
edition = "2021"

[dependencies]
indexmap = { version = "2.1.0", features = ["serde"] }
rayon = "1.8.0"
serde = { version = "1.0.193", features = ["derive"] }

[dev-dependencies]
rand = "0.8.5"
30 changes: 30 additions & 0 deletions nusamai/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//! Configuration mechanism for Nusamai

use indexmap::map::Entry;
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};

/// Collection of configuration items, keyed by each item's `key`.
///
/// Items are kept in an `IndexMap` and are added through `Config::add`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Config {
// Maps `ConfigItem::key` to the item that carries it.
items: IndexMap<String, ConfigItem>,
}

impl Config {
/// Registers `item` under its own `key`.
///
/// # Panics
///
/// Panics if an item with the same key has already been added.
pub fn add(&mut self, item: ConfigItem) {
// The key is cloned because `item` itself is moved into the map below.
match self.items.entry(item.key.clone()) {
Entry::Occupied(entry) => {
panic!("Configuration key={} already exists", entry.key())

Check warning on line 16 in nusamai/src/configuration.rs

View check run for this annotation

Codecov / codecov/patch

nusamai/src/configuration.rs#L13-L16

Added lines #L13 - L16 were not covered by tests
}
Entry::Vacant(entry) => {
entry.insert(item);
}
}
}

Check warning on line 22 in nusamai/src/configuration.rs

View check run for this annotation

Codecov / codecov/patch

nusamai/src/configuration.rs#L18-L22

Added lines #L18 - L22 were not covered by tests
}

/// A single configuration entry, stored in `Config` under its `key`.
#[derive(Debug, Serialize, Deserialize)]

Check warning on line 25 in nusamai/src/configuration.rs

View check run for this annotation

Codecov / codecov/patch

nusamai/src/configuration.rs#L25

Added line #L25 was not covered by tests
pub struct ConfigItem {
/// Key identifying the item; `Config::add` panics when the key is already present.
pub key: String,
/// Description of the item.
pub description: String,
// pub value: ...
}
2 changes: 2 additions & 0 deletions nusamai/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod configuration;
pub mod pipeline;
74 changes: 74 additions & 0 deletions nusamai/src/pipeline/feedback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//! Feedback messages from the pipeline components.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Capacity of the bounded channel that carries [`FeedbackMessage`]s to the watcher.
const FEEDBACK_CHANNEL_BOUND: usize = 10000;

/// A message emitted by a pipeline component for whoever watches the run.
#[derive(Debug)]
pub struct FeedbackMessage {
    /// Text of the message.
    pub message: String,
    // severity:
    // progress:
    // source:
    // etc.
}

/// Handle given to pipeline components so they can report messages and
/// observe cancellation requests.
///
/// Every clone shares the same cancellation flag and sends into the same
/// feedback channel.
#[derive(Clone, Debug)]
pub struct Feedback {
    cancelled: Arc<AtomicBool>,
    sender: std::sync::mpsc::SyncSender<FeedbackMessage>,
}

impl Feedback {
    /// Checks if the pipeline is requested to be cancelled
    #[inline]
    pub fn is_cancelled(&self) -> bool {
        // The flag carries no data of its own, so relaxed ordering is enough here.
        self.cancelled.load(Ordering::Relaxed)
    }

    /// Send a message to the feedback channel
    ///
    /// The channel is bounded: when its buffer is full this call blocks until
    /// the receiving side makes room. If the receiving side has been dropped
    /// the message is silently discarded.
    #[inline]
    pub fn send(&self, msg: FeedbackMessage) {
        // don't care if the receiver is dropped.
        let _ = self.sender.send(msg);
    }
}

/// Receiving end of the feedback channel.
///
/// Consume it with `into_iter()` (or a `for` loop) to read the messages sent
/// through the `Feedback` handles. The iterator blocks while waiting for the
/// next message and ends once every sender has been dropped.
#[derive(Debug)]
pub struct Watcher {
    receiver: std::sync::mpsc::Receiver<FeedbackMessage>,
}

impl IntoIterator for Watcher {
    type Item = FeedbackMessage;
    type IntoIter = std::sync::mpsc::IntoIter<FeedbackMessage>;

    /// Turns the watcher into a blocking iterator over the received messages.
    fn into_iter(self) -> Self::IntoIter {
        self.receiver.into_iter()
    }
}

/// Handle used to request cancellation of a running pipeline.
///
/// Clones share the same underlying flag, so any clone can cancel the run.
#[derive(Clone, Debug)]
pub struct Canceller {
    cancelled: Arc<AtomicBool>,
}

impl Canceller {
    /// Cancel the pipeline
    ///
    /// This only raises the shared flag; it does not wait for anything.
    /// Calling it more than once has no further effect.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Relaxed);
    }
}

/// Creates the linked feedback handles for one pipeline run.
///
/// Returns, in order:
/// - the [`Watcher`] that receives every message sent through the `Feedback`,
/// - the [`Feedback`] handle to hand to the pipeline components,
/// - the [`Canceller`] that raises the flag read by `Feedback::is_cancelled`.
pub(crate) fn watcher() -> (Watcher, Feedback, Canceller) {
    let cancelled = Arc::new(AtomicBool::new(false));
    let (sender, receiver) = std::sync::mpsc::sync_channel(FEEDBACK_CHANNEL_BOUND);

    let canceller = Canceller {
        cancelled: Arc::clone(&cancelled),
    };
    // Last use of the flag: move it instead of cloning it once more.
    let feedback = Feedback { cancelled, sender };

    (Watcher { receiver }, feedback, canceller)
}
23 changes: 23 additions & 0 deletions nusamai/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
pub mod feedback;
pub mod runner;
pub mod sink;
pub mod source;
pub mod transform;

pub use feedback::*;
pub use runner::*;
pub use sink::*;
pub use source::*;
pub use transform::*;

use std::sync::mpsc;

/// Sending half of the bounded channel that carries [`Percel`]s to the next pipeline stage.
pub type Sender = mpsc::SyncSender<Percel>;
/// Receiving half of the channel that delivers [`Percel`]s from the previous pipeline stage.
pub type Receiver = mpsc::Receiver<Percel>;

/// Message passing through pipeline stages
// NOTE(review): `Percel` looks like a misspelling of `Parcel`; renaming would touch every
// stage signature, so confirm before changing.
#[derive(Debug)]
pub struct Percel {
// Presumably a placeholder payload until the commented-out field below is introduced.
pub dummy_value: i32,
// pub cityobj: TopLevelCityObject
}
ciscorn marked this conversation as resolved.
Show resolved Hide resolved
99 changes: 99 additions & 0 deletions nusamai/src/pipeline/runner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::sync::mpsc::sync_channel;

use rayon::prelude::*;

use super::{
feedback::{watcher, Feedback, Watcher},
Canceller,
};
use crate::pipeline::{Receiver, Sink, Source, Transformer};

/// Capacity of the bounded channel that carries the source's output to the transformer.
const SOURCE_OUTPUT_CHANNEL_BOUND: usize = 10000;
/// Capacity of the bounded channel that carries the transformer's output to the sink.
const TRANSFORMER_OUTPUT_CHANNEL_BOUND: usize = 10000;

/// Spawns the thread that drives `source`.
///
/// The source is given the sending half of a fresh bounded channel; the
/// receiving half is returned, together with the thread's join handle, so the
/// next stage can consume what the source produces.
fn start_source_thread(
    mut source: Box<dyn Source>,
    feedback: Feedback,
) -> (std::thread::JoinHandle<()>, Receiver) {
    let (downstream, output) = sync_channel(SOURCE_OUTPUT_CHANNEL_BOUND);
    let worker = std::thread::spawn(move || source.run(downstream, &feedback));
    (worker, output)
}

/// Spawns the thread that runs `transformer` over everything received from upstream.
///
/// Incoming objects are bridged into a rayon parallel iterator, transformed in
/// place and forwarded into a fresh bounded channel whose receiving half is
/// returned along with the thread's join handle. Processing stops early when
/// the downstream receiver is gone or cancellation has been requested.
fn start_transformer_thread(
    transformer: Box<dyn Transformer>,
    upstream_receiver: Receiver,
    feedback: Feedback,
) -> (std::thread::JoinHandle<()>, Receiver) {
    let (downstream, output) = sync_channel(TRANSFORMER_OUTPUT_CHANNEL_BOUND);
    let worker = std::thread::spawn(move || {
        // The overall result is irrelevant: `Err(())` only serves to short-circuit.
        let _ = upstream_receiver
            .into_iter()
            .par_bridge()
            .try_for_each(|mut parcel| {
                transformer.transform(&mut parcel, &feedback);
                match downstream.send(parcel) {
                    // Delivered, and nobody asked us to stop: keep going.
                    Ok(()) if !feedback.is_cancelled() => Ok(()),
                    // Either the receiver hung up or the run was cancelled.
                    _ => Err(()),
                }
            });
    });
    (worker, output)
}

/// Spawns the thread that feeds every received object into `sink`.
///
/// The loop ends when the upstream channel is closed, or as soon as
/// cancellation is observed (the object just received is then not fed).
fn start_sink_thread(
    mut sink: Box<dyn Sink>,
    receiver: Receiver,
    mut feedback: Feedback,
) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        // `recv` fails only once every upstream sender has been dropped.
        while let Ok(parcel) = receiver.recv() {
            if feedback.is_cancelled() {
                break;
            }
            sink.feed(parcel, &mut feedback);
        }
    })
}

/// Join handles of the three threads that make up a running pipeline.
#[derive(Debug)]
pub struct PipelineHandle {
    source_thread: std::thread::JoinHandle<()>,
    transformer_thread: std::thread::JoinHandle<()>,
    sink_thread: std::thread::JoinHandle<()>,
}

impl PipelineHandle {
    /// Wait for the pipeline to finish
    ///
    /// Joins the source, transformer and sink threads, in that order.
    ///
    /// # Panics
    ///
    /// Panics if any of the stage threads panicked; the panic message names
    /// the first stage (in join order) found to have failed.
    pub fn join(self) {
        self.source_thread.join().expect("source thread panicked");
        self.transformer_thread
            .join()
            .expect("transformer thread panicked");
        self.sink_thread.join().expect("sink thread panicked");
    }
}

/// Run the pipeline
///
/// `[Source] ==> [Transformer] ==> [Sink]`
///
/// Each stage is started on its own thread and connected to the next one by a
/// bounded channel. The call returns right after spawning, with:
/// - a [`PipelineHandle`] to wait for the stages to finish,
/// - a [`Watcher`] that yields the feedback messages sent by the stages,
/// - a [`Canceller`] to request an early stop.
pub fn run(
    source: Box<dyn Source>,
    transformer: Box<dyn Transformer>,
    sink: Box<dyn Sink>,
) -> (PipelineHandle, Watcher, Canceller) {
    let (watcher, feedback, canceller) = watcher();

    // Start the pipeline
    let (source_thread, source_receiver) = start_source_thread(source, feedback.clone());
    let (transformer_thread, transformer_receiver) =
        start_transformer_thread(transformer, source_receiver, feedback.clone());
    // The sink is the last consumer of the handle, so it takes ownership
    // rather than yet another clone.
    let sink_thread = start_sink_thread(sink, transformer_receiver, feedback);

    let handle = PipelineHandle {
        source_thread,
        transformer_thread,
        sink_thread,
    };
    (handle, watcher, canceller)
}
21 changes: 21 additions & 0 deletions nusamai/src/pipeline/sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::configuration::Config;
use crate::pipeline::{Feedback, Percel};

/// Basic information describing a sink, as returned by `SinkProvider::info`.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Name of the sink.
    pub name: String,
}

/// Factory and metadata provider for one kind of [`Sink`].
pub trait SinkProvider {
/// Creates a sink instance.
fn create(&self, config: &Config) -> Box<dyn Sink>;

/// Gets basic information about the sink.
fn info(&self) -> SinkInfo;

/// Gets the configurable parameters of the sink.
fn config(&self) -> Config;
}

/// Final pipeline stage: consumes the objects produced upstream.
///
/// `Send` is required because the runner moves the boxed sink onto its own thread.
pub trait Sink: Send {
/// Consumes one object. The runner calls this once per object received from the
/// transformer stage and stops calling it once cancellation has been observed.
fn feed(&mut self, obj: Percel, feedback: &mut Feedback);
}
22 changes: 22 additions & 0 deletions nusamai/src/pipeline/source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::configuration::Config;
use crate::pipeline::{Feedback, Sender};

/// Basic information describing a source, as returned by `SourceProvider::info`.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Name of the source.
    pub name: String,
    // ...
}

/// Factory and metadata provider for one kind of [`Source`].
pub trait SourceProvider {
/// Creates a source instance.
fn create(&self, config: &Config) -> Box<dyn Source>;

/// Gets basic information about the source.
fn info(&self) -> SourceInfo;

/// Gets the configurable parameters of the source.
fn config(&self) -> Config;
}

/// First pipeline stage: produces the objects that flow through the pipeline.
///
/// `Send` is required because the runner moves the boxed source onto its own thread.
pub trait Source: Send {
/// Produces objects and pushes them into `sink`, which (despite its name) is the
/// channel sender feeding the next stage, not a `Sink` implementation.
// NOTE(review): implementations presumably should poll `feedback.is_cancelled()` and
// return early on cancellation — confirm the intended contract.
fn run(&mut self, sink: Sender, feedback: &Feedback);
}
5 changes: 5 additions & 0 deletions nusamai/src/pipeline/transform.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use super::{feedback, Percel};

/// Middle pipeline stage: rewrites each object in place before it is passed downstream.
///
/// `Send + Sync` and `&self` are needed because the runner shares one transformer
/// across a rayon parallel iterator, so `transform` may run on several threads at once.
pub trait Transformer: Send + Sync {
/// Transforms `obj` in place; `feedback` is available for reporting messages and
/// checking for cancellation.
fn transform(&self, obj: &mut Percel, feedback: &feedback::Feedback);
}
Loading