Skip to content

Commit

Permalink
[subworker] WIP on subworker rust API
Browse files Browse the repository at this point in the history
  • Loading branch information
gavento committed Apr 27, 2018
1 parent 689da6d commit 246e539
Show file tree
Hide file tree
Showing 5 changed files with 398 additions and 204 deletions.
16 changes: 16 additions & 0 deletions rain-task/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use serde_cbor;

// Create the Error, ErrorKind, ResultExt, and Result types
error_chain!{
types {
Error, ErrorKind, ResultExt;
}
foreign_links {
Io(::std::io::Error);
CBOR(serde_cbor::error::Error);
Utf8Err(::std::str::Utf8Error);
}
}

// Explicit alias just to make some IDEs happier
pub type Result<T> = ::std::result::Result<T, Error>;
44 changes: 44 additions & 0 deletions rain-task/src/framing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::os::unix::net::UnixStream;
use byteorder::{ReadBytesExt, WriteBytesExt, LittleEndian};
use super::{Result, WorkerToSubworkerMessage, SubworkerToWorkerMessage, MAX_MSG_SIZE, MSG_PROTOCOL};
use std::io::{Read, Write};
use serde_cbor;

pub(crate) trait SocketExt {
fn write_frame(&mut self, &[u8]) -> Result<()>;
fn read_frame(&mut self) -> Result<Vec<u8>>;
fn write_msg(&mut self, &SubworkerToWorkerMessage) -> Result<()>;
fn read_msg(&mut self) -> Result<WorkerToSubworkerMessage>;
}

impl SocketExt for UnixStream {
fn write_msg(&mut self, m: &SubworkerToWorkerMessage) -> Result<()> {
let data = serde_cbor::to_vec(m)?;
self.write_frame(&data)
}

fn read_msg(&mut self) -> Result<WorkerToSubworkerMessage> {
let data = self.read_frame()?;
let msg = serde_cbor::from_slice::<WorkerToSubworkerMessage>(&data)?;
Ok(msg)
}

fn write_frame(&mut self, data: &[u8]) -> Result<()> {
if data.len() > MAX_MSG_SIZE {
bail!("write_frame: message too long ({} bytes of {} allowed)", data.len(), MAX_MSG_SIZE);
}
self.write_u32::<LittleEndian>(data.len() as u32)?;
self.write_all(data)?;
Ok(())
}

fn read_frame(&mut self) -> Result<Vec<u8>> {
let len = self.read_u32::<LittleEndian>()? as usize;
if len > MAX_MSG_SIZE {
bail!("read_frame: message too long ({} bytes of {} allowed)", len, MAX_MSG_SIZE);
}
let mut data = vec![0; len];
self.read_exact(&mut data)?;
Ok(data)
}
}
223 changes: 19 additions & 204 deletions rain-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,39 @@ use librain::common::Attributes;
use librain::worker::rpc::subworker_serde::*;
use librain::common::id::SId;

pub const MAX_MSG_SIZE: usize = 128 * 1024 * 1024;
pub const MSG_PROTOCOL: &str = "v1cbor";
pub const MEM_BACKED_LIMIT: usize = 128 * 1024;

macro_rules! matchvar {
($ex: expr, $pat: pat) => {
{ if let $pat = $ex { true } else { false } }
};
}

mod framing;
pub use framing::*;
use framing::*;

mod errors;
pub use errors::*;

mod subworker;
pub use subworker::*;

const MEM_BACKED_LIMIT: usize = 128 * 1024;
mod output;
pub use output::*;


#[derive(Debug, Default)]
pub struct Context {
num_inputs: usize,
num_outputs: usize,
/// Task attributes
attributes: Attributes,
/// Absolute path to task working dir
work_dir: PathBuf,
/// Absolute path to staging dir with input and output objects
stage_dir: PathBuf,
}

#[derive(Debug)]
Expand All @@ -65,208 +82,6 @@ impl<'a> DataInstance<'a> {
}
}

#[derive(Debug)]
enum OutputState {
Empty,
MemBacked(Vec<u8>),
FileBacked(BufWriter<File>),
Path(String),
}

/// Represents one concrete output. The output can be either empty (as is initially),
/// set to represent an existing file, set to represent an existing directory, or written
/// to as a `Write`. These three are mutually exclusive, `set_dir_path` and `set_file_path`
/// may be used only once, and not before or after `get_writer`.
///
/// This object is thread-safe and the internal state is guarded by a mutex. Calling
/// `get_writer` locks this mutex and holds it until the returned guard is dropped.
/// This means fast (lockless) writes to the `Write` but you need to make sure your
/// other threads do not starve or deadlock.
#[derive(Debug)]
pub struct Output<'a> {
/// The original output description
desc: &'a DataObjectSpec,
/// Mutex holding the output state
data: Mutex<OutputState>,
/// The resulting attributes. Initially empty.
attributes: Attributes,
/// Path for the resulting file for Writer (if a file will be used)
writer_path: String,
/// Order of the output in outputs
order: usize,
}

//use std::mem::discriminant;
macro_rules! matchvar {
($ex: expr, $pat: pat) => {
{ if let $pat = $ex { true } else { false } }
};
}

use std::fmt;

