Skip to content

Commit

Permalink
Merge pull request #37 from QIUZHILEI/dev
Browse files Browse the repository at this point in the history
Improve documentation, code and comments
  • Loading branch information
genedna authored Nov 17, 2023
2 parents 2803988 + a1598d2 commit 6a3b72f
Show file tree
Hide file tree
Showing 48 changed files with 1,351 additions and 1,432 deletions.
3 changes: 0 additions & 3 deletions .vscode/settings.json

This file was deleted.

31 changes: 17 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,33 @@ readme = "README.md"
repository = "https://github.com/open-rust-initiative/dagrs"
keywords = ["DAG", "task", "async", "parallel", "concurrent"]

[workspace]
members = ["derive","."]

[dependencies]
yaml-rust = "0.4.5"
bimap = "0.6.1"
clap = { version = "4.2.2", features = ["derive"] }
anymap2 = "0.13.0"
tokio = { version = "1.28", features = ["rt", "sync","rt-multi-thread"] }
derive ={ path = "derive", version = "0.3.0"}

[dev-dependencies]
log = "0.4"
simplelog = "0.12"

[workspace]
members = ["dagrs_derive","dagrs_core"]


[dependencies]
dagrs_core = {path = "dagrs_core" , version = "0.3.0"}
dagrs_derive ={ path = "dagrs_derive", version = "0.3.0"}

[features]
default = ["dagrs_core/logger"]
yaml = ["dagrs_core/yaml"]
derive = ["dagrs_derive/derive"]
default = ["logger"]
logger = []
yaml = []
derive = ["derive/derive"]

[[example]]
name = "custom_log"
[[bin]]
name = "dagrs"
required-features = ["yaml"]

[[example]]
name = "custom_parser"
name = "custom_log"
required-features = ["yaml"]

[[example]]
Expand Down
198 changes: 138 additions & 60 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,97 +48,166 @@ Among them, each task may produce output, and may also require the output of som

### Programmatically implement task definition

Users need to program to implement the `Action` trait to define the specific logic of the task, and then build a series of `DefaultTask`. The example: `examples/compute_dag.rs`. `DefaultTask` is the default implementation of the Task trait, and it has several mandatory attributes:
Users need to program to implement the `Action` trait to define the specific logic of the task, and then build a series of `DefaultTask`.

- `id`: uniquely identifies the task assigned by the global ID assigner
- `name`: the name of the task
- `predecessor_tasks`: the predecessor tasks of this task
- `action`: is a dynamic type that implements the Action trait in user programming, and it is the specific logic to be executed by the task
First, users need to define some specific task logic. There are two ways to define task logic:

- Create a closure whose type is `Simple`, which is suitable for simple scenarios.
- Create a type and implement the `Complex` trait, which is suitable for more complex situations. For example, if the logic of the task is to execute a system command, the command string needs to be recorded in some way. You can create a `Commad` structure with a string attribute inside to store the command string.

You can refer to examples:`examples/actions.rs`.

In the second step, you need to use the defined task logic to create specific tasks. Here you may need to use the `DefaultTask` type, which provides users with several ways to create `Task`. `DefaultTask` allows you to specify specific task logic for the task and give the task a name. Please refer to the documentation for specific function functions.

In the third step, you need to specify dependencies for the defined series of tasks. Here you need to use the `set_predecessors` function of `DefaultTask`. This function requires you to specify a series of predecessor tasks for the current task.

The fourth step is to create a `Dag` and put all the defined tasks into the `Dag` scheduler.

Optional step: You can specify an environment variable for `Dag`. This environment variable is available in all tasks. In some specific tasks, this behavior can be useful.

Finally, don’t forget to initialize the logger, and then you can call the `start` function of `Dag` to start executing all tasks.

You can refer to an example for the above complete steps: `examples/compute_dag.rs`


Here is the `examples/impl_action.rs` example:

