Skip to content

Commit

Permalink
[subworker] Make the rust SW API wotk with paths
Browse files Browse the repository at this point in the history
  • Loading branch information
gavento committed Apr 27, 2018
1 parent 246e539 commit efd5adf
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 56 deletions.
32 changes: 10 additions & 22 deletions rain-task/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 38 additions & 34 deletions rain-task/src/output.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
use std::fmt;
use std::{fmt, fs, mem};
use std::ffi::OsString;
use std::sync::{Mutex, MutexGuard};
use std::mem::swap;
use std::fs::{OpenOptions, File};
use std::io::BufWriter;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::io::Write;

use librain::common::id::{TaskId, DataObjectId, SubworkerId};
use librain::common::Attributes;
use librain::worker::rpc::subworker_serde::*;
use librain::common::id::SId;

use super::{Error, Result, MAX_MSG_SIZE};
use super::{Error, Result, MEM_BACKED_LIMIT};

#[derive(Debug)]
enum OutputState {
Empty,
MemBacked(Vec<u8>),
FileBacked(BufWriter<File>),
Path(String),
StagedPath,
}

/// Represents one concrete output. The output can be either empty (as is initially),
Expand All @@ -39,7 +38,7 @@ pub struct Output<'a> {
data: Mutex<OutputState>,
/// The resulting attributes. Initially empty.
attributes: Attributes,
/// Path for the resulting file or directory (unless `MemoryBacked`)
/// Path for the resulting file or directory if written to fs
path: PathBuf,
/// Order of the output in outputs
order: usize,
Expand All @@ -58,66 +57,71 @@ impl<'a> fmt::Display for Output<'a> {

impl<'a> Output<'a> {
/// Create an output from DataObjectSpec. Internal.
fn new(spec: &'a DataObjectSpec) -> Self {
pub(crate) fn new(spec: &'a DataObjectSpec, stage_path: &Path, order: usize) -> Self {
Output {
desc: spec,
data: Mutex::new(OutputState::Empty),
attributes: Attributes::new(),
writer_path: format!("output-{}-{}", spec.id.get_session_id(), spec.id.get_id()),
path: stage_path.join(format!("output-{}-{}", spec.id.get_session_id(), spec.id.get_id())),
order: order,
}
}

/// Consume self, yielding a `DataObjectSpec` for `ResultMsg` and
/// a flag whether the output object was cached (only possible if requested).
/// Currently, the subworker never caches.
fn create_output_spec(self) -> (DataObjectSpec, bool) {
/// Currently, this subworker never caches.
///
/// NOTE: The returned path may be still an open file until this Output is dropped.
pub(crate) fn create_output_spec(self) -> (DataObjectSpec, bool) {
(DataObjectSpec {
id: self.desc.id,
label: None,
attributes: self.attributes,
location: Some(match self.data.into_inner().unwrap() {
OutputState::Empty => DataLocation::Memory(Vec::new()),
OutputState::MemBacked(data) => DataLocation::Memory(data),
OutputState::FileBacked(f) => DataLocation::Path(self.writer_path),
OutputState::Path(p) => DataLocation::Path(p),
OutputState::FileBacked(f) => { drop(f); DataLocation::Path(self.path) },
OutputState::StagedPath => DataLocation::Path(self.path),
}),
cache_hint: false,
}, false)
}

pub fn set_dir_path<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
/// Submit the given directory as the output contents.
/// Moves the directory to the staging area.
/// You should make sure no files in the directory are open after this operation.
/// Not allowed if the output was submitted to.
pub fn stage_directory<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
let path: &Path = path.as_ref();
// TODO: Check for self directory type
if !path.is_dir() {
bail!("Path {:?} given to `set_dir_path` is not a readable directory.", path);
bail!("Path {:?} given to `stage_directory` is not a readable directory.", path);
}
let mut guard = self.data.lock().unwrap();
if !matchvar!(*guard, OutputState::Empty) {
bail!("Called `set_dir_path` on {} after being previously set.", self.desc.id)
}
if let Some(s) = path.to_str() {
*guard = OutputState::Path(s.into())
} else {
bail!("Can't convert path {:?} to a valid unicode string.", path);
bail!("Called `stage_directory` on {} after being previously staged.", self)
}
fs::rename(path, &self.path)?;
*guard = OutputState::StagedPath;
Ok(())
}

pub fn set_file_path<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
/// Submit the given file as the output contents.
/// Moves the directory to the staging area.
/// You should make sure no files in the directory are open after this operation.
/// Not allowed if the output was submitted or written to.
pub fn stage_file<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
let path: &Path = path.as_ref();
// TODO: Check for self non-directory type
if !path.is_file() {
bail!("Path {:?} given to `set_file_path` is not a readable regular file.", path);
bail!("Path {:?} given to `stage_file` is not a readable regular file.", path);
}
let mut guard = self.data.lock().unwrap();
if !matchvar!(*guard, OutputState::Empty) {
bail!("Called `set_file_path` on {} after being previously set or written to.", self.desc.id)
}
if let Some(s) = path.to_str() {
*guard = OutputState::Path(s.into())
} else {
bail!("Can't convert path {:?} to a valid unicode string.", path);
bail!("Called `stage_file` on {} after being previously staged or written to.", self)
}
fs::rename(path, &self.path)?;
*guard = OutputState::StagedPath;
Ok(())
}

Expand All @@ -137,7 +141,7 @@ impl<'a> Output<'a> {
}
if matchvar!(*guard, OutputState::MemBacked(_)) ||
matchvar!(*guard, OutputState::FileBacked(_)) {
Ok(OutputWriter::new(guard, self.desc.id))
Ok(OutputWriter::new(guard, &self.path))
} else {
bail!("Cannot get writer for Output {:?} with already submitted file or dir path.",
self.desc.id)
Expand All @@ -148,26 +152,26 @@ impl<'a> Output<'a> {
#[derive(Debug)]
pub struct OutputWriter<'a> {
guard: MutexGuard<'a, OutputState>,
path: &'a str,
path: &'a Path,
}

impl<'a> OutputWriter<'a> {
fn new(guard: MutexGuard<'a, OutputState>, path: &'a str) -> Self {
fn new(guard: MutexGuard<'a, OutputState>, path: &'a Path) -> Self {
OutputWriter { guard: guard, path: path }
}

fn convert_to_file(&mut self) -> ::std::io::Result<()> {
let mut f = BufWriter::new(OpenOptions::new()
.write(true)
.create_new(true)
.open(path)?);
.open(self.path)?);
if let OutputState::MemBacked(ref data) = *self.guard {
f.write_all(data)?;
} else {
panic!("bug: invalid state for convert_to_file");
}
let mut os = OutputState::FileBacked(f);
swap(&mut os, &mut *self.guard);
mem::swap(&mut os, &mut *self.guard);
Ok(())
}

Expand Down Expand Up @@ -204,7 +208,7 @@ impl<'a> Write for OutputWriter<'a> {
}
}

fn flush(&mut self) -> std::io::Result<()> {
fn flush(&mut self) -> ::std::io::Result<()> {
if let OutputState::FileBacked(ref mut f) = *self.guard {
f.flush().into()
} else {
Expand Down

0 comments on commit efd5adf

Please sign in to comment.