Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Kobzol committed Jul 3, 2018
1 parent 7dc23a1 commit 7ea984b
Showing 32 changed files with 205 additions and 144 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ version = "0.3.0"
[workspace]

members = [
"rain_client",
"rain_core",
"rain_server",
"rain_task",
27 changes: 27 additions & 0 deletions rain_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "rain_client"
version = "0.3.0"

description = "Distributed computational framework for large-scale task-based pipelines. Client library in Rust."
# documentation = "https://docs.rs/rain_task/" # default docs.rs
homepage = "https://github.com/substantic/rain"
repository = "https://github.com/substantic/rain/"
readme = "README.md"
authors = [
"Stanislav Bohm <[email protected]>",
"Tomas Gavenciak <[email protected]>",
"Vojtech Cima <[email protected]>",
]
license = "MIT"

exclude = ["testing/**/*"]

[badges]
travis-ci = { repository = "substantic/rain", branch = "master" }
maintenance = { status = "actively-developed" }

[dependencies]
futures="0.1"
log = "0.4"
rain_core = "0.3.0"
tokio-core="0.1"
4 changes: 2 additions & 2 deletions src/client/client.rs → rain_client/src/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::error::Error;
use std::net::SocketAddr;

use CLIENT_PROTOCOL_VERSION;
use super::session::Session;
use super::communicator::Communicator;
use super::session::Session;
use rain_core::CLIENT_PROTOCOL_VERSION;
use std::rc::Rc;

pub struct Client {
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use tokio_core::reactor::Core;
use futures::Future;
use rain_core::capnp_rpc::rpc_twoparty_capnp;
use rain_core::comm::new_rpc_system;
use std::error::Error;
use std::net::SocketAddr;
use tokio_core::net::TcpStream;
use std::error::Error;
use common::rpc::new_rpc_system;
use capnp_rpc::rpc_twoparty_capnp;
use futures::Future;
use tokio_core::reactor::Core;

use super::task::Task;
use common::id::{DataObjectId, TaskId};
use common::convert::{FromCapnp, ToCapnp};
use client::dataobject::DataObject;
use rain_core::types::{DataObjectId, TaskId};
use rain_core::utils::{FromCapnp, ToCapnp};
use std::cell::RefCell;
use std::cell::RefMut;

Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use common::Attributes;
use common::id::DataObjectId;
use common::DataType;
use rain_core::types::{DataObjectId, DataType};
use std::cell::Cell;

pub struct DataObject {
pub id: DataObjectId,
pub label: String,
pub keep: Cell<bool>,
pub data: Option<Vec<u8>>,
pub attributes: Attributes,
pub data_type: DataType,
pub attributes: Attributes,
}

impl DataObject {
File renamed without changes.
32 changes: 14 additions & 18 deletions src/client/rpc.rs → rain_client/src/client/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
use super::task::Task;
use client::dataobject::DataObject;
use client::task::TaskInput;
use common::convert::ToCapnp;
use common::id::DataObjectId;
use rain_core::types::DataObjectId;
use rain_core::utils::ToCapnp;

macro_rules! to_capnp_list {
($builder:expr, $items:expr, $name:ident) => {
{
let mut builder = $builder.$name($items.len() as u32);
for (i, obj) in $items.iter().enumerate() {
obj.to_capnp(&mut builder.reborrow().get(i as u32));
}
($builder:expr, $items:expr, $name:ident) => {{
let mut builder = $builder.$name($items.len() as u32);
for (i, obj) in $items.iter().enumerate() {
obj.to_capnp(&mut builder.reborrow().get(i as u32));
}
}
}};
}
macro_rules! from_capnp_list {
($builder:expr, $items:ident, $obj:ident) => {
{
$builder
.$items()?
.iter()
.map(|item| $obj::from_capnp(&item))
.collect()
}
}
($builder:expr, $items:ident, $obj:ident) => {{
$builder
.$items()?
.iter()
.map(|item| $obj::from_capnp(&item))
.collect()
}};
}

impl<'a> ToCapnp<'a> for TaskInput {
13 changes: 5 additions & 8 deletions src/client/session.rs → rain_client/src/client/session.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use super::communicator::Communicator;
use client::dataobject::DataObject;
use client::task::Task;
use std::collections::HashMap;
use std::error::Error;
use client::task::TaskInput;
use common::Attributes;
use common::id::TaskId;
use common::id::DataObjectId;
use common::id::SId;
use common::DataType;
use rain_core::common_capnp;
use rain_core::types::{DataObjectId, DataType, SId, TaskId};
use std::cell::Cell;
use std::collections::HashMap;
use std::error::Error;
use std::rc::Rc;

pub type DataObjectPtr = Rc<DataObject>;
@@ -82,7 +79,7 @@ impl Session {
}
pub fn wait_all(&mut self) -> Result<(), Box<Error>> {
self.comm.wait(
&vec![TaskId::new(self.id, ::common_capnp::ALL_TASKS_ID)],
&vec![TaskId::new(self.id, common_capnp::ALL_TASKS_ID)],
&vec![],
)
}
6 changes: 2 additions & 4 deletions src/client/task.rs → rain_client/src/client/task.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::error::Error;
use common::Attributes;
use common::id::TaskId;
use super::session::DataObjectPtr;
use rain_core::types::TaskId;
use std::error::Error;

pub struct TaskInput {
pub label: Option<String>,
@@ -13,7 +12,6 @@ pub struct Task {
pub command: String,
pub inputs: Vec<TaskInput>,
pub outputs: Vec<DataObjectPtr>,
pub attributes: Attributes,
}

impl Task {
2 changes: 1 addition & 1 deletion src/client/tasks.rs → rain_client/src/client/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use super::task::TaskInput;
use super::session::{DataObjectPtr, Session, TaskPtr};
use super::task::TaskInput;

pub trait CommonTasks {
fn concat(&mut self, objects: &[DataObjectPtr]) -> TaskPtr;
7 changes: 7 additions & 0 deletions rain_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
extern crate futures;
#[macro_use]
extern crate log;
extern crate rain_core;
extern crate tokio_core;

mod client;
1 change: 1 addition & 0 deletions rain_core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ serde_cbor = "0.8"
serde_derive = "1.0"
serde_json = "1.0"
tokio-core="0.1"
tokio-io="0.1"
tokio-timer = "0.2"

[build-dependencies]
2 changes: 1 addition & 1 deletion rain_core/src/comm/executor.rs
Original file line number Diff line number Diff line change
@@ -143,7 +143,7 @@ pub struct DropCachedMsg {
#[cfg(test)]
mod tests {
use super::*;
use serde::{de::DeserializeOwned, Serialize};
use serde::{Serialize, de::DeserializeOwned};
use serde_cbor;
use serde_json;
use std::fmt::Debug;
2 changes: 2 additions & 0 deletions rain_core/src/comm/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub(crate) mod executor;
pub mod rpc;

pub use self::executor::{CallMsg, DataLocation, DropCachedMsg, ExecutorToGovernorMessage,
GovernorToExecutorMessage, LocalObjectIn, LocalObjectOut, RegisterMsg,
ResultMsg};
pub use self::rpc::new_rpc_system;
19 changes: 19 additions & 0 deletions rain_core/src/comm/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use tokio_io::{AsyncRead, AsyncWrite};

pub fn new_rpc_system<Stream>(
stream: Stream,
bootstrap: Option<::capnp::capability::Client>,
) -> RpcSystem<twoparty::VatId>
where
Stream: AsyncRead + AsyncWrite + 'static,
{
let (reader, writer) = stream.split();
let network = Box::new(twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
RpcSystem::new(network, bootstrap)
}
3 changes: 2 additions & 1 deletion rain_core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
//! This documentation is minimalistic but still hopefully useful.
//! As an user, you may be interested in the
//! [rain_task lirary documentation](https://docs.rs/rain_task/).
//!
//!
//! See `README.md` and the [project page](https://github.com/substantic/rain/)
//! for general information.
@@ -30,6 +30,7 @@ extern crate serde_cbor;
extern crate serde_derive;
extern crate serde_json;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_timer;

pub const VERSION: &str = env!("CARGO_PKG_VERSION");
2 changes: 1 addition & 1 deletion rain_server/src/common/connection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use bytes::BytesMut;
use futures::{Future, Stream};
use tokio_io::codec::length_delimited::{Builder, Framed};
use tokio_io::AsyncRead;
use tokio_io::AsyncWrite;
use tokio_io::codec::length_delimited::{Builder, Framed};

use rain_core::errors::{Error, Result};

2 changes: 1 addition & 1 deletion rain_server/src/governor/graph/dataobj.rs
Original file line number Diff line number Diff line change
@@ -5,9 +5,9 @@ use std::path::Path;
use std::sync::Arc;

use super::{Graph, TaskRef};
use governor::WorkDir;
use governor::data::Data;
use governor::graph::ExecutorRef;
use governor::WorkDir;
use wrapped::WrappedRcRefCell;

#[derive(Debug)]
2 changes: 1 addition & 1 deletion rain_server/src/governor/rpc/control.rs
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@ use futures::future::Future;
use rain_core::{errors::*, types::*, utils::*};
use std::sync::Arc;

use governor::graph::DataObjectState;
use governor::StateRef;
use governor::graph::DataObjectState;
use rain_core::governor_capnp::governor_control;

pub struct GovernorControlImpl {
2 changes: 1 addition & 1 deletion rain_server/src/governor/rpc/executor.rs
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@ use rain_core::{comm::*, errors::*, types::*};
use std::path::Path;
use std::sync::Arc;

use governor::data::{Data, Storage};
use governor::State;
use governor::data::{Data, Storage};

static PROTOCOL_VERSION: &'static str = "cbor-1";

4 changes: 2 additions & 2 deletions rain_server/src/governor/rpc/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use futures::future::Either;
use futures::IntoFuture;
use futures::future::Either;
use futures::{future, Future};
use rain_core::{errors::*, types::*, utils::*};
use std::rc::Rc;

use governor::StateRef;
use governor::data::{Data, DataBuilder};
use governor::graph::DataObjectRef;
use governor::StateRef;

pub struct FetchContext {
pub state_ref: StateRef,
4 changes: 2 additions & 2 deletions rain_server/src/governor/state.rs
Original file line number Diff line number Diff line change
@@ -12,14 +12,14 @@ use std::time::{Duration, Instant};
use common::Monitor;
use common::{create_protocol_stream, new_rpc_system, Connection};

use governor::data::transport::TransportView;
use governor::data::Data;
use governor::data::transport::TransportView;
use governor::fs::workdir::WorkDir;
use governor::graph::executor::get_log_tails;
use governor::graph::{executor_command, DataObject, DataObjectRef, DataObjectState, ExecutorRef,
Graph, TaskRef, TaskState};
use governor::rpc::executor::check_registration;
use governor::rpc::GovernorControlImpl;
use governor::rpc::executor::check_registration;
use governor::tasks::TaskInstance;
use wrapped::WrappedRcRefCell;

Loading

0 comments on commit 7ea984b

Please sign in to comment.