Skip to content

Commit

Permalink
[api] use rain_core in rust client
Browse files Browse the repository at this point in the history
  • Loading branch information
Kobzol committed Jul 7, 2018
1 parent ffa94b1 commit abdfda2
Show file tree
Hide file tree
Showing 38 changed files with 423 additions and 357 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ version = "0.3.0"
[workspace]

members = [
"rain_client",
"rain_client_test",
"rain_core",
"rain_server",
"rain_task",
"rain_task_test",
]

[patch.crates-io]
rain_client = { path = "rain_client" }
rain_core = { path = "rain_core" }
rain_task = { path = "rain_task" }
31 changes: 31 additions & 0 deletions rain_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "rain_client"
version = "0.3.0"

description = "Distributed computational framework for large-scale task-based pipelines. Client library in Rust."
# documentation = "https://docs.rs/rain_task/" # default docs.rs
homepage = "https://github.com/substantic/rain"
repository = "https://github.com/substantic/rain/"
readme = "README.md"
authors = [
"Stanislav Bohm <[email protected]>",
"Tomas Gavenciak <[email protected]>",
"Vojtech Cima <[email protected]>",
]
license = "MIT"

exclude = ["testing/**/*"]

[badges]
travis-ci = { repository = "substantic/rain", branch = "master" }
maintenance = { status = "actively-developed" }

[dependencies]
capnp-rpc = "0.8"
error-chain = "0.11"
futures = "0.1"
log = "0.4"
rain_core = "0.3.0"
tokio-core = "0.1"
serde = "1"
serde_json = "1"
5 changes: 3 additions & 2 deletions src/client/client.rs → rain_client/src/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::error::Error;
use std::net::SocketAddr;

use CLIENT_PROTOCOL_VERSION;
use super::session::Session;
use super::communicator::Communicator;
use super::session::Session;
use rain_core::CLIENT_PROTOCOL_VERSION;
use std::rc::Rc;

pub struct Client {
Expand All @@ -26,3 +26,4 @@ impl Client {
self.comm.terminate_server()
}
}

Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use tokio_core::reactor::Core;
use std::net::SocketAddr;
use tokio_core::net::TcpStream;
use std::error::Error;
use common::rpc::new_rpc_system;
use capnp_rpc::rpc_twoparty_capnp;
use futures::Future;
use rain_core::comm::new_rpc_system;
use std::error::Error;
use std::net::SocketAddr;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;

use super::task::Task;
use common::id::{DataObjectId, TaskId};
use common::convert::{FromCapnp, ToCapnp};
use client::dataobject::DataObject;
use std::cell::RefCell;
use std::cell::RefMut;
use rain_core::types::{DataObjectId, TaskId};
use rain_core::utils::{FromCapnp, ToCapnp};
use rain_core::{client_capnp, common_capnp, server_capnp};
use std::cell::{RefCell, RefMut};

pub struct Communicator {
core: RefCell<Core>,
service: ::client_capnp::client_service::Client,
service: client_capnp::client_service::Client,
}

impl Communicator {
Expand All @@ -28,7 +28,7 @@ impl Communicator {
debug!("Connection to server {} established", scheduler);

let mut rpc = Box::new(new_rpc_system(stream, None));
let bootstrap: ::server_capnp::server_bootstrap::Client =
let bootstrap: server_capnp::server_bootstrap::Client =
rpc.bootstrap(rpc_twoparty_capnp::Side::Server);
handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err)));

Expand Down Expand Up @@ -116,19 +116,28 @@ impl Communicator {
))
}

