Skip to content

Commit

Permalink
move rayon impl into its own module, fix sync run
Browse files Browse the repository at this point in the history
  • Loading branch information
schell committed Mar 31, 2024
1 parent 6867e46 commit 8cca885
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 57 deletions.
2 changes: 1 addition & 1 deletion crates/moongraph/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moongraph"
version = "0.4.0"
version = "0.4.1"
edition = "2021"
description = "Schedules and runs DAGs accessing shared resources. 🌙"
repository = "https://github.com/schell/moongraph"
Expand Down
63 changes: 7 additions & 56 deletions crates/moongraph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use std::{

use broomdog::{Loan, LoanMut};
use dagga::Dag;
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use snafu::prelude::*;

pub use broomdog::{BroomdogErr, TypeKey, TypeMap};
Expand All @@ -30,6 +28,9 @@ mod tutorial_impl;
#[cfg(feature = "tutorial")]
pub use tutorial_impl::tutorial;

#[cfg(feature = "parallel")]
pub mod rayon_impl;

/// All errors.
#[derive(Debug, Snafu)]
pub enum GraphError {
Expand Down Expand Up @@ -562,19 +563,6 @@ where
}
}

impl<'a, S: Send + Sync + 'static, G: Gen<S>> IntoParallelIterator for &'a View<S, G>
where
&'a S: IntoParallelIterator,
{
type Iter = <&'a S as IntoParallelIterator>::Iter;

type Item = <&'a S as IntoParallelIterator>::Item;

fn into_par_iter(self) -> Self::Iter {
self.deref().into_par_iter()
}
}

/// A mutably borrowed resource that may be created by default.
///
/// Node functions wrap their parameters in [`View`], [`ViewMut`] or [`Move`].
Expand Down Expand Up @@ -665,45 +653,6 @@ where
}
}

impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoParallelIterator for &'a ViewMut<T, G>
where
&'a T: IntoParallelIterator,
{
type Item = <&'a T as IntoParallelIterator>::Item;

type Iter = <&'a T as IntoParallelIterator>::Iter;

fn into_par_iter(self) -> Self::Iter {
self.deref().into_par_iter()
}
}

impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoIterator for &'a mut ViewMut<T, G>
where
&'a mut T: IntoIterator,
{
type Item = <<&'a mut T as IntoIterator>::IntoIter as Iterator>::Item;

type IntoIter = <&'a mut T as IntoIterator>::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.deref_mut().into_iter()
}
}

impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoParallelIterator for &'a mut ViewMut<T, G>
where
&'a mut T: IntoParallelIterator,
{
type Item = <&'a mut T as IntoParallelIterator>::Item;

type Iter = <&'a mut T as IntoParallelIterator>::Iter;

fn into_par_iter(self) -> Self::Iter {
self.deref_mut().into_par_iter()
}
}

/// Contains the nodes/functions and specifies their execution order.
#[derive(Default)]
pub struct Execution {
Expand Down Expand Up @@ -802,6 +751,8 @@ impl<'a> Batch<'a> {
self,
local: &mut Option<impl FnOnce(Resource) -> Result<Resource, GraphError>>,
) -> Result<BatchResult<'a>, GraphError> {
use rayon::prelude::*;

let Batch { nodes, resources } = self;

let mut local_f = None;
Expand Down Expand Up @@ -848,9 +799,9 @@ impl<'a> Batch<'a> {
let mut local_f = None;
let mut inputs = vec![];
let mut runs = vec![];
for node in nodes.iter() {
for node in nodes.iter_mut() {
let input = (node.inner().prepare)(resources)?;
if let Some(f) = node.inner().run.as_ref() {
if let Some(f) = node.inner_mut().run.as_mut() {
inputs.push(input);
runs.push(f);
} else {
Expand Down
58 changes: 58 additions & 0 deletions crates/moongraph/src/rayon_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! Rayon implementations.
use std::ops::{Deref, DerefMut};

use rayon::prelude::*;

use crate::{Gen, View, ViewMut};

impl<'a, S: Send + Sync + 'static, G: Gen<S>> IntoParallelIterator for &'a View<S, G>
where
&'a S: IntoParallelIterator,
{
type Iter = <&'a S as IntoParallelIterator>::Iter;

type Item = <&'a S as IntoParallelIterator>::Item;

fn into_par_iter(self) -> Self::Iter {
self.deref().into_par_iter()
}
}

impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoParallelIterator for &'a ViewMut<T, G>
where
&'a T: IntoParallelIterator,
{
type Item = <&'a T as IntoParallelIterator>::Item;

type Iter = <&'a T as IntoParallelIterator>::Iter;

fn into_par_iter(self) -> Self::Iter {
self.deref().into_par_iter()
}
}

impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoIterator for &'a mut ViewMut<T, G>
where
&'a mut T: IntoIterator,
{
type Item = <<&'a mut T as IntoIterator>::IntoIter as Iterator>::Item;

type IntoIter = <&'a mut T as IntoIterator>::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.deref_mut().into_iter()
}
}

impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoParallelIterator for &'a mut ViewMut<T, G>
where
&'a mut T: IntoParallelIterator,
{
type Item = <&'a mut T as IntoParallelIterator>::Item;

type Iter = <&'a mut T as IntoParallelIterator>::Iter;

fn into_par_iter(self) -> Self::Iter {
self.deref_mut().into_par_iter()
}
}

0 comments on commit 8cca885

Please sign in to comment.