From 246e5392684c241d14623c5193ef37687e3cbb8b Mon Sep 17 00:00:00 2001 From: Tomas Gavenciak Date: Fri, 27 Apr 2018 17:03:39 +0200 Subject: [PATCH] [subworker] WIP on subworker rust API --- rain-task/src/errors.rs | 16 +++ rain-task/src/framing.rs | 44 ++++++++ rain-task/src/lib.rs | 223 ++++--------------------------------- rain-task/src/output.rs | 214 +++++++++++++++++++++++++++++++++++ rain-task/src/subworker.rs | 105 +++++++++++++++++ 5 files changed, 398 insertions(+), 204 deletions(-) create mode 100644 rain-task/src/errors.rs create mode 100644 rain-task/src/framing.rs create mode 100644 rain-task/src/output.rs create mode 100644 rain-task/src/subworker.rs diff --git a/rain-task/src/errors.rs b/rain-task/src/errors.rs new file mode 100644 index 0000000..81d4889 --- /dev/null +++ b/rain-task/src/errors.rs @@ -0,0 +1,16 @@ +use serde_cbor; + +// Create the Error, ErrorKind, ResultExt, and Result types +error_chain!{ + types { + Error, ErrorKind, ResultExt; + } + foreign_links { + Io(::std::io::Error); + CBOR(serde_cbor::error::Error); + Utf8Err(::std::str::Utf8Error); + } +} + +// Explicit alias just to make some IDEs happier +pub type Result = ::std::result::Result; diff --git a/rain-task/src/framing.rs b/rain-task/src/framing.rs new file mode 100644 index 0000000..0577fa2 --- /dev/null +++ b/rain-task/src/framing.rs @@ -0,0 +1,44 @@ +use std::os::unix::net::UnixStream; +use byteorder::{ReadBytesExt, WriteBytesExt, LittleEndian}; +use super::{Result, WorkerToSubworkerMessage, SubworkerToWorkerMessage, MAX_MSG_SIZE, MSG_PROTOCOL}; +use std::io::{Read, Write}; +use serde_cbor; + +pub(crate) trait SocketExt { + fn write_frame(&mut self, &[u8]) -> Result<()>; + fn read_frame(&mut self) -> Result>; + fn write_msg(&mut self, &SubworkerToWorkerMessage) -> Result<()>; + fn read_msg(&mut self) -> Result; +} + +impl SocketExt for UnixStream { + fn write_msg(&mut self, m: &SubworkerToWorkerMessage) -> Result<()> { + let data = serde_cbor::to_vec(m)?; + self.write_frame(&data) + } + + fn read_msg(&mut self) -> Result { + let data = self.read_frame()?; + let msg = serde_cbor::from_slice::(&data)?; + Ok(msg) + } + + fn write_frame(&mut self, data: &[u8]) -> Result<()> { + if data.len() > MAX_MSG_SIZE { + bail!("write_frame: message too long ({} bytes of {} allowed)", data.len(), MAX_MSG_SIZE); + } + self.write_u32::(data.len() as u32)?; + self.write_all(data)?; + Ok(()) + } + + fn read_frame(&mut self) -> Result> { + let len = self.read_u32::()? as usize; + if len > MAX_MSG_SIZE { + bail!("read_frame: message too long ({} bytes of {} allowed)", len, MAX_MSG_SIZE); + } + let mut data = vec![0; len]; + self.read_exact(&mut data)?; + Ok(data) + } +} diff --git a/rain-task/src/lib.rs b/rain-task/src/lib.rs index 2524e8e..1f7bbb2 100644 --- a/rain-task/src/lib.rs +++ b/rain-task/src/lib.rs @@ -26,8 +26,18 @@ use librain::common::Attributes; use librain::worker::rpc::subworker_serde::*; use librain::common::id::SId; +pub const MAX_MSG_SIZE: usize = 128 * 1024 * 1024; +pub const MSG_PROTOCOL: &str = "v1cbor"; +pub const MEM_BACKED_LIMIT: usize = 128 * 1024; + +macro_rules! matchvar { + ($ex: expr, $pat: pat) => { + { if let $pat = $ex { true } else { false } } + }; +} + mod framing; -pub use framing::*; +use framing::*; mod errors; pub use errors::*; @@ -35,13 +45,20 @@ pub use errors::*; mod subworker; pub use subworker::*; -const MEM_BACKED_LIMIT: usize = 128 * 1024; +mod output; +pub use output::*; + #[derive(Debug, Default)] pub struct Context { num_inputs: usize, num_outputs: usize, + /// Task attributes attributes: Attributes, + /// Absolute path to task working dir + work_dir: PathBuf, + /// Absolute path to staging dir with input and output objects + stage_dir: PathBuf, } #[derive(Debug)] @@ -65,208 +82,6 @@ impl<'a> DataInstance<'a> { } } -#[derive(Debug)] -enum OutputState { - Empty, - MemBacked(Vec), - FileBacked(BufWriter), - Path(String), -} - -/// Represents one concrete output. The output can be either empty (as is initially), -/// set to represent an existing file, set to represent an existing directory, or written -/// to as a `Write`. These three are mutually exclusive, `set_dir_path` and `set_file_path` -/// may be used only once, and not before or after `get_writer`. -/// -/// This object is thread-safe and the internal state is guarded by a mutex. Calling -/// `get_writer` locks this mutex and holds it until the returned guard is dropped. -/// This means fast (lockless) writes to the `Write` but you need to make sure your -/// other threads do not starve or deadlock. -#[derive(Debug)] -pub struct Output<'a> { - /// The original output description - desc: &'a DataObjectSpec, - /// Mutex holding the output state - data: Mutex, - /// The resulting attributes. Initially empty. - attributes: Attributes, - /// Path for the resulting file for Writer (if a file will be used) - writer_path: String, - /// Order of the output in outputs - order: usize, -} - -//use std::mem::discriminant; -macro_rules! matchvar { - ($ex: expr, $pat: pat) => { - { if let $pat = $ex { true } else { false } } - }; -} - -use std::fmt; - -impl<'a> fmt::Display for Output<'a> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - if let Some(ref label) = self.desc.label { - write!(f, "Output #{} (ID {}, label {:?})", self.order, self.desc.id, label) - } else { - write!(f, "Output #{} (ID {}, no label)", self.order, self.desc.id) - } - } -} - -use std::ffi::OsString; -impl<'a> Output<'a> { - /// Create an output from DataObjectSpec. Internal. - fn new(spec: &'a DataObjectSpec) -> Self { - Output { - desc: spec, - data: Mutex::new(OutputState::Empty), - attributes: Attributes::new(), - writer_path: format!("output-{}-{}", spec.id.get_session_id(), spec.id.get_id()), - } - } - - /// Consume self, yielding a `DataObjectSpec` for `ResultMsg` and - /// a flag whether the output object was cached (only possible if requested). - /// Currently, the subworker never caches. - fn create_output_spec(self) -> (DataObjectSpec, bool) { - (DataObjectSpec { - id: self.desc.id, - label: None, - attributes: self.attributes, - location: Some(match self.data.into_inner().unwrap() { - OutputState::Empty => DataLocation::Memory(Vec::new()), - OutputState::MemBacked(data) => DataLocation::Memory(data), - OutputState::FileBacked(f) => DataLocation::Path(self.writer_path), - OutputState::Path(p) => DataLocation::Path(p), -// .expect("output file name not valid utf-8")), - }), - cache_hint: false, - }, false) - } - - pub fn set_dir_path>(&mut self, path: P) -> Result<()> { - let path: &Path = path.as_ref(); - // TODO: Check for self directory type - if !path.is_dir() { - bail!("Path {:?} given to `set_dir_path` is not a readable directory.", path); - } - let mut guard = self.data.lock().unwrap(); - if !matchvar!(*guard, OutputState::Empty) { - bail!("Called `set_dir_path` on {} after being previously set.", self.desc.id) - } - if let Some(s) = path.to_str() { - *guard = OutputState::Path(s.into()) - } else { - bail!("Can't convert path {:?} to a valid unicode string.", path); - } - Ok(()) - } - - pub fn set_file_path>(&mut self, path: P) -> Result<()> { - let path: &Path = path.as_ref(); - // TODO: Check for self non-directory type - if !path.is_file() { - bail!("Path {:?} given to `set_file_path` is not a readable regular file.", path); - } - let mut guard = self.data.lock().unwrap(); - if !matchvar!(*guard, OutputState::Empty) { - bail!("Called `set_file_path` on {} after being previously set or written to.", self.desc.id) - } - if let Some(s) = path.to_str() { - *guard = OutputState::Path(s.into()) - } else { - bail!("Can't convert path {:?} to a valid unicode string.", path); - } - Ok(()) - } - - pub fn get_content_type(&self) -> Result<&'a str> { - unimplemented!() - } - - pub fn set_content_type(&self, ct: &str) -> Result<()> { - unimplemented!() - } - - pub fn get_writer<'b: 'a>(&'b self) -> Result> { - // TODO: Check whether it is a non-directory type - let mut guard = self.data.lock().unwrap(); - if matchvar!(*guard, OutputState::Empty) { - *guard = OutputState::MemBacked(Vec::new()) - } - if matchvar!(*guard, OutputState::MemBacked(_)) || - matchvar!(*guard, OutputState::FileBacked(_)) { - Ok(OutputWriter::new(guard, self.desc.id)) - } else { - bail!("Cannot get writer for Output {:?} with already submitted file or dir path.", - self.desc.id) - } - } -} - -#[derive(Debug)] -pub struct OutputWriter<'a> { - guard: MutexGuard<'a, OutputState>, - object_id: DataObjectId, -} - -impl<'a> OutputWriter<'a> { - fn new(guard: MutexGuard<'a, OutputState>, object_id: DataObjectId) -> Self { - OutputWriter { guard: guard, object_id: object_id } - } - - fn convert_to_file(&mut self) -> std::io::Result<()> { - let fname = format!("output-{}-{}", self.object_id.get_session_id(), self.object_id.get_id()); - let mut f = BufWriter::new(OpenOptions::new() - .write(true) - .create_new(true) - .open(&fname)?); - if let OutputState::MemBacked(ref data) = *self.guard { - f.write_all(data)?; - } else { - panic!("bug: invalid state for convert_to_file"); - } - let mut os = OutputState::FileBacked(f); - swap(&mut os, &mut *self.guard); - Ok(()) - } -} - -impl<'a> Write for OutputWriter<'a> { - fn write(&mut self, buf: &[u8]) -> std::result::Result { - // Should be Some() only for MemBacked - let mut data_len = None; - if let OutputState::MemBacked(ref data) = *self.guard { - data_len = Some(data.len()); - } - if let Some(len) = data_len { - if len + buf.len() > MEM_BACKED_LIMIT { - self.convert_to_file()?; - } - } - match *self.guard { - OutputState::MemBacked(ref mut data) => { - data.write(buf).into() - }, - OutputState::FileBacked(ref mut f) => { - f.write(buf).into() - }, - _ => { - panic!("bug: invalid OutputState in OutputWriter") - } - } - } - - fn flush(&mut self) -> std::io::Result<()> { - if let OutputState::FileBacked(ref mut f) = *self.guard { - f.flush().into() - } else { - Ok(()) - } - } -} pub type TaskFn = Fn(&mut Context, &[DataInstance], &mut [Output]) -> Result<()>; diff --git a/rain-task/src/output.rs b/rain-task/src/output.rs new file mode 100644 index 0000000..577dc2b --- /dev/null +++ b/rain-task/src/output.rs @@ -0,0 +1,214 @@ +use std::fmt; +use std::ffi::OsString; +use std::sync::{Mutex, MutexGuard}; +use std::mem::swap; +use std::fs::{OpenOptions, File}; +use std::io::BufWriter; +use std::path::Path; +use std::io::Write; + +use librain::common::id::{TaskId, DataObjectId, SubworkerId}; +use librain::common::Attributes; +use librain::worker::rpc::subworker_serde::*; +use librain::common::id::SId; + +use super::{Error, Result, MAX_MSG_SIZE}; + +#[derive(Debug)] +enum OutputState { + Empty, + MemBacked(Vec), + FileBacked(BufWriter), + Path(String), +} + +/// Represents one concrete output. The output can be either empty (as is initially), +/// set to represent an existing file, set to represent an existing directory, or written +/// to as a `Write`. These three are mutually exclusive, `set_dir_path` and `set_file_path` +/// may be used only once, and not before or after `get_writer`. +/// +/// This object is thread-safe and the internal state is guarded by a mutex. Calling +/// `get_writer` locks this mutex and holds it until the returned guard is dropped. +/// This means fast (lockless) writes to the `Write` but you need to make sure your +/// other threads do not starve or deadlock. +#[derive(Debug)] +pub struct Output<'a> { + /// The original output description + desc: &'a DataObjectSpec, + /// Mutex holding the output state + data: Mutex, + /// The resulting attributes. Initially empty. + attributes: Attributes, + /// Path for the resulting file or directory (unless `MemoryBacked`) + path: PathBuf, + /// Order of the output in outputs + order: usize, +} + + +impl<'a> fmt::Display for Output<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if let Some(ref label) = self.desc.label { + write!(f, "Output #{} (ID {}, label {:?})", self.order, self.desc.id, label) + } else { + write!(f, "Output #{} (ID {}, no label)", self.order, self.desc.id) + } + } +} + +impl<'a> Output<'a> { + /// Create an output from DataObjectSpec. Internal. + fn new(spec: &'a DataObjectSpec) -> Self { + Output { + desc: spec, + data: Mutex::new(OutputState::Empty), + attributes: Attributes::new(), + writer_path: format!("output-{}-{}", spec.id.get_session_id(), spec.id.get_id()), + } + } + + /// Consume self, yielding a `DataObjectSpec` for `ResultMsg` and + /// a flag whether the output object was cached (only possible if requested). + /// Currently, the subworker never caches. + fn create_output_spec(self) -> (DataObjectSpec, bool) { + (DataObjectSpec { + id: self.desc.id, + label: None, + attributes: self.attributes, + location: Some(match self.data.into_inner().unwrap() { + OutputState::Empty => DataLocation::Memory(Vec::new()), + OutputState::MemBacked(data) => DataLocation::Memory(data), + OutputState::FileBacked(f) => DataLocation::Path(self.writer_path), + OutputState::Path(p) => DataLocation::Path(p), + }), + cache_hint: false, + }, false) + } + + pub fn set_dir_path>(&mut self, path: P) -> Result<()> { + let path: &Path = path.as_ref(); + // TODO: Check for self directory type + if !path.is_dir() { + bail!("Path {:?} given to `set_dir_path` is not a readable directory.", path); + } + let mut guard = self.data.lock().unwrap(); + if !matchvar!(*guard, OutputState::Empty) { + bail!("Called `set_dir_path` on {} after being previously set.", self.desc.id) + } + if let Some(s) = path.to_str() { + *guard = OutputState::Path(s.into()) + } else { + bail!("Can't convert path {:?} to a valid unicode string.", path); + } + Ok(()) + } + + pub fn set_file_path>(&mut self, path: P) -> Result<()> { + let path: &Path = path.as_ref(); + // TODO: Check for self non-directory type + if !path.is_file() { + bail!("Path {:?} given to `set_file_path` is not a readable regular file.", path); + } + let mut guard = self.data.lock().unwrap(); + if !matchvar!(*guard, OutputState::Empty) { + bail!("Called `set_file_path` on {} after being previously set or written to.", self.desc.id) + } + if let Some(s) = path.to_str() { + *guard = OutputState::Path(s.into()) + } else { + bail!("Can't convert path {:?} to a valid unicode string.", path); + } + Ok(()) + } + + pub fn get_content_type(&self) -> Result<&'a str> { + unimplemented!() + } + + pub fn set_content_type(&self, ct: &str) -> Result<()> { + unimplemented!() + } + + pub fn get_writer<'b: 'a>(&'b self) -> Result> { + // TODO: Check whether it is a non-directory type + let mut guard = self.data.lock().unwrap(); + if matchvar!(*guard, OutputState::Empty) { + *guard = OutputState::MemBacked(Vec::new()) + } + if matchvar!(*guard, OutputState::MemBacked(_)) || + matchvar!(*guard, OutputState::FileBacked(_)) { + Ok(OutputWriter::new(guard, self.desc.id)) + } else { + bail!("Cannot get writer for Output {:?} with already submitted file or dir path.", + self.desc.id) + } + } +} + +#[derive(Debug)] +pub struct OutputWriter<'a> { + guard: MutexGuard<'a, OutputState>, + path: &'a str, +} + +impl<'a> OutputWriter<'a> { + fn new(guard: MutexGuard<'a, OutputState>, path: &'a str) -> Self { + OutputWriter { guard: guard, path: path } + } + + fn convert_to_file(&mut self) -> ::std::io::Result<()> { + let mut f = BufWriter::new(OpenOptions::new() + .write(true) + .create_new(true) + .open(path)?); + if let OutputState::MemBacked(ref data) = *self.guard { + f.write_all(data)?; + } else { + panic!("bug: invalid state for convert_to_file"); + } + let mut os = OutputState::FileBacked(f); + swap(&mut os, &mut *self.guard); + Ok(()) + } + + pub fn ensure_file_based(&mut self) -> Result<()> { + if matchvar!(*self.guard, OutputState::MemBacked(_)) { + self.convert_to_file()?; + } + Ok(()) + } +} + +impl<'a> Write for OutputWriter<'a> { + fn write(&mut self, buf: &[u8]) -> ::std::io::Result { + // Should be Some() only for MemBacked + let mut data_len = None; + if let OutputState::MemBacked(ref data) = *self.guard { + data_len = Some(data.len()); + } + if let Some(len) = data_len { + if len + buf.len() > MEM_BACKED_LIMIT { + self.convert_to_file()?; + } + } + match *self.guard { + OutputState::MemBacked(ref mut data) => { + data.write(buf).into() + }, + OutputState::FileBacked(ref mut f) => { + f.write(buf).into() + }, + _ => { + panic!("bug: invalid OutputState in OutputWriter") + } + } + } + + fn flush(&mut self) -> std::io::Result<()> { + if let OutputState::FileBacked(ref mut f) = *self.guard { + f.flush().into() + } else { + Ok(()) + } + } +} diff --git a/rain-task/src/subworker.rs b/rain-task/src/subworker.rs new file mode 100644 index 0000000..e4270d4 --- /dev/null +++ b/rain-task/src/subworker.rs @@ -0,0 +1,105 @@ +use librain::common::id::{TaskId, DataObjectId, SubworkerId}; +use librain::worker::rpc::subworker_serde::*; + +use std::env; +use std::collections::HashMap; +use std::os::unix::net::UnixStream; +use std::path::PathBuf; +use super::*; + +pub struct Subworker { + subworker_id: SubworkerId, + subworker_type: String, + socket_path: PathBuf, + tasks: HashMap>, +} + +impl Subworker { + pub fn new(subworker_type: &str) -> Self { + Subworker::with_params( + subworker_type, + env::var("RAIN_SUBWORKER_ID") + .expect("Env variable RAIN_SUBWORKER_ID required") + .parse() + .expect("Error parsing RAIN_SUBWORKER_ID"), + env::var_os("RAIN_SUBWORKER_SOCKET") + .expect("Env variable RAIN_SUBWORKER_SOCKET required") + .into()) + } + + pub fn with_params(subworker_type: &str, subworker_id: SubworkerId, socket_path: PathBuf) -> Self { + Subworker { + subworker_type: subworker_type.into(), + subworker_id: subworker_id, + socket_path: socket_path, + tasks: HashMap::new() + } + } + + pub fn add_task(&mut self, task_name: S, task_fun: F) + where S: Into, F: 'static + Fn(&mut Context, &[DataInstance], &mut [Output]) -> Result<()> { + let key: String = task_name.into(); + if self.tasks.contains_key(&key) { + panic!("can't add task {:?}: already present", &key); + } + self.tasks.insert(key, Box::new(task_fun)); + } + + pub fn run(&mut self) -> Result<()> { + let res = self.run_wrap(); + // TODO: catch connection closed gracefully + /* + Err(Error(ErrorKind::Io(ref e), _)) if e.kind() == io::ErrorKind::ConnectionAborted => { + info!("Connection closed, shutting down"); + return Ok(()); + } + */ + res + } + + fn run_wrap(&mut self) -> Result<()> { + info!("Connecting to worker at socket {:?} with ID {}", self.socket_path, self.subworker_id); + let mut sock = UnixStream::connect(&self.socket_path)?; + self.register(&mut sock)?; + loop { + match sock.read_msg()? { + WorkerToSubworkerMessage::Call(call_msg) => { + let reply = self.handle_call(call_msg)?; + sock.write_msg(&SubworkerToWorkerMessage::Result(reply))?; + }, + WorkerToSubworkerMessage::DropCached(drop_msg) => { + if !drop_msg.drop.is_empty() { + bail!("received nonempty dropCached request with no cached objects"); + } + }, + } + } + } + + fn register(&mut self, sock: &mut UnixStream) -> Result<()> { + let msg = SubworkerToWorkerMessage::Register(RegisterMsg { + protocol: MSG_PROTOCOL.into(), + subworker_id: self.subworker_id, + subworker_type: self.subworker_type.clone(), + }); + sock.write_msg(&msg) + } + + fn handle_call(&mut self, call_msg: CallMsg) -> Result { + Ok(unimplemented!()) // TODO: Implement + } + + #[allow(dead_code)] + pub(crate) fn run_task_test>(&mut self, task_name: S) -> Result<()> { + let key: String = task_name.into(); + match self.tasks.get(&key) { + Some(f) => { + let ins = vec![]; + let mut outs = vec![]; + let mut ctx: Context = Default::default(); + f(&mut ctx, &ins, &mut outs) + }, + None => bail!("Task {} not found", key) + } + } +}