pub fn fetch(&self, object_id: DataObjectId) -> Result<Vec<u8>, Box<Error>> {
pub fn fetch(&self, object_id: &DataObjectId) -> Result<Vec<u8>, Box<Error>> {
let mut req = self.service.fetch_request();
object_id.to_capnp(&mut req.get().get_id().unwrap());
req.get().set_size(1024);

let response = self.comm().run(req.send().promise)?;

// TODO: handle error states
let reader = response.get()?;
match reader.get_status().which()? {
::common_capnp::fetch_result::status::Ok(()) => {
common_capnp::fetch_result::status::Ok(()) => {
let data = reader.get_data()?;
Ok(Vec::from(data))
}
common_capnp::fetch_result::status::Removed(()) => {
print!("Removed");
Ok(vec![])
}
common_capnp::fetch_result::status::Error(err) => {
print!("Error: {:?}", err.unwrap().get_message());
Ok(vec![])
}
_ => bail!("Non-ok status"),
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use common::Attributes;
use common::id::DataObjectId;
use common::DataType;
use rain_core::types::{DataObjectId, ObjectSpec};
use std::cell::Cell;

pub struct DataObject {
pub id: DataObjectId,
pub label: String,
pub keep: Cell<bool>,
pub data: Option<Vec<u8>>,
pub attributes: Attributes,
pub data_type: DataType,
pub spec: ObjectSpec,
}

impl DataObject {
pub fn keep(&self) {
self.keep.set(true);
}
pub fn id(&self) -> DataObjectId {
self.spec.id
}
}
49 changes: 49 additions & 0 deletions rain_client/src/client/localcluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::error::Error;
use std::net::SocketAddr;
use super::client::Client;
use std::process::{Child, Command};
use std::path::PathBuf;
use std::{thread, time};
use std::process::Stdio;

pub struct LocalCluster {
listen_addr: SocketAddr,
binary: PathBuf
}

impl LocalCluster {
pub fn new(binary: &str) -> Result<Self, Box<Error>> {
let mut cluster = LocalCluster {
binary: PathBuf::from(binary),
listen_addr: SocketAddr::new("127.0.0.1".parse()?, 7210)
};
cluster.start()?;

Ok(cluster)
}

fn start(&mut self) -> Result<(), Box<Error>> {
Command::new(&self.binary)
.arg("start")
.arg("--listen")
.arg(self.listen_addr.to_string())
.arg("--simple")
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()?
.wait()?;

Ok(())
}

pub fn create_client(&self) -> Result<Client, Box<Error>> {
Client::new(self.listen_addr)
}
}

impl Drop for LocalCluster {
fn drop(&mut self) {
Client::new(self.listen_addr).unwrap().terminate_server();
}
}
2 changes: 2 additions & 0 deletions src/client/mod.rs → rain_client/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod client;
pub mod session;
pub mod tasks;
pub mod localcluster;

#[macro_use]
mod rpc;
Expand All @@ -10,3 +11,4 @@ mod task;

pub use self::client::Client;
pub use self::session::Session;
pub use self::localcluster::LocalCluster;
46 changes: 46 additions & 0 deletions rain_client/src/client/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use super::task::Task;
use client::dataobject::DataObject;
use rain_core::client_capnp;
use rain_core::utils::ToCapnp;
use serde_json;

macro_rules! to_capnp_list {
($builder:expr, $items:expr, $name:ident) => {{
let mut builder = $builder.$name($items.len() as u32);
for (i, obj) in $items.iter().enumerate() {
obj.to_capnp(&mut builder.reborrow().get(i as u32));
}
}};
}
macro_rules! from_capnp_list {
($builder:expr, $items:ident, $obj:ident) => {{
$builder
.$items()?
.iter()
.map(|item| $obj::from_capnp(&item))
.collect()
}};
}

impl<'a> ToCapnp<'a> for Task {
type Builder = client_capnp::task::Builder<'a>;

fn to_capnp(&self, builder: &mut Self::Builder) {
builder.set_spec(&serde_json::to_string(&self.spec).unwrap());
}
}
impl<'a> ToCapnp<'a> for DataObject {
type Builder = client_capnp::data_object::Builder<'a>;

fn to_capnp(&self, builder: &mut Self::Builder) {
builder.set_spec(&serde_json::to_string(&self.spec).unwrap());
builder.set_keep(self.keep.get());

if let &Some(ref data) = &self.data {
builder.set_data(&data);
builder.set_has_data(true);
} else {
builder.set_has_data(false);
}
}
}
Loading

0 comments on commit abdfda2

Please sign in to comment.