Skip to content

Commit

Permalink
[subworker] WIP on rust SW: mosly finished rust API
Browse files Browse the repository at this point in the history
  • Loading branch information
gavento committed Apr 27, 2018
1 parent efd5adf commit 2414c19
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 114 deletions.
48 changes: 0 additions & 48 deletions rain-task/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 1 addition & 19 deletions rain-task/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,11 @@ serde = "*"
serde_json = "*"
byteorder = "*"
serde_cbor = "*"
memmap = "*"

atty="*"
error-chain="*"
clap = "*"
futures="0.1"
tokio-core="*"
tokio-io="*"
tokio-timer = "*"
tokio-uds="*"
tokio-process="*"
env_logger = "*"
arrayref = "*"
num_cpus = "*"
nix = "*"
lazy_static = "*"
bytes = "*"
tempdir = "*"
memmap = "*"
sysconf = "*"
sys-info = "*"
chrono = { version = "*", features = ["serde"] }
tar = "*"
toml = "*"
walkdir = "*"
rmp-serde = "*"
rmp = "*"
53 changes: 53 additions & 0 deletions rain-task/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use super::*;
use std::{fs, env};

///
#[derive(Debug)]
pub struct Context<'a> {
spec: &'a CallMsg,
pub(crate) inputs: Vec<DataInstance<'a>>,
pub(crate) outputs: Vec<Output<'a>>,
/// Task attributes
pub(crate) attributes: Attributes,
/// Absolute path to task working dir
pub(crate) work_dir: PathBuf,
/// Absolute path to staging dir with input and output objects
stage_dir: PathBuf,
pub(crate) success: bool,
}

impl<'a> Context<'a> {
pub(crate) fn for_call_msg(cm: &'a CallMsg, work_dir: &Path) -> Result<Self> {
assert!(work_dir.is_absolute());
let stage_dir = work_dir.join("stage");
fs::create_dir_all(&stage_dir)?;
let inputs = cm.inputs.iter().enumerate().map(|(order, inp)| {
DataInstance::new(inp, &stage_dir, order)
}).collect();
let outputs = cm.outputs.iter().enumerate().map(|(order, outp)| {
Output::new(outp, &stage_dir, order)
}).collect();
Ok(Context {
spec: cm,
inputs: inputs,
outputs: outputs,
attributes: Attributes::new(),
work_dir: work_dir.into(),
stage_dir: stage_dir,
success: true,
})
}

pub(crate) fn into_result_msg(self) -> ResultMsg {
ResultMsg {
task: self.spec.task,
success: self.success,
attributes: self.attributes,
outputs: self.outputs.into_iter().map(|o| {
let (os, _cached) = o.into_output_spec();
os
}).collect(),
cached_objects: Vec::new(),
}
}
}
89 changes: 89 additions & 0 deletions rain-task/src/input.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::sync::{Mutex, MutexGuard};
use librain::worker::rpc::subworker_serde::*;
use memmap::Mmap;
use std::mem;
use super::*;

#[derive(Debug)]
enum InputState {
SpecMem,
SpecMemAndFile,
NotOpen,
MMap(File, Mmap),
}

#[derive(Debug)]
pub struct DataInstance<'a> {
spec: &'a DataObjectSpec,
state: Mutex<InputState>,
/// The absolute path to the existing (or potential) file or dir.
/// NB: Must NOT be modified after DataInstance creation!
path: PathBuf,
order: usize,
}

impl<'a> DataInstance<'a> {
pub(crate) fn new(spec: &'a DataObjectSpec, stage_path: &Path, order: usize) -> Self {
let istate = match spec.location.as_ref().expect("bug: input needs a data location") {
DataLocation::Cached => panic!("bug: cached object requested"),
DataLocation::OtherObject(_) => panic!("bug: `OtherObject` location in input"),
DataLocation::Memory(_) => InputState::SpecMem,
DataLocation::Path(_) => InputState::NotOpen,
};
let path = if let DataLocation::Path(p) = spec.location.as_ref().unwrap() {
stage_path.join(p)
} else {
stage_path.join(format!("input-{}-{}", spec.id.get_session_id(), spec.id.get_id()))
};
DataInstance {
spec: spec,
state: Mutex::new(istate),
path: path,
order: order,
}
}

pub fn get_bytes(&self) -> Result<&'a[u8]> {
// TODO: Check this is not a dir
let mut guard = self.state.lock().unwrap();
if matchvar!(*guard, InputState::SpecMem)
|| matchvar!(*guard, InputState::SpecMemAndFile) {
if let Some(DataLocation::Memory(d)) = self.spec.location.as_ref() {
return Ok(&d)
} else {
panic!("bug: spec suddenly does not contain memory location");
}
}
if matchvar!(*guard, InputState::NotOpen) {
let f = File::open(&self.path)?;
let mmap = unsafe { Mmap::map(&f)? };
*guard = InputState::MMap(f, mmap);
}
if let InputState::MMap(_, ref mmap) = *guard {
// This is safe since the Mmap is not dealocated before the
// containing Input<'a>.
return Ok( unsafe { mem::transmute::<&[u8], &'a [u8]>(&*mmap) });
}
unreachable!();
}

pub fn get_path(&self) -> Result<&'a Path> {
{
let guard = self.state.lock().unwrap();
if matchvar!(*guard, InputState::SpecMem) {
unimplemented!(); // TODO: Save the file to disk
}
}
// This is safe since the PathBuf is never modified after creation.
return Ok( unsafe { mem::transmute::<&Path, &'a Path>(&self.path) });
}

pub fn get_str(&self) -> Result<&'a str> {
unimplemented!()
}

pub fn get_content_type(&self) -> Result<&'a[u8]> {
unimplemented!()
}
}