```rust
//! Implement the Action trait to define the task logic.
//! Only use Dag, execute a job. The graph is as follows:
//!
//! ↱----------↴
//! B -→ E --→ G
//! ↗ ↗ ↗
//! A --→ C /
//! ↘ ↘ /
//! D -→ F
//!
//! The final execution result is 272.

extern crate dagrs;

use std::sync::Arc;
use dagrs::{log, Action, Dag, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError};
use dagrs::{log, Complex, Dag, DefaultTask, EnvVar, Input, LogLevel, Output};

struct SimpleAction(usize);
struct Compute(usize);

/// Implement the `Action` trait for `SimpleAction`, defining the logic of the `run` function.
/// The logic here is simply to get the output value (usize) of all predecessor tasks and then accumulate.
impl Action for SimpleAction {
fn run(&self, input: Input, env: Arc<EnvVar>) -> Result<Output, RunningError> {
impl Complex for Compute {
fn run(&self, input: Input, env: Arc<EnvVar>) -> Output {
let base = env.get::<usize>("base").unwrap();
let mut sum = self.0;
input
.get_iter()
.for_each(|i| sum += i.get::<usize>().unwrap() * base);
Ok(Output::new(sum))
Output::new(sum)
}
}

fn main() {
// Initialize the global logger
// initialization log.
let _initialized = log::init_logger(LogLevel::Info, None);
// Generate four tasks.
let a = DefaultTask::new(SimpleAction(10), "Task a");
let mut b = DefaultTask::new(SimpleAction(20), "Task b");
let mut c = DefaultTask::new(SimpleAction(30), "Task c");
let mut d = DefaultTask::new(SimpleAction(40), "Task d");
// Set the precursor for each task.
// generate some tasks.
let a = DefaultTask::with_action("Compute A", Compute(1));

let mut b = DefaultTask::with_action("Compute B", Compute(2));

let mut c = DefaultTask::new("Compute C");
c.set_action(Compute(4));

let mut d = DefaultTask::new("Compute D");
d.set_action(Compute(8));

let mut e = DefaultTask::with_closure("Compute E", |input, env| {
let base = env.get::<usize>("base").unwrap();
let mut sum = 16;
input
.get_iter()
.for_each(|i| sum += i.get::<usize>().unwrap() * base);
Output::new(sum)
});
let mut f = DefaultTask::with_closure("Compute F", |input, env| {
let base = env.get::<usize>("base").unwrap();
let mut sum = 32;
input
.get_iter()
.for_each(|i| sum += i.get::<usize>().unwrap() * base);
Output::new(sum)
});

let mut g = DefaultTask::new("Compute G");
g.set_closure(|input, env| {
let base = env.get::<usize>("base").unwrap();
let mut sum = 64;
input
.get_iter()
.for_each(|i| sum += i.get::<usize>().unwrap() * base);
Output::new(sum)
});

// Set up task dependencies.
b.set_predecessors(&[&a]);
c.set_predecessors(&[&a]);
d.set_predecessors(&[&b, &c]);
// Take these four tasks as a Dag.
let mut dag = Dag::with_tasks(vec![a, b, c, d]);
d.set_predecessors(&[&a]);
e.set_predecessors(&[&b, &c]);
f.set_predecessors(&[&c, &d]);
g.set_predecessors(&[&b, &e, &f]);
// Create a new Dag.
let mut dag = Dag::with_tasks(vec![a, b, c, d, e, f, g]);
// Set a global environment variable for this dag.
let mut env = EnvVar::new();
env.set("base", 2usize);
dag.set_env(env);
// Begin execution.
// Start executing this dag
assert!(dag.start().unwrap());
// Get execution result
assert_eq!(dag.get_result::<usize>().unwrap(), 220);
// Get execution result.
let res = dag.get_result::<usize>().unwrap();
println!("The result is {}.", res);
}
```

**explain:**

First, we define `SimpleAction` and implement the `Action` trait for this structure. In the rewritten run function, we simply get the output value of the predecessor task and multiply it by the environment variable `base`. Then accumulate the multiplied result to itself self.0.

After defining the specific task logic, start creating the prerequisites for executing `Dag`:
Initialize the global logger first. Here we set the log level to Info, and do not give the log output file, let the log output to the console by default.
First, we initialize the logger, declare the `Compute` type, and implement the `Complex` trait for it. In the rewritten run function, we simply get the output value of the predecessor task and multiply it by the environment variable `base`. Then accumulate the multiplied result to itself self.0.

Create a `DefaultTask` with `SimpleAction` and give the task a name. Then set the dependencies between tasks.
Next, we define 6 tasks and show the usage of some functions in the `DefaultTask` type. Set predecessor tasks for each task.

Then create a Dag and assign it a global environment variable.
Then, create a `Dag`, set a base environment variable for it, and use the start method to start executing all tasks.

Finally we call the `start` function of `Dag` to execute all tasks. After the task is executed, call the `get_result` function to obtain the final execution result of the task.

The graph formed by the task is shown below:

```mermaid
flowchart LR;
A((Task a))-->B; A-->C; B((Task b))-->D; C((Task c))-->D((Task d));
flowchart LR
A-->B
A-->C
B-->D
B-->F
C-->D
C-->E
D-->F
E-->F
```

The execution order is a->c->b->d.

```bash
$cargo run
[Start] -> Task a -> Task c -> Task b -> Task d -> [End]
Executing Task[name: Task a]
Task executed successfully. [name: Task a]
Executing Task[name: Task b]
Executing Task[name: Task c]
Task executed successfully. [name: Task b]
Task executed successfully. [name: Task c]
Executing Task[name: Task d]
Task executed successfully. [name: Task d]

Process finished with exit code 0
$ cargo run --example compute_dag
[Start] -> Compute A -> Compute B -> Compute D -> Compute C -> Compute F -> Compute E -> Compute G -> [End]
Executing task [name: Compute A, id: 1]
Execution succeed [name: Compute A, id: 1]
Executing task [name: Compute C, id: 3]
Executing task [name: Compute B, id: 2]
Executing task [name: Compute D, id: 4]
Execution succeed [name: Compute C, id: 3]
Execution succeed [name: Compute B, id: 2]
Execution succeed [name: Compute D, id: 4]
Executing task [name: Compute F, id: 6]
Executing task [name: Compute E, id: 5]
Execution succeed [name: Compute F, id: 6]
Execution succeed [name: Compute E, id: 5]
Executing task [name: Compute G, id: 7]
Execution succeed [name: Compute G, id: 7]
The result is 272.
```

