From abdfda210a8dc66736df6e2b3bc11a8edb11fb65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Sat, 7 Jul 2018 15:11:01 +0200 Subject: [PATCH] [api] use rain_core in rust client --- Cargo.lock | 22 +++ Cargo.toml | 3 + rain_client/Cargo.toml | 31 ++++ {src => rain_client/src}/client/client.rs | 5 +- .../src}/client/communicator.rs | 35 +++-- {src => rain_client/src}/client/dataobject.rs | 12 +- rain_client/src/client/localcluster.rs | 49 +++++++ {src => rain_client/src}/client/mod.rs | 2 + rain_client/src/client/rpc.rs | 46 ++++++ {src => rain_client/src}/client/session.rs | 79 +++++----- rain_client/src/client/task.rs | 18 +++ {src => rain_client/src}/client/tasks.rs | 24 +-- rain_client/src/lib.rs | 12 ++ rain_client_test/Cargo.toml | 10 ++ rain_client_test/src/main.rs | 27 ++++ rain_core/Cargo.toml | 1 + rain_core/src/comm/executor.rs | 2 +- rain_core/src/comm/mod.rs | 2 + rain_core/src/comm/rpc.rs | 19 +++ rain_core/src/lib.rs | 3 +- rain_server/src/common/connection.rs | 2 +- rain_server/src/governor/graph/dataobj.rs | 2 +- rain_server/src/governor/rpc/control.rs | 2 +- rain_server/src/governor/rpc/executor.rs | 2 +- rain_server/src/governor/rpc/fetch.rs | 4 +- rain_server/src/governor/state.rs | 4 +- rain_server/src/main.rs | 11 +- rain_server/src/server/http.rs | 4 +- .../src/server/logging/sqlite_logger.rs | 2 +- rain_server/src/server/rpc/client.rs | 1 + rain_server/src/start/common.rs | 33 ----- rain_server/src/start/mod.rs | 1 - rain_server/src/start/starter.rs | 6 - rain_task/src/macros.rs | 3 +- rain_task/src/tests.rs | 137 ++++++++++-------- src/client/rpc.rs | 82 ----------- src/client/task.rs | 27 ---- src/start/localcluster.rs | 55 ------- 38 files changed, 423 insertions(+), 357 deletions(-) create mode 100644 rain_client/Cargo.toml rename {src => rain_client/src}/client/client.rs (94%) rename {src => rain_client/src}/client/communicator.rs (83%) rename {src => rain_client/src}/client/dataobject.rs (50%) create mode 100644 rain_client/src/client/localcluster.rs rename {src => rain_client/src}/client/mod.rs (73%) create mode 100644 rain_client/src/client/rpc.rs rename {src => rain_client/src}/client/session.rs (68%) create mode 100644 rain_client/src/client/task.rs rename {src => rain_client/src}/client/tasks.rs (64%) create mode 100644 rain_client/src/lib.rs create mode 100644 rain_client_test/Cargo.toml create mode 100644 rain_client_test/src/main.rs create mode 100644 rain_core/src/comm/rpc.rs delete mode 100644 src/client/rpc.rs delete mode 100644 src/client/task.rs delete mode 100644 src/start/localcluster.rs diff --git a/Cargo.lock b/Cargo.lock index 1c1fd29..28716e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -555,6 +555,27 @@ dependencies = [ "proc-macro2 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rain_client" +version = "0.3.0" +dependencies = [ + "capnp-rpc 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rain_core 0.3.0", + "serde 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rain_client_test" +version = "0.3.0" +dependencies = [ + "rain_client 0.3.0", +] + [[package]] name = "rain_core" version = "0.3.0" @@ -577,6 +598,7 @@ dependencies = [ "serde_derive 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index 7683228..047ef1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,8 @@ version = "0.3.0" [workspace] members = [ + "rain_client", + "rain_client_test", "rain_core", "rain_server", "rain_task", @@ -10,5 +12,6 @@ members = [ ] [patch.crates-io] +rain_client = { path = "rain_client" } rain_core = { path = "rain_core" } rain_task = { path = "rain_task" } diff --git a/rain_client/Cargo.toml b/rain_client/Cargo.toml new file mode 100644 index 0000000..cc49275 --- /dev/null +++ b/rain_client/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "rain_client" +version = "0.3.0" + +description = "Distributed computational framework for large-scale task-based pipelines. Client library in Rust." +# documentation = "https://docs.rs/rain_task/" # default docs.rs +homepage = "https://github.com/substantic/rain" +repository = "https://github.com/substantic/rain/" +readme = "README.md" +authors = [ + "Stanislav Bohm ", + "Tomas Gavenciak ", + "Vojtech Cima ", + ] +license = "MIT" + +exclude = ["testing/**/*"] + +[badges] +travis-ci = { repository = "substantic/rain", branch = "master" } +maintenance = { status = "actively-developed" } + +[dependencies] +capnp-rpc = "0.8" +error-chain = "0.11" +futures = "0.1" +log = "0.4" +rain_core = "0.3.0" +tokio-core = "0.1" +serde = "1" +serde_json = "1" diff --git a/src/client/client.rs b/rain_client/src/client/client.rs similarity index 94% rename from src/client/client.rs rename to rain_client/src/client/client.rs index 6d46e3c..267af9c 100644 --- a/src/client/client.rs +++ b/rain_client/src/client/client.rs @@ -1,9 +1,9 @@ use std::error::Error; use std::net::SocketAddr; -use CLIENT_PROTOCOL_VERSION; -use super::session::Session; use super::communicator::Communicator; +use super::session::Session; +use rain_core::CLIENT_PROTOCOL_VERSION; use std::rc::Rc; pub struct Client { @@ -26,3 +26,4 @@ impl Client { self.comm.terminate_server() } } + diff --git a/src/client/communicator.rs b/rain_client/src/client/communicator.rs similarity index 83% rename from src/client/communicator.rs rename to rain_client/src/client/communicator.rs index c1cd29a..ba3ba95 100644 --- a/src/client/communicator.rs +++ b/rain_client/src/client/communicator.rs @@ -1,21 +1,21 @@ -use tokio_core::reactor::Core; -use std::net::SocketAddr; -use tokio_core::net::TcpStream; -use std::error::Error; -use common::rpc::new_rpc_system; use capnp_rpc::rpc_twoparty_capnp; use futures::Future; +use rain_core::comm::new_rpc_system; +use std::error::Error; +use std::net::SocketAddr; +use tokio_core::net::TcpStream; +use tokio_core::reactor::Core; use super::task::Task; -use common::id::{DataObjectId, TaskId}; -use common::convert::{FromCapnp, ToCapnp}; use client::dataobject::DataObject; -use std::cell::RefCell; -use std::cell::RefMut; +use rain_core::types::{DataObjectId, TaskId}; +use rain_core::utils::{FromCapnp, ToCapnp}; +use rain_core::{client_capnp, common_capnp, server_capnp}; +use std::cell::{RefCell, RefMut}; pub struct Communicator { core: RefCell, - service: ::client_capnp::client_service::Client, + service: client_capnp::client_service::Client, } impl Communicator { @@ -28,7 +28,7 @@ impl Communicator { debug!("Connection to server {} established", scheduler); let mut rpc = Box::new(new_rpc_system(stream, None)); - let bootstrap: ::server_capnp::server_bootstrap::Client = + let bootstrap: server_capnp::server_bootstrap::Client = rpc.bootstrap(rpc_twoparty_capnp::Side::Server); handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err))); @@ -116,19 +116,28 @@ impl Communicator { )) } - pub fn fetch(&self, object_id: DataObjectId) -> Result, Box> { + pub fn fetch(&self, object_id: &DataObjectId) -> Result, Box> { let mut req = self.service.fetch_request(); object_id.to_capnp(&mut req.get().get_id().unwrap()); req.get().set_size(1024); let response = self.comm().run(req.send().promise)?; + // TODO: handle error states let reader = response.get()?; match reader.get_status().which()? { - ::common_capnp::fetch_result::status::Ok(()) => { + common_capnp::fetch_result::status::Ok(()) => { let data = reader.get_data()?; Ok(Vec::from(data)) } + common_capnp::fetch_result::status::Removed(()) => { + print!("Removed"); + Ok(vec![]) + } + common_capnp::fetch_result::status::Error(err) => { + print!("Error: {:?}", err.unwrap().get_message()); + Ok(vec![]) + } _ => bail!("Non-ok status"), } } diff --git a/src/client/dataobject.rs b/rain_client/src/client/dataobject.rs similarity index 50% rename from src/client/dataobject.rs rename to rain_client/src/client/dataobject.rs index 0fdc643..de19c64 100644 --- a/src/client/dataobject.rs +++ b/rain_client/src/client/dataobject.rs @@ -1,19 +1,17 @@ -use common::Attributes; -use common::id::DataObjectId; -use common::DataType; +use rain_core::types::{DataObjectId, ObjectSpec}; use std::cell::Cell; pub struct DataObject { - pub id: DataObjectId, - pub label: String, pub keep: Cell, pub data: Option>, - pub attributes: Attributes, - pub data_type: DataType, + pub spec: ObjectSpec, } impl DataObject { pub fn keep(&self) { self.keep.set(true); } + pub fn id(&self) -> DataObjectId { + self.spec.id + } } diff --git a/rain_client/src/client/localcluster.rs b/rain_client/src/client/localcluster.rs new file mode 100644 index 0000000..01c92c4 --- /dev/null +++ b/rain_client/src/client/localcluster.rs @@ -0,0 +1,49 @@ +use std::error::Error; +use std::net::SocketAddr; +use super::client::Client; +use std::process::{Child, Command}; +use std::path::PathBuf; +use std::{thread, time}; +use std::process::Stdio; + +pub struct LocalCluster { + listen_addr: SocketAddr, + binary: PathBuf +} + +impl LocalCluster { + pub fn new(binary: &str) -> Result> { + let mut cluster = LocalCluster { + binary: PathBuf::from(binary), + listen_addr: SocketAddr::new("127.0.0.1".parse()?, 7210) + }; + cluster.start()?; + + Ok(cluster) + } + + fn start(&mut self) -> Result<(), Box> { + Command::new(&self.binary) + .arg("start") + .arg("--listen") + .arg(self.listen_addr.to_string()) + .arg("--simple") + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn()? + .wait()?; + + Ok(()) + } + + pub fn create_client(&self) -> Result> { + Client::new(self.listen_addr) + } +} + +impl Drop for LocalCluster { + fn drop(&mut self) { + Client::new(self.listen_addr).unwrap().terminate_server(); + } +} diff --git a/src/client/mod.rs b/rain_client/src/client/mod.rs similarity index 73% rename from src/client/mod.rs rename to rain_client/src/client/mod.rs index 975fed6..d40fe1d 100644 --- a/src/client/mod.rs +++ b/rain_client/src/client/mod.rs @@ -1,6 +1,7 @@ pub mod client; pub mod session; pub mod tasks; +pub mod localcluster; #[macro_use] mod rpc; @@ -10,3 +11,4 @@ mod task; pub use self::client::Client; pub use self::session::Session; +pub use self::localcluster::LocalCluster; diff --git a/rain_client/src/client/rpc.rs b/rain_client/src/client/rpc.rs new file mode 100644 index 0000000..9cf15f8 --- /dev/null +++ b/rain_client/src/client/rpc.rs @@ -0,0 +1,46 @@ +use super::task::Task; +use client::dataobject::DataObject; +use rain_core::client_capnp; +use rain_core::utils::ToCapnp; +use serde_json; + +macro_rules! to_capnp_list { + ($builder:expr, $items:expr, $name:ident) => {{ + let mut builder = $builder.$name($items.len() as u32); + for (i, obj) in $items.iter().enumerate() { + obj.to_capnp(&mut builder.reborrow().get(i as u32)); + } + }}; +} +macro_rules! from_capnp_list { + ($builder:expr, $items:ident, $obj:ident) => {{ + $builder + .$items()? + .iter() + .map(|item| $obj::from_capnp(&item)) + .collect() + }}; +} + +impl<'a> ToCapnp<'a> for Task { + type Builder = client_capnp::task::Builder<'a>; + + fn to_capnp(&self, builder: &mut Self::Builder) { + builder.set_spec(&serde_json::to_string(&self.spec).unwrap()); + } +} +impl<'a> ToCapnp<'a> for DataObject { + type Builder = client_capnp::data_object::Builder<'a>; + + fn to_capnp(&self, builder: &mut Self::Builder) { + builder.set_spec(&serde_json::to_string(&self.spec).unwrap()); + builder.set_keep(self.keep.get()); + + if let &Some(ref data) = &self.data { + builder.set_data(&data); + builder.set_has_data(true); + } else { + builder.set_has_data(false); + } + } +} diff --git a/src/client/session.rs b/rain_client/src/client/session.rs similarity index 68% rename from src/client/session.rs rename to rain_client/src/client/session.rs index 4a16f0d..9097f07 100644 --- a/src/client/session.rs +++ b/rain_client/src/client/session.rs @@ -1,15 +1,13 @@ use super::communicator::Communicator; use client::dataobject::DataObject; use client::task::Task; +use rain_core::common_capnp; +use rain_core::types::{DataObjectId, DataType, ObjectSpec, Resources, SId, TaskId, TaskSpec, + TaskSpecInput, UserAttrs}; +use serde_json; +use std::cell::Cell; use std::collections::HashMap; use std::error::Error; -use client::task::TaskInput; -use common::Attributes; -use common::id::TaskId; -use common::id::DataObjectId; -use common::id::SId; -use common::DataType; -use std::cell::Cell; use std::rc::Rc; pub type DataObjectPtr = Rc; @@ -45,14 +43,19 @@ impl Session { } pub fn unkeep(&mut self, objects: &[DataObjectPtr]) -> Result<(), Box> { - self.comm - .unkeep(&objects.iter().map(|o| o.id).collect::>()) + self.comm.unkeep(&objects + .iter() + .map(|o| o.id()) + .collect::>()) } pub fn wait(&mut self, tasks: &[TaskPtr], objects: &[DataObjectPtr]) -> Result<(), Box> { self.comm.wait( - &tasks.iter().map(|t| t.id).collect::>(), - &objects.iter().map(|o| o.id).collect::>(), + &tasks.iter().map(|t| t.id()).collect::>(), + &objects + .iter() + .map(|o| o.id()) + .collect::>(), ) } pub fn wait_some( @@ -60,13 +63,16 @@ impl Session { tasks: &[TaskPtr], objects: &[DataObjectPtr], ) -> Result<(Vec, Vec), Box> { - let task_map: HashMap = tasks.iter().map(|t| (t.id, t)).collect(); + let task_map: HashMap = tasks.iter().map(|t| (t.id(), t)).collect(); let object_map: HashMap = - objects.iter().map(|o| (o.id, o)).collect(); + objects.iter().map(|o| (o.id(), o)).collect(); let (task_ids, object_ids) = self.comm.wait_some( - &tasks.iter().map(|t| t.id).collect::>(), - &objects.iter().map(|o| o.id).collect::>(), + &tasks.iter().map(|t| t.id()).collect::>(), + &objects + .iter() + .map(|o| o.id()) + .collect::>(), )?; Ok(( @@ -82,13 +88,13 @@ impl Session { } pub fn wait_all(&mut self) -> Result<(), Box> { self.comm.wait( - &vec![TaskId::new(self.id, ::common_capnp::ALL_TASKS_ID)], + &vec![TaskId::new(self.id, common_capnp::ALL_TASKS_ID)], &vec![], ) } - pub fn fetch(&mut self, object: &DataObject) -> Result, Box> { - self.comm.fetch(object.id) + pub fn fetch(&mut self, o: &DataObjectPtr) -> Result, Box> { + self.comm.fetch(&o.id()) } pub fn blob(&mut self, data: Vec) -> DataObjectPtr { @@ -102,13 +108,17 @@ impl Session { DataObjectId::new(self.id, id) } pub(crate) fn create_object(&mut self, label: String, data: Option>) -> DataObjectPtr { - let object = DataObject { + let spec = ObjectSpec { id: self.create_object_id(), - keep: Cell::new(false), label, - data, - attributes: Attributes::new(), data_type: DataType::Blob, + content_type: "".to_owned(), + user: UserAttrs::new(), + }; + let object = DataObject { + keep: Cell::new(false), + data, + spec, }; let rc = Rc::new(object); self.data_objects.push(rc.clone()); @@ -124,27 +134,24 @@ impl Session { } pub fn create_task( &mut self, - command: String, - inputs: Vec, + task_type: String, + inputs: Vec, outputs: Vec, config: HashMap, - cpus: i32, + cpus: u32, ) -> TaskPtr { - let mut attributes = Attributes::new(); - attributes.set("config", config).unwrap(); - - let mut resources: HashMap = HashMap::new(); - resources.insert("cpus".to_owned(), cpus); - attributes.set("resources", resources).unwrap(); - - let task = Task { + let spec = TaskSpec { id: self.create_task_id(), - command, inputs, - outputs, - attributes, + task_type, + outputs: outputs.iter().map(|o| o.id()).collect(), + config: Some(serde_json::to_value(&config).unwrap()), + resources: Resources { cpus }, + user: UserAttrs::new(), }; + let task = Task { spec, outputs }; + let rc = Rc::new(task); self.tasks.push(rc.clone()); diff --git a/rain_client/src/client/task.rs b/rain_client/src/client/task.rs new file mode 100644 index 0000000..e31cc3d --- /dev/null +++ b/rain_client/src/client/task.rs @@ -0,0 +1,18 @@ +use super::session::DataObjectPtr; +use rain_core::types::{TaskId, TaskSpec}; + +pub struct Task { + pub spec: TaskSpec, + pub outputs: Vec, +} + +impl Task { + pub fn output(&self) -> DataObjectPtr { + assert_eq!(self.outputs.len(), 1, "Task has multiple outputs"); + + self.outputs[0].clone() + } + pub fn id(&self) -> TaskId { + self.spec.id + } +} diff --git a/src/client/tasks.rs b/rain_client/src/client/tasks.rs similarity index 64% rename from src/client/tasks.rs rename to rain_client/src/client/tasks.rs index 1926857..1cfe687 100644 --- a/src/client/tasks.rs +++ b/rain_client/src/client/tasks.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; -use super::task::TaskInput; use super::session::{DataObjectPtr, Session, TaskPtr}; +use rain_core::types::TaskSpecInput; pub trait CommonTasks { fn concat(&mut self, objects: &[DataObjectPtr]) -> TaskPtr; @@ -9,19 +9,23 @@ pub trait CommonTasks { fn export(&mut self, object: DataObjectPtr, filename: String) -> TaskPtr; } +fn builtin(action: &str) -> String { + return format!("buildin/{}", action); +} + impl CommonTasks for Session { fn concat(&mut self, objects: &[DataObjectPtr]) -> TaskPtr { let inputs = objects .iter() - .map(|o| TaskInput { - label: None, - data_object: o.clone(), + .map(|o| TaskSpecInput { + label: "".to_owned(), + id: o.id(), }) .collect(); let outputs = vec![self.create_object("".to_owned(), None)]; - self.create_task("!concat".to_owned(), inputs, outputs, HashMap::new(), 1) + self.create_task(builtin("concat"), inputs, outputs, HashMap::new(), 1) } fn open(&mut self, filename: String) -> TaskPtr { let mut config = HashMap::new(); @@ -29,17 +33,17 @@ impl CommonTasks for Session { let outputs = vec![self.create_object("".to_owned(), None)]; - self.create_task("!open".to_owned(), vec![], outputs, config, 1) + self.create_task(builtin("open"), vec![], outputs, config, 1) } fn export(&mut self, object: DataObjectPtr, filename: String) -> TaskPtr { let mut config = HashMap::new(); config.insert("path".to_owned(), filename); - let input = TaskInput { - label: None, - data_object: object.clone(), + let input = TaskSpecInput { + label: "".to_owned(), + id: object.id(), }; - self.create_task("!export".to_owned(), vec![input], vec![], config, 1) + self.create_task(builtin("export"), vec![input], vec![], config, 1) } } diff --git a/rain_client/src/lib.rs b/rain_client/src/lib.rs new file mode 100644 index 0000000..cb390ec --- /dev/null +++ b/rain_client/src/lib.rs @@ -0,0 +1,12 @@ +extern crate capnp_rpc; +#[macro_use] +extern crate error_chain; +extern crate futures; +#[macro_use] +extern crate log; +extern crate rain_core; +extern crate serde_json; +extern crate tokio_core; + +pub mod client; +pub use client::*; diff --git a/rain_client_test/Cargo.toml b/rain_client_test/Cargo.toml new file mode 100644 index 0000000..57ebf91 --- /dev/null +++ b/rain_client_test/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "rain_client_test" +version = "0.3.0" +authors = ["Jakub Beranek "] +description = "A testing Rain client in Rust." +license = "MIT" +publish = false + +[dependencies] +rain_client = "0.3.0" diff --git a/rain_client_test/src/main.rs b/rain_client_test/src/main.rs new file mode 100644 index 0000000..d12eef0 --- /dev/null +++ b/rain_client_test/src/main.rs @@ -0,0 +1,27 @@ +extern crate rain_client; + +use std::error::Error; +use rain_client::tasks::CommonTasks; +use rain_client::client::LocalCluster; + +fn test() -> Result<(), Box> { + let cluster = LocalCluster::new("/home/kobzol/projects/it4i/rain/target/debug/rain")?; + + let client = cluster.create_client()?; + let mut s = client.new_session()?; + + let a = s.open("/tmp/asd.txt".to_owned()); + let b = s.open("/tmp/asd2.txt".to_owned()); + + let c = s.concat(&[a.output(), b.output()]); + c.output().keep(); + s.submit()?; + let res = s.fetch(&c.output())?; + println!("{}", String::from_utf8(res)?); + + Ok(()) +} + +fn main() { + test().unwrap(); +} diff --git a/rain_core/Cargo.toml b/rain_core/Cargo.toml index 01cdae3..db4daa9 100644 --- a/rain_core/Cargo.toml +++ b/rain_core/Cargo.toml @@ -38,6 +38,7 @@ serde_cbor = "0.8" serde_derive = "1.0" serde_json = "1.0" tokio-core="0.1" +tokio-io="0.1" tokio-timer = "0.2" [build-dependencies] diff --git a/rain_core/src/comm/executor.rs b/rain_core/src/comm/executor.rs index 6dbf044..f64f7ee 100644 --- a/rain_core/src/comm/executor.rs +++ b/rain_core/src/comm/executor.rs @@ -143,7 +143,7 @@ pub struct DropCachedMsg { #[cfg(test)] mod tests { use super::*; - use serde::{de::DeserializeOwned, Serialize}; + use serde::{Serialize, de::DeserializeOwned}; use serde_cbor; use serde_json; use std::fmt::Debug; diff --git a/rain_core/src/comm/mod.rs b/rain_core/src/comm/mod.rs index cac5179..c087a25 100644 --- a/rain_core/src/comm/mod.rs +++ b/rain_core/src/comm/mod.rs @@ -1,5 +1,7 @@ pub(crate) mod executor; +pub mod rpc; pub use self::executor::{CallMsg, DataLocation, DropCachedMsg, ExecutorToGovernorMessage, GovernorToExecutorMessage, LocalObjectIn, LocalObjectOut, RegisterMsg, ResultMsg}; +pub use self::rpc::new_rpc_system; diff --git a/rain_core/src/comm/rpc.rs b/rain_core/src/comm/rpc.rs new file mode 100644 index 0000000..fed2a2d --- /dev/null +++ b/rain_core/src/comm/rpc.rs @@ -0,0 +1,19 @@ +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; +use tokio_io::{AsyncRead, AsyncWrite}; + +pub fn new_rpc_system( + stream: Stream, + bootstrap: Option<::capnp::capability::Client>, +) -> RpcSystem +where + Stream: AsyncRead + AsyncWrite + 'static, +{ + let (reader, writer) = stream.split(); + let network = Box::new(twoparty::VatNetwork::new( + reader, + writer, + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + RpcSystem::new(network, bootstrap) +} diff --git a/rain_core/src/lib.rs b/rain_core/src/lib.rs index ff60b45..b212178 100644 --- a/rain_core/src/lib.rs +++ b/rain_core/src/lib.rs @@ -3,7 +3,7 @@ //! This documentation is minimalistic but still hopefully useful. //! As an user, you may be interested in the //! [rain_task lirary documentation](https://docs.rs/rain_task/). -//! +//! //! See `README.md` and the [project page](https://github.com/substantic/rain/) //! for general information. @@ -30,6 +30,7 @@ extern crate serde_cbor; extern crate serde_derive; extern crate serde_json; extern crate tokio_core; +extern crate tokio_io; extern crate tokio_timer; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/rain_server/src/common/connection.rs b/rain_server/src/common/connection.rs index ab7ac4f..0cf3f8c 100644 --- a/rain_server/src/common/connection.rs +++ b/rain_server/src/common/connection.rs @@ -1,8 +1,8 @@ use bytes::BytesMut; use futures::{Future, Stream}; -use tokio_io::codec::length_delimited::{Builder, Framed}; use tokio_io::AsyncRead; use tokio_io::AsyncWrite; +use tokio_io::codec::length_delimited::{Builder, Framed}; use rain_core::errors::{Error, Result}; diff --git a/rain_server/src/governor/graph/dataobj.rs b/rain_server/src/governor/graph/dataobj.rs index d6081b5..64d5325 100644 --- a/rain_server/src/governor/graph/dataobj.rs +++ b/rain_server/src/governor/graph/dataobj.rs @@ -5,9 +5,9 @@ use std::path::Path; use std::sync::Arc; use super::{Graph, TaskRef}; +use governor::WorkDir; use governor::data::Data; use governor::graph::ExecutorRef; -use governor::WorkDir; use wrapped::WrappedRcRefCell; #[derive(Debug)] diff --git a/rain_server/src/governor/rpc/control.rs b/rain_server/src/governor/rpc/control.rs index 90ee9eb..ba3ff12 100644 --- a/rain_server/src/governor/rpc/control.rs +++ b/rain_server/src/governor/rpc/control.rs @@ -3,8 +3,8 @@ use futures::future::Future; use rain_core::{errors::*, types::*, utils::*}; use std::sync::Arc; -use governor::graph::DataObjectState; use governor::StateRef; +use governor::graph::DataObjectState; use rain_core::governor_capnp::governor_control; pub struct GovernorControlImpl { diff --git a/rain_server/src/governor/rpc/executor.rs b/rain_server/src/governor/rpc/executor.rs index ed5f6c3..20fff0a 100644 --- a/rain_server/src/governor/rpc/executor.rs +++ b/rain_server/src/governor/rpc/executor.rs @@ -3,8 +3,8 @@ use rain_core::{comm::*, errors::*, types::*}; use std::path::Path; use std::sync::Arc; -use governor::data::{Data, Storage}; use governor::State; +use governor::data::{Data, Storage}; static PROTOCOL_VERSION: &'static str = "cbor-1"; diff --git a/rain_server/src/governor/rpc/fetch.rs b/rain_server/src/governor/rpc/fetch.rs index ca0af8a..ecc70b4 100644 --- a/rain_server/src/governor/rpc/fetch.rs +++ b/rain_server/src/governor/rpc/fetch.rs @@ -1,12 +1,12 @@ -use futures::future::Either; use futures::IntoFuture; +use futures::future::Either; use futures::{future, Future}; use rain_core::{errors::*, types::*, utils::*}; use std::rc::Rc; +use governor::StateRef; use governor::data::{Data, DataBuilder}; use governor::graph::DataObjectRef; -use governor::StateRef; pub struct FetchContext { pub state_ref: StateRef, diff --git a/rain_server/src/governor/state.rs b/rain_server/src/governor/state.rs index fe99231..998144a 100644 --- a/rain_server/src/governor/state.rs +++ b/rain_server/src/governor/state.rs @@ -12,14 +12,14 @@ use std::time::{Duration, Instant}; use common::Monitor; use common::{create_protocol_stream, new_rpc_system, Connection}; -use governor::data::transport::TransportView; use governor::data::Data; +use governor::data::transport::TransportView; use governor::fs::workdir::WorkDir; use governor::graph::executor::get_log_tails; use governor::graph::{executor_command, DataObject, DataObjectRef, DataObjectState, ExecutorRef, Graph, TaskRef, TaskState}; -use governor::rpc::executor::check_registration; use governor::rpc::GovernorControlImpl; +use governor::rpc::executor::check_registration; use governor::tasks::TaskInstance; use wrapped::WrappedRcRefCell; diff --git a/rain_server/src/main.rs b/rain_server/src/main.rs index 7f96d39..94ac0fb 100644 --- a/rain_server/src/main.rs +++ b/rain_server/src/main.rs @@ -12,6 +12,7 @@ extern crate chrono; #[macro_use] extern crate clap; extern crate env_logger; +#[macro_use] extern crate error_chain; extern crate fs_extra; extern crate futures; @@ -60,7 +61,6 @@ use std::process::exit; use rain_core::sys::{create_ready_file, get_hostname}; use rain_core::{errors::*, utils::*}; -use clap::{App, Arg, ArgMatches, SubCommand}; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); const DEFAULT_SERVER_PORT: u16 = 7210; @@ -141,7 +141,6 @@ fn run_server(_global_args: &ArgMatches, cmd_args: &ArgMatches) { } } -<<<<<<< 7999023457517e7570c2373ab414486b9fe0163c:rain_server/src/main.rs fn default_working_directory() -> PathBuf { let pid = getpid(); let hostname = get_hostname(); @@ -171,8 +170,6 @@ fn ensure_directory(dir: &Path, name: &str) -> Result<()> { Ok(()) } -======= ->>>>>>> [api] add local cluster:src/bin.rs // TODO: Do some serious configuration file and unify configurations // Right now, it is just a quick hack for supporting executors @@ -424,7 +421,7 @@ fn run_starter(_global_args: &ArgMatches, cmd_args: &ArgMatches) { } fn stop_server(_global_args: &ArgMatches, cmd_args: &ArgMatches) { - let default_address = format!("localhost:{}", DEFAULT_SERVER_PORT); + /*let default_address = format!("localhost:{}", DEFAULT_SERVER_PORT); let mut address = cmd_args .value_of("SERVER_ADDRESS") .unwrap_or(&default_address) @@ -444,7 +441,7 @@ fn stop_server(_global_args: &ArgMatches, cmd_args: &ArgMatches) { exit(1); }); - println!("Server at {} was successfully stopped", address); + println!("Server at {} was successfully stopped", address);*/ } fn init_log() { @@ -611,7 +608,7 @@ fn main() { .takes_value(true))) .subcommand( // ---- STOP ---- SubCommand::with_name("stop") - .about("Stop server and all workers connected to it") + .about("Stop server and all governors connected to it") .arg(Arg::with_name("SERVER_ADDRESS") .help("Address of the server (default = localhost:7210)") .takes_value(true))) diff --git a/rain_server/src/server/http.rs b/rain_server/src/server/http.rs index 5b63579..682f545 100644 --- a/rain_server/src/server/http.rs +++ b/rain_server/src/server/http.rs @@ -145,9 +145,7 @@ impl Service for RequestHandler { &include_bytes!("./../../dashboard/dist/main.css.gz")[..], ) } - _ => static_data_response( - &include_bytes!("./../../dashboard/dist/index.html")[..], - ), + _ => static_data_response(&include_bytes!("./../../dashboard/dist/index.html")[..]), /*path => { warn!("Invalid HTTP request: {}", path); Response::new().with_status(StatusCode::NotFound) diff --git a/rain_server/src/server/logging/sqlite_logger.rs b/rain_server/src/server/logging/sqlite_logger.rs index 2b9cc10..17590bf 100644 --- a/rain_server/src/server/logging/sqlite_logger.rs +++ b/rain_server/src/server/logging/sqlite_logger.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; -use futures::sync::{mpsc, oneshot}; use futures::Future; use futures::Stream; +use futures::sync::{mpsc, oneshot}; use rusqlite::Connection; use serde_json; use std::path::PathBuf; diff --git a/rain_server/src/server/rpc/client.rs b/rain_server/src/server/rpc/client.rs index 4f34f10..cb81f41 100644 --- a/rain_server/src/server/rpc/client.rs +++ b/rain_server/src/server/rpc/client.rs @@ -7,6 +7,7 @@ use std::net::SocketAddr; use server::graph::{ClientRef, TaskRef}; use server::graph::{DataObjectRef, DataObjectState}; use server::state::StateRef; +use std::process::exit; pub struct ClientServiceImpl { state: StateRef, diff --git a/rain_server/src/start/common.rs b/rain_server/src/start/common.rs index c96f6c3..10289e0 100644 --- a/rain_server/src/start/common.rs +++ b/rain_server/src/start/common.rs @@ -1,9 +1,4 @@ use std::path::PathBuf; -use nix::unistd::getpid; -use std::path::Path; -use errors::Result; -use std::fs::create_dir_all; -use std::error::Error; pub enum Readiness { /// Ready file is a file that @@ -11,31 +6,3 @@ pub enum Readiness { WaitingForReadyFile(PathBuf), IsReady, } - -pub fn default_working_directory() -> PathBuf { - let pid = getpid(); - let hostname = ::common::sys::get_hostname(); - PathBuf::from("/tmp/rain-work").join(format!("worker-{}-{}", hostname, pid)) -} - -pub fn default_logging_directory(basename: &str) -> PathBuf { - let pid = getpid(); - let hostname = ::common::sys::get_hostname(); - PathBuf::from("/tmp/rain-logs").join(format!("{}-{}-{}", basename, hostname, pid)) -} -pub fn ensure_directory(dir: &Path, name: &str) -> Result<()> { - if !dir.exists() { - debug!("{} not found, creating ... {:?}", name, dir); - if let Err(e) = create_dir_all(dir) { - bail!(format!( - "{} {:?} cannot by created: {}", - name, - dir, - e.description() - )); - } - } else if !dir.is_dir() { - bail!("{} {:?} exists but it is not a directory", name, dir); - } - Ok(()) -} diff --git a/rain_server/src/start/mod.rs b/rain_server/src/start/mod.rs index 0b7d57e..0519e43 100644 --- a/rain_server/src/start/mod.rs +++ b/rain_server/src/start/mod.rs @@ -1,5 +1,4 @@ pub mod common; -pub mod localcluster; pub mod process; pub mod ssh; pub mod starter; diff --git a/rain_server/src/start/starter.rs b/rain_server/src/start/starter.rs index 3024f72..75fe49a 100644 --- a/rain_server/src/start/starter.rs +++ b/rain_server/src/start/starter.rs @@ -11,12 +11,6 @@ use std::process::Command; use start::common::Readiness; use start::process::Process; use start::ssh::RemoteProcess; -use errors::Result; - -use nix::unistd::getpid; -use std::io::BufReader; -use std::io::BufRead; -use std::fs::File; pub struct StarterConfig { /// Number of local governor that will be spawned diff --git a/rain_task/src/macros.rs b/rain_task/src/macros.rs index d75ece6..dd5b261 100644 --- a/rain_task/src/macros.rs +++ b/rain_task/src/macros.rs @@ -88,7 +88,8 @@ macro_rules! register_task_make_call { #[macro_export] macro_rules! register_task { ($executor: expr, $name: expr, [$($params: tt)*], $taskfn: expr) => ({ - $executor.register_task($name, |ctx: &mut Context, ins: &[DataInstance], outs: &mut [Output]| -> TaskResult<()> { + $executor.register_task($name, |ctx: &mut Context, ins: &[DataInstance], + outs: &mut [Output]| -> TaskResult<()> { register_task_make_call!($taskfn, ins, outs, ($($params)*), (ctx)) }) }); diff --git a/rain_task/src/tests.rs b/rain_task/src/tests.rs index dc1462a..d0d175c 100644 --- a/rain_task/src/tests.rs +++ b/rain_task/src/tests.rs @@ -202,7 +202,8 @@ fn register_task() { register_task!(s, "task4", [I I O O], |_ctx, _in1, _in2, _out1, _out2| Ok(())); register_task!(s, "task5", [Is Os], task1); register_task!(s, "task6", [I O Is Os], - |_ctx, _i1: &DataInstance, _o1: &mut Output, _is: &[DataInstance], _os: &mut [Output]| Ok(())); + |_ctx, _i1: &DataInstance, _o1: &mut Output, _is: &[DataInstance], + _os: &mut [Output]| Ok(())); } fn task_cat(_ctx: &mut Context, inputs: &[DataInstance], outputs: &mut [Output]) -> TaskResult<()> { @@ -264,25 +265,27 @@ fn run_cat_task() { fn run_long_cat() { let (mut s, handle) = setup( "run_long_cat", - vec![call_msg( - 1, - "run_long_cat/cat", - vec![ - data_spec( - 1, - "in1", - Some(DataLocation::Memory( - [0u8; MEM_BACKED_LIMIT - 1].as_ref().into(), - )), - ), - data_spec( - 2, - "in2", - Some(DataLocation::Memory([0u8; 2].as_ref().into())), - ), - ], - vec![data_spec(6, "out", None)], - )], + vec![ + call_msg( + 1, + "run_long_cat/cat", + vec![ + data_spec( + 1, + "in1", + Some(DataLocation::Memory( + [0u8; MEM_BACKED_LIMIT - 1].as_ref().into(), + )), + ), + data_spec( + 2, + "in2", + Some(DataLocation::Memory([0u8; 2].as_ref().into())), + ), + ], + vec![data_spec(6, "out", None)], + ), + ], ); s.register_task("cat", task_cat); s.run(); @@ -301,12 +304,14 @@ fn run_long_cat() { fn run_empty_cat() { let (mut s, handle) = setup( "run_empty_cat", - vec![call_msg( - 1, - "run_empty_cat/cat", - vec![], - vec![data_spec(3, "out", None)], - )], + vec![ + call_msg( + 1, + "run_empty_cat/cat", + vec![], + vec![data_spec(3, "out", None)], + ), + ], ); s.register_task("cat", task_cat); s.run(); @@ -322,16 +327,16 @@ fn run_empty_cat() { fn run_pass_cat() { let (mut s, handle) = setup( "run_pass_cat", - vec![call_msg( - 2, - "run_pass_cat/cat", - vec![data_spec( - 1, - "in", - Some(DataLocation::Memory("drip".into())), - )], - vec![data_spec(2, "out", None)], - )], + vec![ + call_msg( + 2, + "run_pass_cat/cat", + vec![ + data_spec(1, "in", Some(DataLocation::Memory("drip".into()))), + ], + vec![data_spec(2, "out", None)], + ), + ], ); s.register_task("cat", task_cat); s.run(); @@ -347,12 +352,14 @@ fn run_pass_cat() { fn test_make_file_backed() { let (mut s, handle) = setup( "test_make_file_backed", - vec![call_msg( - 2, - "test_make_file_backed/mfb", - vec![], - vec![data_spec(3, "out", None)], - )], + vec![ + call_msg( + 2, + "test_make_file_backed/mfb", + vec![], + vec![data_spec(3, "out", None)], + ), + ], ); s.register_task("mfb", |_ctx, _ins, outs| { write!(outs[0], "Rainfall")?; @@ -373,16 +380,16 @@ fn test_make_file_backed() { fn test_get_path_writing() { let (mut s, handle) = setup( "test_get_path_writing", - vec![call_msg( - 2, - "test_get_path_writing/gp", - vec![data_spec( - 1, - "in", - Some(DataLocation::Memory("drizzle".into())), - )], - vec![], - )], + vec![ + call_msg( + 2, + "test_get_path_writing/gp", + vec![ + data_spec(1, "in", Some(DataLocation::Memory("drizzle".into()))), + ], + vec![], + ), + ], ); s.register_task("gp", |_ctx, ins, _outs| { let p = ins[0].get_path(); @@ -404,12 +411,14 @@ fn test_get_path_writing() { fn run_stage_file() { let (mut s, handle) = setup( "run_stage_file", - vec![call_msg( - 2, - "run_stage_file/stage", - vec![], - vec![data_spec(2, "out", None)], - )], + vec![ + call_msg( + 2, + "run_stage_file/stage", + vec![], + vec![data_spec(2, "out", None)], + ), + ], ); s.register_task("stage", |_ctx, _inp, outp| { let mut f = fs::File::create("testfile.txt").unwrap(); @@ -494,11 +503,13 @@ fn read_set_content_type() { let mut call = call_msg( 2, "read_set_content_type/foo", - vec![data_spec( - 2, - "out", - Some(DataLocation::Memory((b"content!" as &[u8]).into())), - )], + vec![ + data_spec( + 2, + "out", + Some(DataLocation::Memory((b"content!" as &[u8]).into())), + ), + ], vec![data_spec(3, "out", None)], ); call.inputs[0].info = Some(ObjectInfo { diff --git a/src/client/rpc.rs b/src/client/rpc.rs deleted file mode 100644 index 082fc12..0000000 --- a/src/client/rpc.rs +++ /dev/null @@ -1,82 +0,0 @@ -use super::task::Task; -use client::dataobject::DataObject; -use client::task::TaskInput; -use common::convert::ToCapnp; -use common::id::DataObjectId; - -macro_rules! to_capnp_list { - ($builder:expr, $items:expr, $name:ident) => { - { - let mut builder = $builder.$name($items.len() as u32); - for (i, obj) in $items.iter().enumerate() { - obj.to_capnp(&mut builder.reborrow().get(i as u32)); - } - } - } -} -macro_rules! from_capnp_list { - ($builder:expr, $items:ident, $obj:ident) => { - { - $builder - .$items()? - .iter() - .map(|item| $obj::from_capnp(&item)) - .collect() - } - } -} - -impl<'a> ToCapnp<'a> for TaskInput { - type Builder = ::client_capnp::task::in_data_object::Builder<'a>; - - fn to_capnp(&self, builder: &mut Self::Builder) { - self.data_object - .id - .to_capnp(&mut builder.reborrow().get_id().unwrap()); - - if let &Some(ref label) = &self.label { - builder.reborrow().set_label(label); - } - } -} - -impl<'a> ToCapnp<'a> for Task { - type Builder = ::client_capnp::task::Builder<'a>; - - fn to_capnp(&self, builder: &mut Self::Builder) { - self.id.to_capnp(&mut builder.reborrow().get_id().unwrap()); - builder.set_task_type(&self.command); - - to_capnp_list!(builder.reborrow(), self.inputs, init_inputs); - to_capnp_list!( - builder.reborrow(), - self.outputs - .iter() - .map(|o| o.id) - .collect::>(), - init_outputs - ); - self.attributes - .to_capnp(&mut builder.reborrow().get_attributes().unwrap()); - } -} -impl<'a> ToCapnp<'a> for DataObject { - type Builder = ::client_capnp::data_object::Builder<'a>; - - fn to_capnp(&self, builder: &mut Self::Builder) { - self.id.to_capnp(&mut builder.reborrow().get_id().unwrap()); - builder.set_keep(self.keep.get()); - builder.set_label(&self.label); - builder.set_data_type(self.data_type.to_capnp()); - - if let &Some(ref data) = &self.data { - builder.set_data(&data); - builder.set_has_data(true); - } else { - builder.set_has_data(false); - } - - self.attributes - .to_capnp(&mut builder.reborrow().get_attributes().unwrap()); - } -} diff --git a/src/client/task.rs b/src/client/task.rs deleted file mode 100644 index a3342a0..0000000 --- a/src/client/task.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::error::Error; -use common::Attributes; -use common::id::TaskId; -use super::session::DataObjectPtr; - -pub struct TaskInput { - pub label: Option, - pub data_object: DataObjectPtr, -} - -pub struct Task { - pub id: TaskId, - pub command: String, - pub inputs: Vec, - pub outputs: Vec, - pub attributes: Attributes, -} - -impl Task { - pub fn output(&self) -> Result> { - if self.outputs.len() == 1 { - return Ok(self.outputs[0].clone()); - } - - bail!("There is not a single output") - } -} diff --git a/src/start/localcluster.rs b/src/start/localcluster.rs deleted file mode 100644 index 9fc0751..0000000 --- a/src/start/localcluster.rs +++ /dev/null @@ -1,55 +0,0 @@ -use start::starter::{Starter, StarterConfig}; -use std::net::SocketAddr; -use std::error::Error; -use client::client::Client; -use start::common::{default_logging_directory, ensure_directory}; - -pub struct LocalCluster { - listen_addr: SocketAddr, - starter: Starter, -} - -impl LocalCluster { - pub fn new( - worker_cpus: Vec>, - listen_port: u16, - http_port: u16, - ) -> Result> { - let listen_addr = SocketAddr::new("127.0.0.1".parse()?, listen_port); - let http_addr = SocketAddr::new("127.0.0.1".parse()?, http_port); - - let log_dir = default_logging_directory("rain"); - ensure_directory(&log_dir, "logging directory")?; - - let config = StarterConfig::new( - worker_cpus, - listen_addr, - http_addr, - &log_dir, - "".to_owned(), - false, - vec![], - ); - - let mut cluster = LocalCluster { - listen_addr, - starter: Starter::new(config), - }; - cluster.starter.start()?; - - Ok(cluster) - } - pub fn new_simple(listen_port: u16, http_port: u16) -> Result> { - Self::new(vec![None], listen_port, http_port) - } - - pub fn create_client(&self) -> Result> { - Client::new(self.listen_addr) - } -} - -impl Drop for LocalCluster { - fn drop(&mut self) { - self.starter.kill_all(); - } -}