impl<'a> fmt::Display for Output<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref label) = self.desc.label {
write!(f, "Output #{} (ID {}, label {:?})", self.order, self.desc.id, label)
} else {
write!(f, "Output #{} (ID {}, no label)", self.order, self.desc.id)
}
}
}

use std::ffi::OsString;
impl<'a> Output<'a> {
/// Create an output from DataObjectSpec. Internal.
fn new(spec: &'a DataObjectSpec) -> Self {
Output {
desc: spec,
data: Mutex::new(OutputState::Empty),
attributes: Attributes::new(),
writer_path: format!("output-{}-{}", spec.id.get_session_id(), spec.id.get_id()),
}
}

/// Consume self, yielding a `DataObjectSpec` for `ResultMsg` and
/// a flag whether the output object was cached (only possible if requested).
/// Currently, the subworker never caches.
fn create_output_spec(self) -> (DataObjectSpec, bool) {
(DataObjectSpec {
id: self.desc.id,
label: None,
attributes: self.attributes,
location: Some(match self.data.into_inner().unwrap() {
OutputState::Empty => DataLocation::Memory(Vec::new()),
OutputState::MemBacked(data) => DataLocation::Memory(data),
OutputState::FileBacked(f) => DataLocation::Path(self.writer_path),
OutputState::Path(p) => DataLocation::Path(p),
// .expect("output file name not valid utf-8")),
}),
cache_hint: false,
}, false)
}

pub fn set_dir_path<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
let path: &Path = path.as_ref();
// TODO: Check for self directory type
if !path.is_dir() {
bail!("Path {:?} given to `set_dir_path` is not a readable directory.", path);
}
let mut guard = self.data.lock().unwrap();
if !matchvar!(*guard, OutputState::Empty) {
bail!("Called `set_dir_path` on {} after being previously set.", self.desc.id)
}
if let Some(s) = path.to_str() {
*guard = OutputState::Path(s.into())
} else {
bail!("Can't convert path {:?} to a valid unicode string.", path);
}
Ok(())
}

pub fn set_file_path<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
let path: &Path = path.as_ref();
// TODO: Check for self non-directory type
if !path.is_file() {
bail!("Path {:?} given to `set_file_path` is not a readable regular file.", path);
}
let mut guard = self.data.lock().unwrap();
if !matchvar!(*guard, OutputState::Empty) {
bail!("Called `set_file_path` on {} after being previously set or written to.", self.desc.id)
}
if let Some(s) = path.to_str() {
*guard = OutputState::Path(s.into())
} else {
bail!("Can't convert path {:?} to a valid unicode string.", path);
}
Ok(())
}

pub fn get_content_type(&self) -> Result<&'a str> {
unimplemented!()
}

pub fn set_content_type(&self, ct: &str) -> Result<()> {
unimplemented!()
}

pub fn get_writer<'b: 'a>(&'b self) -> Result<OutputWriter<'b>> {
// TODO: Check whether it is a non-directory type
let mut guard = self.data.lock().unwrap();
if matchvar!(*guard, OutputState::Empty) {
*guard = OutputState::MemBacked(Vec::new())
}
if matchvar!(*guard, OutputState::MemBacked(_)) ||
matchvar!(*guard, OutputState::FileBacked(_)) {
Ok(OutputWriter::new(guard, self.desc.id))
} else {
bail!("Cannot get writer for Output {:?} with already submitted file or dir path.",
self.desc.id)
}
}
}

#[derive(Debug)]
pub struct OutputWriter<'a> {
guard: MutexGuard<'a, OutputState>,
object_id: DataObjectId,
}

impl<'a> OutputWriter<'a> {
fn new(guard: MutexGuard<'a, OutputState>, object_id: DataObjectId) -> Self {
OutputWriter { guard: guard, object_id: object_id }
}

fn convert_to_file(&mut self) -> std::io::Result<()> {
let fname = format!("output-{}-{}", self.object_id.get_session_id(), self.object_id.get_id());
let mut f = BufWriter::new(OpenOptions::new()
.write(true)
.create_new(true)
.open(&fname)?);
if let OutputState::MemBacked(ref data) = *self.guard {
f.write_all(data)?;
} else {
panic!("bug: invalid state for convert_to_file");
}
let mut os = OutputState::FileBacked(f);
swap(&mut os, &mut *self.guard);
Ok(())
}
}

impl<'a> Write for OutputWriter<'a> {
fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, std::io::Error> {
// Should be Some() only for MemBacked
let mut data_len = None;
if let OutputState::MemBacked(ref data) = *self.guard {
data_len = Some(data.len());
}
if let Some(len) = data_len {
if len + buf.len() > MEM_BACKED_LIMIT {
self.convert_to_file()?;
}
}
match *self.guard {
OutputState::MemBacked(ref mut data) => {
data.write(buf).into()
},
OutputState::FileBacked(ref mut f) => {
f.write(buf).into()
},
_ => {
panic!("bug: invalid OutputState in OutputWriter")
}
}
}

fn flush(&mut self) -> std::io::Result<()> {
if let OutputState::FileBacked(ref mut f) = *self.guard {
f.flush().into()
} else {
Ok(())
}
}
}

pub type TaskFn = Fn(&mut Context, &[DataInstance], &mut [Output]) -> Result<()>;

Expand Down
Loading

0 comments on commit 246e539

Please sign in to comment.