diff --git a/Cargo.lock b/Cargo.lock index 1c1fd29..3ccb132 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -555,6 +555,20 @@ dependencies = [ "proc-macro2 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rain_client" +version = "0.3.0" +dependencies = [ + "capnp-rpc 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rain_core 0.3.0", + "serde 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rain_core" version = "0.3.0" @@ -577,6 +591,7 @@ dependencies = [ "serde_derive 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index 7683228..516b1a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ version = "0.3.0" [workspace] members = [ + "rain_client", "rain_core", "rain_server", "rain_task", @@ -10,5 +11,6 @@ members = [ ] [patch.crates-io] +rain_client = { path = "rain_client" } rain_core = { path = "rain_core" } rain_task = { path = "rain_task" } diff --git a/rain_client/Cargo.toml b/rain_client/Cargo.toml new file mode 100644 index 0000000..cc49275 --- /dev/null +++ b/rain_client/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "rain_client" +version = "0.3.0" + +description = "Distributed computational framework for large-scale task-based pipelines. Client library in Rust." +# documentation = "https://docs.rs/rain_task/" # default docs.rs +homepage = "https://github.com/substantic/rain" +repository = "https://github.com/substantic/rain/" +readme = "README.md" +authors = [ + "Stanislav Bohm ", + "Tomas Gavenciak ", + "Vojtech Cima ", + ] +license = "MIT" + +exclude = ["testing/**/*"] + +[badges] +travis-ci = { repository = "substantic/rain", branch = "master" } +maintenance = { status = "actively-developed" } + +[dependencies] +capnp-rpc = "0.8" +error-chain = "0.11" +futures = "0.1" +log = "0.4" +rain_core = "0.3.0" +tokio-core = "0.1" +serde = "1" +serde_json = "1" diff --git a/rain_client/src/client/client.rs b/rain_client/src/client/client.rs new file mode 100644 index 0000000..7f86e86 --- /dev/null +++ b/rain_client/src/client/client.rs @@ -0,0 +1,73 @@ +use std::error::Error; +use std::net::SocketAddr; + +use super::communicator::Communicator; +use super::session::Session; +use rain_core::CLIENT_PROTOCOL_VERSION; +use std::rc::Rc; + +pub struct Client { + comm: Rc, +} + +impl Client { + pub fn new(scheduler: SocketAddr) -> Result> { + let comm = Rc::new(Communicator::new(scheduler, CLIENT_PROTOCOL_VERSION)?); + + Ok(Client { comm }) + } + + pub fn new_session(&self) -> Result> { + let session_id = self.comm.new_session()?; + Ok(Session::new(session_id, self.comm.clone())) + } + + pub fn terminate_server(&self) -> Result<(), Box> { + self.comm.terminate_server() + } +} + +#[cfg(test)] +mod tests { + use super::super::localcluster::LocalCluster; + use super::super::tasks::CommonTasks; + use super::Client; + use super::Session; + use std::env; + + #[allow(dead_code)] + struct TestContext { + cluster: LocalCluster, + client: Client, + session: Session, + } + + fn ctx() -> TestContext { + let rain = env::var("RAIN_BINARY").unwrap(); + + let cluster = LocalCluster::new(&rain).unwrap(); + let client = cluster.create_client().unwrap(); + let session = client.new_session().unwrap(); + + TestContext { + cluster, + client, + session, + } + } + + #[test] + fn concat() { + let mut ctx = ctx(); + let a = ctx.session.blob(vec![1, 2, 3]); + let b = ctx.session.blob(vec![4, 5, 6]); + let c = ctx.session.concat(&[a, b]); + c.output().keep(); + ctx.session.submit().unwrap(); + ctx.session.wait(&[c.clone()], &[]).unwrap(); + assert_eq!( + ctx.session.fetch(&c.output()).unwrap(), + vec![1, 2, 3, 4, 5, 6] + ); + } +} diff --git a/rain_client/src/client/communicator.rs b/rain_client/src/client/communicator.rs new file mode 100644 index 0000000..ba3ba95 --- /dev/null +++ b/rain_client/src/client/communicator.rs @@ -0,0 +1,154 @@ +use capnp_rpc::rpc_twoparty_capnp; +use futures::Future; +use rain_core::comm::new_rpc_system; +use std::error::Error; +use std::net::SocketAddr; +use tokio_core::net::TcpStream; +use tokio_core::reactor::Core; + +use super::task::Task; +use client::dataobject::DataObject; +use rain_core::types::{DataObjectId, TaskId}; +use rain_core::utils::{FromCapnp, ToCapnp}; +use rain_core::{client_capnp, common_capnp, server_capnp}; +use std::cell::{RefCell, RefMut}; + +pub struct Communicator { + core: RefCell, + service: client_capnp::client_service::Client, +} + +impl Communicator { + pub fn new(scheduler: SocketAddr, version: i32) -> Result> { + let mut core = Core::new()?; + let handle = core.handle(); + let stream = core.run(TcpStream::connect(&scheduler, &handle))?; + stream.set_nodelay(true)?; + + debug!("Connection to server {} established", scheduler); + + let mut rpc = Box::new(new_rpc_system(stream, None)); + let bootstrap: server_capnp::server_bootstrap::Client = + rpc.bootstrap(rpc_twoparty_capnp::Side::Server); + handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err))); + + let mut request = bootstrap.register_as_client_request(); + request.get().set_version(version); + + let service = core.run(request.send().promise)?.get()?.get_service()?; + + Ok(Self { + core: RefCell::new(core), + service, + }) + } + + pub fn new_session(&self) -> Result> { + let id: i32 = self.comm() + .run(self.service.new_session_request().send().promise)? + .get()? + .get_session_id(); + + Ok(id) + } + pub fn close_session(&self, id: i32) -> Result<(), Box> { + self.comm().run({ + let mut req = self.service.close_session_request(); + req.get().set_session_id(id); + req.send().promise + })?; + + Ok(()) + } + + pub fn submit(&self, tasks: &[T], data_objects: &[D]) -> Result<(), Box> + where + T: AsRef, + D: AsRef, + { + let mut req = self.service.submit_request(); + + to_capnp_list!( + req.get(), + tasks.iter().map(|t| t.as_ref()).collect::>(), + init_tasks + ); + to_capnp_list!( + req.get(), + data_objects + .iter() + .map(|t| t.as_ref()) + .collect::>(), + init_objects + ); + self.comm().run(req.send().promise)?; + + Ok(()) + } + + pub fn unkeep(&self, objects: &[DataObjectId]) -> Result<(), Box> { + let mut req = self.service.unkeep_request(); + to_capnp_list!(req.get(), objects, init_object_ids); + self.comm().run(req.send().promise)?; + Ok(()) + } + + pub fn wait(&self, tasks: &[TaskId], objects: &[DataObjectId]) -> Result<(), Box> { + let mut req = self.service.wait_request(); + to_capnp_list!(req.get(), tasks, init_task_ids); + to_capnp_list!(req.get(), objects, init_object_ids); + self.comm().run(req.send().promise)?; + Ok(()) + } + pub fn wait_some( + &self, + tasks: &[TaskId], + objects: &[DataObjectId], + ) -> Result<(Vec, Vec), Box> { + let mut req = self.service.wait_some_request(); + to_capnp_list!(req.get(), tasks, init_task_ids); + to_capnp_list!(req.get(), objects, init_object_ids); + let res = self.comm().run(req.send().promise)?; + + Ok(( + from_capnp_list!(res.get()?, get_finished_tasks, TaskId), + from_capnp_list!(res.get()?, get_finished_objects, DataObjectId), + )) + } + + pub fn fetch(&self, object_id: &DataObjectId) -> Result, Box> { + let mut req = self.service.fetch_request(); + object_id.to_capnp(&mut req.get().get_id().unwrap()); + req.get().set_size(1024); + + let response = self.comm().run(req.send().promise)?; + + // TODO: handle error states + let reader = response.get()?; + match reader.get_status().which()? { + common_capnp::fetch_result::status::Ok(()) => { + let data = reader.get_data()?; + Ok(Vec::from(data)) + } + common_capnp::fetch_result::status::Removed(()) => { + print!("Removed"); + Ok(vec![]) + } + common_capnp::fetch_result::status::Error(err) => { + print!("Error: {:?}", err.unwrap().get_message()); + Ok(vec![]) + } + _ => bail!("Non-ok status"), + } + } + + pub fn terminate_server(&self) -> Result<(), Box> { + self.comm() + .run(self.service.terminate_server_request().send().promise)?; + Ok(()) + } + + fn comm(&self) -> RefMut { + self.core.borrow_mut() + } +} diff --git a/rain_client/src/client/dataobject.rs b/rain_client/src/client/dataobject.rs new file mode 100644 index 0000000..de19c64 --- /dev/null +++ b/rain_client/src/client/dataobject.rs @@ -0,0 +1,17 @@ +use rain_core::types::{DataObjectId, ObjectSpec}; +use std::cell::Cell; + +pub struct DataObject { + pub keep: Cell, + pub data: Option>, + pub spec: ObjectSpec, +} + +impl DataObject { + pub fn keep(&self) { + self.keep.set(true); + } + pub fn id(&self) -> DataObjectId { + self.spec.id + } +} diff --git a/rain_client/src/client/localcluster.rs b/rain_client/src/client/localcluster.rs new file mode 100644 index 0000000..45e1b11 --- /dev/null +++ b/rain_client/src/client/localcluster.rs @@ -0,0 +1,48 @@ +use super::client::Client; +use std::error::Error; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::process::{Command, Stdio}; + +pub struct LocalCluster { + listen_addr: SocketAddr, + binary: PathBuf, +} + +impl LocalCluster { + pub fn new(binary: &str) -> Result> { + let mut cluster = LocalCluster { + binary: PathBuf::from(binary), + listen_addr: SocketAddr::new("127.0.0.1".parse()?, 7210), + }; + cluster.start()?; + + Ok(cluster) + } + + fn start(&mut self) -> Result<(), Box> { + Command::new(&self.binary) + .arg("start") + .arg("--listen") + .arg(self.listen_addr.to_string()) + .arg("--simple") + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn()? + .wait()?; + + Ok(()) + } + + pub fn create_client(&self) -> Result> { + Client::new(self.listen_addr) + } +} + +impl Drop for LocalCluster { + #![allow(unused_must_use)] + fn drop(&mut self) { + Client::new(self.listen_addr).unwrap().terminate_server(); + } +} diff --git a/rain_client/src/client/mod.rs b/rain_client/src/client/mod.rs new file mode 100644 index 0000000..1dc689a --- /dev/null +++ b/rain_client/src/client/mod.rs @@ -0,0 +1,14 @@ +pub mod client; +pub mod localcluster; +pub mod session; +pub mod tasks; + +#[macro_use] +mod rpc; +mod communicator; +mod dataobject; +mod task; + +pub use self::client::Client; +pub use self::localcluster::LocalCluster; +pub use self::session::Session; diff --git a/rain_client/src/client/rpc.rs b/rain_client/src/client/rpc.rs new file mode 100644 index 0000000..9cf15f8 --- /dev/null +++ b/rain_client/src/client/rpc.rs @@ -0,0 +1,46 @@ +use super::task::Task; +use client::dataobject::DataObject; +use rain_core::client_capnp; +use rain_core::utils::ToCapnp; +use serde_json; + +macro_rules! to_capnp_list { + ($builder:expr, $items:expr, $name:ident) => {{ + let mut builder = $builder.$name($items.len() as u32); + for (i, obj) in $items.iter().enumerate() { + obj.to_capnp(&mut builder.reborrow().get(i as u32)); + } + }}; +} +macro_rules! from_capnp_list { + ($builder:expr, $items:ident, $obj:ident) => {{ + $builder + .$items()? + .iter() + .map(|item| $obj::from_capnp(&item)) + .collect() + }}; +} + +impl<'a> ToCapnp<'a> for Task { + type Builder = client_capnp::task::Builder<'a>; + + fn to_capnp(&self, builder: &mut Self::Builder) { + builder.set_spec(&serde_json::to_string(&self.spec).unwrap()); + } +} +impl<'a> ToCapnp<'a> for DataObject { + type Builder = client_capnp::data_object::Builder<'a>; + + fn to_capnp(&self, builder: &mut Self::Builder) { + builder.set_spec(&serde_json::to_string(&self.spec).unwrap()); + builder.set_keep(self.keep.get()); + + if let &Some(ref data) = &self.data { + builder.set_data(&data); + builder.set_has_data(true); + } else { + builder.set_has_data(false); + } + } +} diff --git a/rain_client/src/client/session.rs b/rain_client/src/client/session.rs new file mode 100644 index 0000000..aae30f1 --- /dev/null +++ b/rain_client/src/client/session.rs @@ -0,0 +1,168 @@ +use super::communicator::Communicator; +use client::dataobject::DataObject; +use client::task::Task; +use rain_core::common_capnp; +use rain_core::types::{DataObjectId, DataType, ObjectSpec, Resources, SId, TaskId, TaskSpec, + TaskSpecInput, UserAttrs}; +use serde_json; +use std::cell::Cell; +use std::collections::HashMap; +use std::error::Error; +use std::rc::Rc; + +pub type DataObjectPtr = Rc; +pub type TaskPtr = Rc; + +pub struct Session { + pub id: i32, + comm: Rc, + tasks: Vec, + data_objects: Vec, + id_counter: i32, +} + +impl Session { + pub fn new(id: i32, comm: Rc) -> Self { + debug!("Session {} created", id); + + Session { + id, + comm, + tasks: vec![], + data_objects: vec![], + id_counter: 0, + } + } + + pub fn submit(&mut self) -> Result<(), Box> { + self.comm.submit(&self.tasks, &self.data_objects)?; + self.tasks.clear(); + self.data_objects.clear(); + + Ok(()) + } + + pub fn unkeep(&mut self, objects: &[DataObjectPtr]) -> Result<(), Box> { + self.comm.unkeep(&objects + .iter() + .map(|o| o.id()) + .collect::>()) + } + + pub fn wait(&mut self, tasks: &[TaskPtr], objects: &[DataObjectPtr]) -> Result<(), Box> { + self.comm.wait( + &tasks.iter().map(|t| t.id()).collect::>(), + &objects + .iter() + .map(|o| o.id()) + .collect::>(), + ) + } + pub fn wait_some( + &mut self, + tasks: &[TaskPtr], + objects: &[DataObjectPtr], + ) -> Result<(Vec, Vec), Box> { + let task_map: HashMap = tasks.iter().map(|t| (t.id(), t)).collect(); + let object_map: HashMap = + objects.iter().map(|o| (o.id(), o)).collect(); + + let (task_ids, object_ids) = self.comm.wait_some( + &tasks.iter().map(|t| t.id()).collect::>(), + &objects + .iter() + .map(|o| o.id()) + .collect::>(), + )?; + + Ok(( + task_ids + .iter() + .filter_map(|id| task_map.get(id).map(|t| (*t).clone())) + .collect(), + object_ids + .iter() + .filter_map(|id| object_map.get(id).map(|o| (*o).clone())) + .collect(), + )) + } + pub fn wait_all(&mut self) -> Result<(), Box> { + self.comm.wait( + &vec![TaskId::new(self.id, common_capnp::ALL_TASKS_ID)], + &vec![], + ) + } + + pub fn fetch(&mut self, o: &DataObjectPtr) -> Result, Box> { + self.comm.fetch(&o.id()) + } + + pub fn blob(&mut self, data: Vec) -> DataObjectPtr { + self.create_object("".to_owned(), Some(data)) + } + + pub(crate) fn create_object_id(&mut self) -> DataObjectId { + let id = self.id_counter; + self.id_counter += 1; + + DataObjectId::new(self.id, id) + } + pub(crate) fn create_object(&mut self, label: String, data: Option>) -> DataObjectPtr { + let spec = ObjectSpec { + id: self.create_object_id(), + label, + data_type: DataType::Blob, + content_type: "".to_owned(), + user: UserAttrs::new(), + }; + let object = DataObject { + keep: Cell::new(false), + data, + spec, + }; + let rc = Rc::new(object); + self.data_objects.push(rc.clone()); + + rc + } + + pub(crate) fn create_task_id(&mut self) -> TaskId { + let id = self.id_counter; + self.id_counter += 1; + + TaskId::new(self.id, id) + } + pub fn create_task( + &mut self, + task_type: String, + inputs: Vec, + outputs: Vec, + config: HashMap, + cpus: u32, + ) -> TaskPtr { + let spec = TaskSpec { + id: self.create_task_id(), + inputs, + task_type, + outputs: outputs.iter().map(|o| o.id()).collect(), + config: Some(serde_json::to_value(&config).unwrap()), + resources: Resources { cpus }, + user: UserAttrs::new(), + }; + + let task = Task { spec, outputs }; + + let rc = Rc::new(task); + self.tasks.push(rc.clone()); + + rc + } +} + +impl Drop for Session { + #![allow(unused_must_use)] + fn drop(&mut self) { + debug!("Session {} destroyed", self.id); + self.comm.close_session(self.id); + } +} diff --git a/rain_client/src/client/task.rs b/rain_client/src/client/task.rs new file mode 100644 index 0000000..e31cc3d --- /dev/null +++ b/rain_client/src/client/task.rs @@ -0,0 +1,18 @@ +use super::session::DataObjectPtr; +use rain_core::types::{TaskId, TaskSpec}; + +pub struct Task { + pub spec: TaskSpec, + pub outputs: Vec, +} + +impl Task { + pub fn output(&self) -> DataObjectPtr { + assert_eq!(self.outputs.len(), 1, "Task has multiple outputs"); + + self.outputs[0].clone() + } + pub fn id(&self) -> TaskId { + self.spec.id + } +} diff --git a/rain_client/src/client/tasks.rs b/rain_client/src/client/tasks.rs new file mode 100644 index 0000000..1cfe687 --- /dev/null +++ b/rain_client/src/client/tasks.rs @@ -0,0 +1,49 @@ +use std::collections::HashMap; + +use super::session::{DataObjectPtr, Session, TaskPtr}; +use rain_core::types::TaskSpecInput; + +pub trait CommonTasks { + fn concat(&mut self, objects: &[DataObjectPtr]) -> TaskPtr; + fn open(&mut self, filename: String) -> TaskPtr; + fn export(&mut self, object: DataObjectPtr, filename: String) -> TaskPtr; +} + +fn builtin(action: &str) -> String { + return format!("buildin/{}", action); +} + +impl CommonTasks for Session { + fn concat(&mut self, objects: &[DataObjectPtr]) -> TaskPtr { + let inputs = objects + .iter() + .map(|o| TaskSpecInput { + label: "".to_owned(), + id: o.id(), + }) + .collect(); + + let outputs = vec![self.create_object("".to_owned(), None)]; + + self.create_task(builtin("concat"), inputs, outputs, HashMap::new(), 1) + } + fn open(&mut self, filename: String) -> TaskPtr { + let mut config = HashMap::new(); + config.insert("path".to_owned(), filename); + + let outputs = vec![self.create_object("".to_owned(), None)]; + + self.create_task(builtin("open"), vec![], outputs, config, 1) + } + fn export(&mut self, object: DataObjectPtr, filename: String) -> TaskPtr { + let mut config = HashMap::new(); + config.insert("path".to_owned(), filename); + + let input = TaskSpecInput { + label: "".to_owned(), + id: object.id(), + }; + + self.create_task(builtin("export"), vec![input], vec![], config, 1) + } +} diff --git a/rain_client/src/lib.rs b/rain_client/src/lib.rs new file mode 100644 index 0000000..cb390ec --- /dev/null +++ b/rain_client/src/lib.rs @@ -0,0 +1,12 @@ +extern crate capnp_rpc; +#[macro_use] +extern crate error_chain; +extern crate futures; +#[macro_use] +extern crate log; +extern crate rain_core; +extern crate serde_json; +extern crate tokio_core; + +pub mod client; +pub use client::*; diff --git a/rain_core/Cargo.toml b/rain_core/Cargo.toml index 01cdae3..db4daa9 100644 --- a/rain_core/Cargo.toml +++ b/rain_core/Cargo.toml @@ -38,6 +38,7 @@ serde_cbor = "0.8" serde_derive = "1.0" serde_json = "1.0" tokio-core="0.1" +tokio-io="0.1" tokio-timer = "0.2" [build-dependencies] diff --git a/rain_core/src/comm/executor.rs b/rain_core/src/comm/executor.rs index 6dbf044..f64f7ee 100644 --- a/rain_core/src/comm/executor.rs +++ b/rain_core/src/comm/executor.rs @@ -143,7 +143,7 @@ pub struct DropCachedMsg { #[cfg(test)] mod tests { use super::*; - use serde::{de::DeserializeOwned, Serialize}; + use serde::{Serialize, de::DeserializeOwned}; use serde_cbor; use serde_json; use std::fmt::Debug; diff --git a/rain_core/src/comm/mod.rs b/rain_core/src/comm/mod.rs index cac5179..c087a25 100644 --- a/rain_core/src/comm/mod.rs +++ b/rain_core/src/comm/mod.rs @@ -1,5 +1,7 @@ pub(crate) mod executor; +pub mod rpc; pub use self::executor::{CallMsg, DataLocation, DropCachedMsg, ExecutorToGovernorMessage, GovernorToExecutorMessage, LocalObjectIn, LocalObjectOut, RegisterMsg, ResultMsg}; +pub use self::rpc::new_rpc_system; diff --git a/rain_core/src/comm/rpc.rs b/rain_core/src/comm/rpc.rs new file mode 100644 index 0000000..fed2a2d --- /dev/null +++ b/rain_core/src/comm/rpc.rs @@ -0,0 +1,19 @@ +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; +use tokio_io::{AsyncRead, AsyncWrite}; + +pub fn new_rpc_system( + stream: Stream, + bootstrap: Option<::capnp::capability::Client>, +) -> RpcSystem +where + Stream: AsyncRead + AsyncWrite + 'static, +{ + let (reader, writer) = stream.split(); + let network = Box::new(twoparty::VatNetwork::new( + reader, + writer, + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + RpcSystem::new(network, bootstrap) +} diff --git a/rain_core/src/lib.rs b/rain_core/src/lib.rs index ff60b45..b212178 100644 --- a/rain_core/src/lib.rs +++ b/rain_core/src/lib.rs @@ -3,7 +3,7 @@ //! This documentation is minimalistic but still hopefully useful. //! As an user, you may be interested in the //! [rain_task lirary documentation](https://docs.rs/rain_task/). -//! +//! //! See `README.md` and the [project page](https://github.com/substantic/rain/) //! for general information. @@ -30,6 +30,7 @@ extern crate serde_cbor; extern crate serde_derive; extern crate serde_json; extern crate tokio_core; +extern crate tokio_io; extern crate tokio_timer; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/rain_server/src/common/connection.rs b/rain_server/src/common/connection.rs index ab7ac4f..0cf3f8c 100644 --- a/rain_server/src/common/connection.rs +++ b/rain_server/src/common/connection.rs @@ -1,8 +1,8 @@ use bytes::BytesMut; use futures::{Future, Stream}; -use tokio_io::codec::length_delimited::{Builder, Framed}; use tokio_io::AsyncRead; use tokio_io::AsyncWrite; +use tokio_io::codec::length_delimited::{Builder, Framed}; use rain_core::errors::{Error, Result}; diff --git a/rain_server/src/governor/graph/dataobj.rs b/rain_server/src/governor/graph/dataobj.rs index d6081b5..64d5325 100644 --- a/rain_server/src/governor/graph/dataobj.rs +++ b/rain_server/src/governor/graph/dataobj.rs @@ -5,9 +5,9 @@ use std::path::Path; use std::sync::Arc; use super::{Graph, TaskRef}; +use governor::WorkDir; use governor::data::Data; use governor::graph::ExecutorRef; -use governor::WorkDir; use wrapped::WrappedRcRefCell; #[derive(Debug)] diff --git a/rain_server/src/governor/rpc/control.rs b/rain_server/src/governor/rpc/control.rs index 90ee9eb..ba3ff12 100644 --- a/rain_server/src/governor/rpc/control.rs +++ b/rain_server/src/governor/rpc/control.rs @@ -3,8 +3,8 @@ use futures::future::Future; use rain_core::{errors::*, types::*, utils::*}; use std::sync::Arc; -use governor::graph::DataObjectState; use governor::StateRef; +use governor::graph::DataObjectState; use rain_core::governor_capnp::governor_control; pub struct GovernorControlImpl { diff --git a/rain_server/src/governor/rpc/executor.rs b/rain_server/src/governor/rpc/executor.rs index ed5f6c3..20fff0a 100644 --- a/rain_server/src/governor/rpc/executor.rs +++ b/rain_server/src/governor/rpc/executor.rs @@ -3,8 +3,8 @@ use rain_core::{comm::*, errors::*, types::*}; use std::path::Path; use std::sync::Arc; -use governor::data::{Data, Storage}; use governor::State; +use governor::data::{Data, Storage}; static PROTOCOL_VERSION: &'static str = "cbor-1"; diff --git a/rain_server/src/governor/rpc/fetch.rs b/rain_server/src/governor/rpc/fetch.rs index ca0af8a..ecc70b4 100644 --- a/rain_server/src/governor/rpc/fetch.rs +++ b/rain_server/src/governor/rpc/fetch.rs @@ -1,12 +1,12 @@ -use futures::future::Either; use futures::IntoFuture; +use futures::future::Either; use futures::{future, Future}; use rain_core::{errors::*, types::*, utils::*}; use std::rc::Rc; +use governor::StateRef; use governor::data::{Data, DataBuilder}; use governor::graph::DataObjectRef; -use governor::StateRef; pub struct FetchContext { pub state_ref: StateRef, diff --git a/rain_server/src/governor/state.rs b/rain_server/src/governor/state.rs index fe99231..998144a 100644 --- a/rain_server/src/governor/state.rs +++ b/rain_server/src/governor/state.rs @@ -12,14 +12,14 @@ use std::time::{Duration, Instant}; use common::Monitor; use common::{create_protocol_stream, new_rpc_system, Connection}; -use governor::data::transport::TransportView; use governor::data::Data; +use governor::data::transport::TransportView; use governor::fs::workdir::WorkDir; use governor::graph::executor::get_log_tails; use governor::graph::{executor_command, DataObject, DataObjectRef, DataObjectState, ExecutorRef, Graph, TaskRef, TaskState}; -use governor::rpc::executor::check_registration; use governor::rpc::GovernorControlImpl; +use governor::rpc::executor::check_registration; use governor::tasks::TaskInstance; use wrapped::WrappedRcRefCell; diff --git a/rain_server/src/main.rs b/rain_server/src/main.rs index 3904dd6..94ac0fb 100644 --- a/rain_server/src/main.rs +++ b/rain_server/src/main.rs @@ -50,6 +50,7 @@ mod wrapped; use clap::{App, Arg, ArgMatches, SubCommand}; use nix::unistd::getpid; + use std::collections::HashMap; use std::error::Error; use std::io::Read; @@ -191,6 +192,22 @@ impl GovernorConfig { } } +fn resolve_server_address(address: &str) -> SocketAddr { + match address.to_socket_addrs() { + Err(_) => { + error!("Cannot resolve server address"); + exit(1); + } + Ok(mut addrs) => match addrs.next() { + None => { + error!("Cannot resolve server address"); + exit(1); + } + Some(ref addr) => *addr, + }, + } +} + fn run_governor(_global_args: &ArgMatches, cmd_args: &ArgMatches) { info!("Starting Rain {} governor", VERSION); let ready_file = cmd_args.value_of("READY_FILE"); @@ -202,19 +219,7 @@ fn run_governor(_global_args: &ArgMatches, cmd_args: &ArgMatches) { server_address = format!("{}:{}", server_address, DEFAULT_SERVER_PORT); } - let server_addr = match server_address.to_socket_addrs() { - Err(_) => { - error!("Cannot resolve server address: "); - exit(1); - } - Ok(mut addrs) => match addrs.next() { - None => { - error!("Cannot resolve server address"); - exit(1); - } - Some(ref addr) => *addr, - }, - }; + let server_addr = resolve_server_address(&server_address); let state = { let config = cmd_args.value_of("GOVERNOR_CONFIG").map(|path| { @@ -415,8 +420,32 @@ fn run_starter(_global_args: &ArgMatches, cmd_args: &ArgMatches) { } } +fn stop_server(_global_args: &ArgMatches, cmd_args: &ArgMatches) { + /*let default_address = format!("localhost:{}", DEFAULT_SERVER_PORT); + let mut address = cmd_args + .value_of("SERVER_ADDRESS") + .unwrap_or(&default_address) + .to_string(); + + if !address.contains(':') { + address = format!("{}:{}", address, DEFAULT_SERVER_PORT); + } + + let scheduler: SocketAddr = resolve_server_address(&address); + let client = client::Client::new(scheduler).unwrap_or_else(|err| { + error!("Couldn't connect to server at {}: {}", address, err); + exit(1); + }); + client.terminate_server().unwrap_or_else(|err| { + error!("Couldn't stop server: {}", err); + exit(1); + }); + + println!("Server at {} was successfully stopped", address);*/ +} + fn init_log() { - // T emporary simple logger for better module log control, default level is INFO + // Temporary simple logger for better module log control, default level is INFO // TODO: replace with Fern or log4rs later if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "info"); @@ -577,15 +606,22 @@ fn main() { .long("--logdir") .help("Logging directory for governors & server (default /tmp/rain-logs/run-$HOSTANE-$PID)") .takes_value(true))) + .subcommand( // ---- STOP ---- + SubCommand::with_name("stop") + .about("Stop server and all governors connected to it") + .arg(Arg::with_name("SERVER_ADDRESS") + .help("Address of the server (default = localhost:7210)") + .takes_value(true))) .get_matches(); match args.subcommand() { ("server", Some(cmd_args)) => run_server(&args, cmd_args), ("governor", Some(cmd_args)) => run_governor(&args, cmd_args), ("start", Some(cmd_args)) => run_starter(&args, cmd_args), + ("stop", Some(cmd_args)) => stop_server(&args, cmd_args), _ => { error!("No subcommand provided."); - ::std::process::exit(1); + exit(1); } } } diff --git a/rain_server/src/server/http.rs b/rain_server/src/server/http.rs index 5b63579..682f545 100644 --- a/rain_server/src/server/http.rs +++ b/rain_server/src/server/http.rs @@ -145,9 +145,7 @@ impl Service for RequestHandler { &include_bytes!("./../../dashboard/dist/main.css.gz")[..], ) } - _ => static_data_response( - &include_bytes!("./../../dashboard/dist/index.html")[..], - ), + _ => static_data_response(&include_bytes!("./../../dashboard/dist/index.html")[..]), /*path => { warn!("Invalid HTTP request: {}", path); Response::new().with_status(StatusCode::NotFound) diff --git a/rain_server/src/server/logging/sqlite_logger.rs b/rain_server/src/server/logging/sqlite_logger.rs index 2b9cc10..17590bf 100644 --- a/rain_server/src/server/logging/sqlite_logger.rs +++ b/rain_server/src/server/logging/sqlite_logger.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; -use futures::sync::{mpsc, oneshot}; use futures::Future; use futures::Stream; +use futures::sync::{mpsc, oneshot}; use rusqlite::Connection; use serde_json; use std::path::PathBuf; diff --git a/rain_server/src/server/rpc/client.rs b/rain_server/src/server/rpc/client.rs index bad6331..cb81f41 100644 --- a/rain_server/src/server/rpc/client.rs +++ b/rain_server/src/server/rpc/client.rs @@ -7,6 +7,7 @@ use std::net::SocketAddr; use server::graph::{ClientRef, TaskRef}; use server::graph::{DataObjectRef, DataObjectState}; use server::state::StateRef; +use std::process::exit; pub struct ClientServiceImpl { state: StateRef, @@ -283,7 +284,7 @@ impl client_service::Server for ClientServiceImpl { object_ids.len() ); Promise::err(::capnp::Error::failed( - "wait_sone is not implemented yet".to_string(), + "wait_some is not implemented yet".to_string(), )) } @@ -489,4 +490,14 @@ impl client_service::Server for ClientServiceImpl { results.get_state().unwrap().set_ok(()); Promise::ok(()) } + + #[allow(unreachable_code)] + fn terminate_server( + &mut self, + _params: client_service::TerminateServerParams, + _results: client_service::TerminateServerResults, + ) -> Promise<(), ::capnp::Error> { + exit(0); + Promise::ok(()) + } } diff --git a/rain_server/src/start/process.rs b/rain_server/src/start/process.rs index fbde693..8a1b4de 100644 --- a/rain_server/src/start/process.rs +++ b/rain_server/src/start/process.rs @@ -1,4 +1,3 @@ -use std::os::unix::io::{FromRawFd, IntoRawFd}; use std::path::Path; use std::process::{Child, Command, Stdio}; @@ -30,16 +29,12 @@ impl Process { ready: Readiness, command: &mut Command, ) -> Result { - let log_path_out_id = - ::std::fs::File::create(log_dir.join(&format!("{}.out", name)))?.into_raw_fd(); - let log_path_err_id = - ::std::fs::File::create(log_dir.join(&format!("{}.err", name)))?.into_raw_fd(); - - let log_path_out_pipe = unsafe { Stdio::from_raw_fd(log_path_out_id) }; - let log_path_err_pipe = unsafe { Stdio::from_raw_fd(log_path_err_id) }; - - command.stdout(log_path_out_pipe); - command.stderr(log_path_err_pipe); + command.stdout(::std::fs::File::create( + log_dir.join(&format!("{}.out", name)), + )?); + command.stderr(::std::fs::File::create( + log_dir.join(&format!("{}.err", name)), + )?); Ok(Self { name: name.to_string(), diff --git a/rain_task/src/macros.rs b/rain_task/src/macros.rs index d75ece6..dd5b261 100644 --- a/rain_task/src/macros.rs +++ b/rain_task/src/macros.rs @@ -88,7 +88,8 @@ macro_rules! register_task_make_call { #[macro_export] macro_rules! register_task { ($executor: expr, $name: expr, [$($params: tt)*], $taskfn: expr) => ({ - $executor.register_task($name, |ctx: &mut Context, ins: &[DataInstance], outs: &mut [Output]| -> TaskResult<()> { + $executor.register_task($name, |ctx: &mut Context, ins: &[DataInstance], + outs: &mut [Output]| -> TaskResult<()> { register_task_make_call!($taskfn, ins, outs, ($($params)*), (ctx)) }) }); diff --git a/rain_task/src/tests.rs b/rain_task/src/tests.rs index dc1462a..d0d175c 100644 --- a/rain_task/src/tests.rs +++ b/rain_task/src/tests.rs @@ -202,7 +202,8 @@ fn register_task() { register_task!(s, "task4", [I I O O], |_ctx, _in1, _in2, _out1, _out2| Ok(())); register_task!(s, "task5", [Is Os], task1); register_task!(s, "task6", [I O Is Os], - |_ctx, _i1: &DataInstance, _o1: &mut Output, _is: &[DataInstance], _os: &mut [Output]| Ok(())); + |_ctx, _i1: &DataInstance, _o1: &mut Output, _is: &[DataInstance], + _os: &mut [Output]| Ok(())); } fn task_cat(_ctx: &mut Context, inputs: &[DataInstance], outputs: &mut [Output]) -> TaskResult<()> { @@ -264,25 +265,27 @@ fn run_cat_task() { fn run_long_cat() { let (mut s, handle) = setup( "run_long_cat", - vec![call_msg( - 1, - "run_long_cat/cat", - vec![ - data_spec( - 1, - "in1", - Some(DataLocation::Memory( - [0u8; MEM_BACKED_LIMIT - 1].as_ref().into(), - )), - ), - data_spec( - 2, - "in2", - Some(DataLocation::Memory([0u8; 2].as_ref().into())), - ), - ], - vec![data_spec(6, "out", None)], - )], + vec![ + call_msg( + 1, + "run_long_cat/cat", + vec![ + data_spec( + 1, + "in1", + Some(DataLocation::Memory( + [0u8; MEM_BACKED_LIMIT - 1].as_ref().into(), + )), + ), + data_spec( + 2, + "in2", + Some(DataLocation::Memory([0u8; 2].as_ref().into())), + ), + ], + vec![data_spec(6, "out", None)], + ), + ], ); s.register_task("cat", task_cat); s.run(); @@ -301,12 +304,14 @@ fn run_long_cat() { fn run_empty_cat() { let (mut s, handle) = setup( "run_empty_cat", - vec![call_msg( - 1, - "run_empty_cat/cat", - vec![], - vec![data_spec(3, "out", None)], - )], + vec![ + call_msg( + 1, + "run_empty_cat/cat", + vec![], + vec![data_spec(3, "out", None)], + ), + ], ); s.register_task("cat", task_cat); s.run(); @@ -322,16 +327,16 @@ fn run_empty_cat() { fn run_pass_cat() { let (mut s, handle) = setup( "run_pass_cat", - vec![call_msg( - 2, - "run_pass_cat/cat", - vec![data_spec( - 1, - "in", - Some(DataLocation::Memory("drip".into())), - )], - vec![data_spec(2, "out", None)], - )], + vec![ + call_msg( + 2, + "run_pass_cat/cat", + vec![ + data_spec(1, "in", Some(DataLocation::Memory("drip".into()))), + ], + vec![data_spec(2, "out", None)], + ), + ], ); s.register_task("cat", task_cat); s.run(); @@ -347,12 +352,14 @@ fn run_pass_cat() { fn test_make_file_backed() { let (mut s, handle) = setup( "test_make_file_backed", - vec![call_msg( - 2, - "test_make_file_backed/mfb", - vec![], - vec![data_spec(3, "out", None)], - )], + vec![ + call_msg( + 2, + "test_make_file_backed/mfb", + vec![], + vec![data_spec(3, "out", None)], + ), + ], ); s.register_task("mfb", |_ctx, _ins, outs| { write!(outs[0], "Rainfall")?; @@ -373,16 +380,16 @@ fn test_make_file_backed() { fn test_get_path_writing() { let (mut s, handle) = setup( "test_get_path_writing", - vec![call_msg( - 2, - "test_get_path_writing/gp", - vec![data_spec( - 1, - "in", - Some(DataLocation::Memory("drizzle".into())), - )], - vec![], - )], + vec![ + call_msg( + 2, + "test_get_path_writing/gp", + vec![ + data_spec(1, "in", Some(DataLocation::Memory("drizzle".into()))), + ], + vec![], + ), + ], ); s.register_task("gp", |_ctx, ins, _outs| { let p = ins[0].get_path(); @@ -404,12 +411,14 @@ fn test_get_path_writing() { fn run_stage_file() { let (mut s, handle) = setup( "run_stage_file", - vec![call_msg( - 2, - "run_stage_file/stage", - vec![], - vec![data_spec(2, "out", None)], - )], + vec![ + call_msg( + 2, + "run_stage_file/stage", + vec![], + vec![data_spec(2, "out", None)], + ), + ], ); s.register_task("stage", |_ctx, _inp, outp| { let mut f = fs::File::create("testfile.txt").unwrap(); @@ -494,11 +503,13 @@ fn read_set_content_type() { let mut call = call_msg( 2, "read_set_content_type/foo", - vec![data_spec( - 2, - "out", - Some(DataLocation::Memory((b"content!" as &[u8]).into())), - )], + vec![ + data_spec( + 2, + "out", + Some(DataLocation::Memory((b"content!" as &[u8]).into())), + ), + ], vec![data_spec(3, "out", None)], ); call.inputs[0].info = Some(ObjectInfo {