51 changes: 13 additions & 38 deletions rain-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ extern crate log;
#[macro_use]
extern crate error_chain;
extern crate serde_cbor;

extern crate memmap;

use std::collections::HashMap;
use std::path::PathBuf;
use std::os::unix::net::UnixStream;
use std::io;
use std::default::Default;
use std::sync::{Mutex, MutexGuard};
use std::mem::swap;
use std::fs::{OpenOptions, File};
use std::io::BufWriter;
Expand All @@ -26,10 +25,17 @@ use librain::common::Attributes;
use librain::worker::rpc::subworker_serde::*;
use librain::common::id::SId;

/// Maximal protocol message size (128 MB)
pub const MAX_MSG_SIZE: usize = 128 * 1024 * 1024;
pub const MSG_PROTOCOL: &str = "v1cbor";

/// Current protocol code name and magic string
pub const MSG_PROTOCOL: &str = "v1-CBOR";

/// Size limit for memory-backed objects. Larger blobs
/// get written to the filesystem.
pub const MEM_BACKED_LIMIT: usize = 128 * 1024;

// Local macro to match variants
macro_rules! matchvar {
($ex: expr, $pat: pat) => {
{ if let $pat = $ex { true } else { false } }
Expand All @@ -48,42 +54,11 @@ pub use subworker::*;
mod output;
pub use output::*;

mod context;
pub use context::*;

#[derive(Debug, Default)]
pub struct Context {
num_inputs: usize,
num_outputs: usize,
/// Task attributes
attributes: Attributes,
/// Absolute path to task working dir
work_dir: PathBuf,
/// Absolute path to staging dir with input and output objects
stage_dir: PathBuf,
}

#[derive(Debug)]
pub struct DataInstance<'a> {
desc: &'a DataObjectSpec,
data: Mutex<Option<&'a[u8]>>,
}

impl<'a> DataInstance<'a> {
pub fn get_bytes(&self) -> Result<&'a[u8]> {
unimplemented!()
}
pub fn get_path(&self) -> Result<&'a Path> {
unimplemented!()
}
pub fn get_str(&self) -> Result<&'a str> {
unimplemented!()
}
pub fn get_content_type(&self) -> Result<&'a[u8]> {
unimplemented!()
}
}


pub type TaskFn = Fn(&mut Context, &[DataInstance], &mut [Output]) -> Result<()>;
mod input;
pub use input::*;

/*
macro_rules! count_params {
Expand Down
6 changes: 3 additions & 3 deletions rain-task/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<'a> Output<'a> {
/// Currently, this subworker never caches.
///
/// NOTE: The returned path may be still an open file until this Output is dropped.
pub(crate) fn create_output_spec(self) -> (DataObjectSpec, bool) {
pub(crate) fn into_output_spec(self) -> (DataObjectSpec, bool) {
(DataObjectSpec {
id: self.desc.id,
label: None,
Expand All @@ -91,7 +91,7 @@ impl<'a> Output<'a> {
/// Moves the directory to the staging area.
/// You should make sure no files in the directory are open after this operation.
/// Not allowed if the output was submitted to.
pub fn stage_directory<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
pub fn stage_directory<P: AsRef<Path>>(&self, path: P) -> Result<()> {
let path: &Path = path.as_ref();
// TODO: Check for self directory type
if !path.is_dir() {
Expand All @@ -110,7 +110,7 @@ impl<'a> Output<'a> {
/// Moves the directory to the staging area.
/// You should make sure no files in the directory are open after this operation.
/// Not allowed if the output was submitted or written to.
pub fn stage_file<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
pub fn stage_file<P: AsRef<Path>>(&self, path: P) -> Result<()> {
let path: &Path = path.as_ref();
// TODO: Check for self non-directory type
if !path.is_file() {
Expand Down
Loading

0 comments on commit 2414c19

Please sign in to comment.