### `Yaml` configuration file
Expand Down Expand Up @@ -192,8 +261,8 @@ These yaml-defined task items form a complex dependency graph. In the yaml confi
To parse the yaml configured file, you need to compile this project, requiring rust version >= 1.70:

```bash
$cargo build --release
$ .\target\release\dagrs.exe --help
$ cargo build --release --features=yaml
$ ./target/release/dagrs.exe --help
Usage: dagrs.exe [OPTIONS] --yaml <YAML>
Options:
Expand All @@ -213,7 +282,7 @@ Options:
We can try an already defined file at `tests/config/correct.yaml`

```bash
$./target/release/dagrs --yaml=./tests/config/correct.yaml --log-path=./dagrs.log --log-level=info
$ ./target/release/dagrs --yaml=./tests/config/correct.yaml --log-path=./dagrs.log --log-level=info
[Start] -> Task 8 -> Task 5 -> Task 7 -> Task 6 -> Task 3 -> Task 2 -> Task 1 -> Task 4 -> [End]
Executing Task[name: Task 8]
Executing Task[name: Task 5]
Expand All @@ -227,10 +296,19 @@ Executing Task[name: Task 1]

You can see an example: `examples/yaml_dag.rs`. In fact, you can also programmatically read the yaml configuration file generation task, which is very simple, just use the `with_yaml` function provided by `Dag` to parse the configuration file.

--------------------------------------

**In addition to these two methods, `dagrs` also supports advanced task custom configuration.**

- `DefaultTask` is a default implementation of the `Task` trait. Users can also customize tasks and add more functions and attributes to tasks, but they still need to have the four necessary attributes in `DefaultTask`. `YamlTask` is another example of `Task` concrete implementation, its source code is available for reference, or refer to `example/custom_task.rs`.
- In addition to yaml-type configuration files, users can also provide other types of configuration files, but in order to allow other types of configuration files to be parsed as tasks, users need to implement the `Parser` trait. `YamlParser` source code is available for reference, or refer to `examples/custom_parser.rs`
- `DefaultTask` is a default implementation of the `Task` trait. Users can also customize tasks and add more functions and attributes to tasks, but they still need to have the four necessary attributes in `DefaultTask`. `YamlTask` is another example of `Task` concrete implementation, its source code is available for reference. No matter how you customize the task type, the customized task type must have the following attributes:
- `id`: uniquely identifies the task assigned by the global ID assigner
- `name`: the name of the task
- `predecessor_tasks`: the predecessor tasks of this task
- `action`: is a dynamic type that implements the Action trait in user programming, and it is the specific logic to be executed by the task

- In addition to yaml-type configuration files, users can also provide other types of configuration files, but in order to allow other types of configuration files to be parsed as tasks, users need to implement the `Parser` trait. `YamlParser` source code is available for reference.

`examples/custom_parser_and_task.rs` is an example of a custom task type and a custom configuration file parser

## Analyze the logic of task execution

Expand Down Expand Up @@ -323,27 +401,27 @@ gantt

### Basic function usage

`examples/compute_dag.rs`: Use a custom macro to generate multiple simple tasks.
`examples/compute_dag.rs`: A complete usage example of dagrs.

`examples/impl_action.rs`: Define a simple Action to build multiple tasks with the same logic.
`examples/action.rs`: Two ways to define the specific logic of a task.

`examples/yaml_dag.rs`: Spawn multiple tasks with a given yaml configuration file。

`examples/use_macro.rs`: Use the `gen_task` macro provided by `dagrs` to generate multiple simple tasks。
`examples/yaml_dag.rs`: Example of reading yaml configuration file (needs to enable `yaml` features).

`examples/engine.rs`: Using `Engine` to manage multiple dags with different task types.

### Advanced Features

`examples/custom_task.rs`: Implement the `Task` trait and define your own task type.

`examples/custom_parser.rs`: Implement the `Parser` trait to define your own task configuration file parser。
`examples/custom_parser_and_task.rs`: Custom task types and configuration file parsers.

`examples/custom_log.rs`: Implement the `Logger` trait to define your own global logger.

`examples/derive_task.rs`:Use `CustomTask` derived macros to help customize task types.

`examples/dependencies.rs`:Use the `dependencies!` macro to specify dependencies in an intuitive way and define a series of tasks.

## Contribution

The dagrs project relies on community contributions and aims to simplify getting started. To develop `dagrs`, clone the repository, then install all dependencies, run the test suite and try it out locally. Pick an issue, make changes, and submit a pull request for community review.
The `dagrs` project relies on community contributions and aims to simplify getting started. To develop `dagrs`, clone the repository, then install all dependencies, run the test suite and try it out locally. Pick an issue, make changes, and submit a pull request for community review.

### What's the contribution

Expand Down
25 changes: 0 additions & 25 deletions dagrs_core/Cargo.toml

This file was deleted.

Loading

0 comments on commit 6a3b72f

Please sign in to comment.