Commit
Merge pull request #21 from QIUZHILEI/dev
optimize code;dismiss warning; drawing with mermaid
genedna authored Jul 17, 2023
2 parents 2c147de + a61660f commit d69586f
Showing 7 changed files with 119 additions and 34 deletions.
139 changes: 113 additions & 26 deletions README.md
@@ -1,30 +1,48 @@
# Dagrs

## What is dagrs
`dagrs` is suited to executing multiple tasks with graph-like dependencies. It offers high performance and asynchronous execution, and provides users with a convenient programming interface.

dagrs is suited to executing multiple tasks with graph-like dependencies. It offers high performance and asynchronous execution, and provides users with a convenient programming interface.
## What can `dagrs` do

## What can dagrs do

dagrs lets users easily execute multiple sets of tasks with complex graph dependencies. It only requires the following:
The user defines the tasks and specifies their dependencies, and dagrs executes the tasks in the topological order of the graph.
`dagrs` lets users easily execute multiple sets of tasks with complex graph dependencies. It only requires the following:
The user defines the tasks and specifies their dependencies, and `dagrs` executes the tasks in the topological order of the graph.
For example:

![image-20230713164020589](assets/tasks.png)
```mermaid
flowchart LR
A((Task a))-->B
A-->C
B((Task b))-->D
C((Task c))-->D
B-->F
C-->E
D((Task d))-->G
E((Task e))-->G
F((Task f))-->G((Task g))
```

This graph represents the dependencies between tasks, and the graph composed of tasks must satisfy two points:

- The graph must contain exactly one node with zero in-degree and exactly one node with zero out-degree (only one start task and one end task are allowed).

- The graph is directed, and the user must ensure that it contains no cycles: the dependencies of tasks cannot form a closed loop. Otherwise the engine refuses to execute any task. For example:

![image-20230713164229768](assets/loop.png)
```mermaid
flowchart LR
A-->C
A((Task 1))-->B
subgraph "Tasks 2, 3, and 4 form a loop"
B((Task 2))-->C
C((Task 3))-->D
D((Task 4))-->B
end
```
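
Both constraints can be checked before any task runs. The following is a minimal sketch, not the `dagrs` implementation: it assumes tasks are numbered `0..n` and dependencies are given as `(from, to)` pairs, and the names `GraphError` and `validate` are hypothetical. It uses Kahn's algorithm, so a graph that passes also yields one valid execution order.

```rust
use std::collections::VecDeque;

/// Reasons a task graph is rejected (hypothetical names for this sketch).
#[derive(Debug, PartialEq)]
enum GraphError {
    /// The dependencies form a closed loop.
    Cycle,
    /// The number of tasks without predecessors is not exactly one.
    NotSingleStart(usize),
    /// The number of tasks without successors is not exactly one.
    NotSingleEnd(usize),
}

/// Checks a graph of `n` tasks with directed `edges` (from, to).
/// On success, returns one topological order of the tasks.
fn validate(n: usize, edges: &[(usize, usize)]) -> Result<Vec<usize>, GraphError> {
    let mut in_degree = vec![0usize; n];
    let mut successors: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(from, to) in edges {
        successors[from].push(to);
        in_degree[to] += 1;
    }

    // Count start and end tasks before Kahn's algorithm mutates `in_degree`.
    let starts = in_degree.iter().filter(|&&d| d == 0).count();
    let ends = successors.iter().filter(|s| s.is_empty()).count();

    // Kahn's algorithm: repeatedly emit a task whose predecessors are all emitted.
    let mut queue: VecDeque<usize> = (0..n).filter(|&v| in_degree[v] == 0).collect();
    let mut order = Vec::with_capacity(n);
    while let Some(v) = queue.pop_front() {
        order.push(v);
        for &next in &successors[v] {
            in_degree[next] -= 1;
            if in_degree[next] == 0 {
                queue.push_back(next);
            }
        }
    }

    // Tasks on a cycle never reach in-degree zero, so they are never emitted.
    if order.len() != n {
        return Err(GraphError::Cycle);
    }
    if starts != 1 {
        return Err(GraphError::NotSingleStart(starts));
    }
    if ends != 1 {
        return Err(GraphError::NotSingleEnd(ends));
    }
    Ok(order)
}
```

For the loop example above, with Task 1 to Task 4 numbered 0 to 3, `validate(4, &[(0, 2), (0, 1), (1, 2), (2, 3), (3, 1)])` returns `Err(GraphError::Cycle)`, because only Task 1 ever reaches in-degree zero.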

Each task may produce an output, and may also require the outputs of other tasks as its input.

## Try using dagrs
## Try using `dagrs`

dagrs provides two basic ways to define tasks: implementing the task logic programmatically, or writing a yaml configuration file. The programmatic approach makes the task logic more flexible, and it is also the main way of using dagrs. The following sections describe both methods in detail.
`dagrs` provides two basic ways to define tasks: implementing the task logic programmatically, or writing a yaml configuration file. The programmatic approach makes the task logic more flexible, and it is also the main way of using `dagrs`. The following sections describe both methods in detail.

*Make sure the Rust compilation environment is available.*

@@ -46,7 +64,7 @@ use dagrs::{

struct SimpleAction(usize);
/// Implement the `Action` trait for `SimpleAction`, defining the logic of the `run` function.
/// The logic here is simply to get the output value (usize) of all predecessor tasks and then accumulate.
/// The logic here is simply to get the output value (`usize`) of all predecessor tasks and then accumulate.
impl Action for SimpleAction{
fn run(&self, input: Input,env:Arc<EnvVar>) -> Result<Output,RunningError> {
let base = env.get::<usize>("base").unwrap();
@@ -96,12 +114,9 @@ Finally we call the `start` function of `Dag` to execute all tasks. After the ta

The graph formed by the task is shown below:

```
B
↗ ↘
A D
↘ ↗
C
```mermaid
flowchart LR;
A((Task a))-->B; A-->C; B((Task b))-->D; C((Task c))-->D((Task d));
```

The execution order is a->c->b->d.
@@ -226,7 +241,7 @@ Executing Task[name: Task 1]

You can see an example: `examples/yaml_dag.rs`. In fact, you can also read the yaml configuration file programmatically to generate tasks, which is very simple: just use the `with_yaml` function provided by `Dag` to parse the configuration file.

**In addition to these two methods, dagrs also supports advanced task custom configuration.**
**In addition to these two methods, `dagrs` also supports advanced task custom configuration.**

- `DefaultTask` is a default implementation of the `Task` trait. Users can also customize tasks and add more functions and attributes to them, but custom tasks still need the four necessary attributes found in `DefaultTask`. `YamlTask` is another concrete implementation of `Task`; its source code is available for reference, or refer to `examples/custom_task.rs`.
- In addition to yaml-type configuration files, users can also provide other types of configuration files. To allow such files to be parsed as tasks, users need to implement the `Parser` trait. The `YamlParser` source code is available for reference, or refer to `examples/custom_parser.rs`.
@@ -236,15 +251,87 @@ You can see an example: `examples/yaml_dag.rs`. In fact, you can also programma
**The execution process of Dag is roughly as follows:**

- The user gives a list of tasks `tasks`. These tasks can be parsed from configuration files, or provided by user programming implementations.
- Internally generate`Graph`based on task dependencies, and generate execution sequences based on* `rely_graph`.

- Internally generate a `Graph` based on task dependencies, and generate the execution sequence based on `rely_graph`.

```mermaid
flowchart TD
subgraph tasks
direction LR
A-->B
A-->C
B-->D
B-->F
C-->D
C-->E
D-->F
E-->F
end
subgraph seq
direction LR
a(A)-->b(B)-->c(C)-->d(D)-->e(E)-->f(F)
end
tasks==Generate execution sequence based on topological sort==>seq
```



- The task is scheduled to start executing asynchronously.

- The task waits to get the result `execute_states` generated by the execution of its predecessor tasks.

```mermaid
---
title: data flow
---
flowchart LR
A-->oa((out))
oa--input-->B
oa--input-->C
B-->ob((out))
ob--input-->D
ob--input-->F
C-->oc((out))
oc--input-->D
oc--input-->E
D-->od((out))
od--input-->F
E-->oe((out))
oe--input-->F
F-->of((out))
```



- If the results of the predecessor tasks can be obtained, check the continuation status `can_continue`. If it is true, continue to execute the defined logic; if it is false, trigger `handle_error` and cancel the execution of the subsequent tasks.
- After all tasks are executed, set the continuation status to false, which means that the tasks of the dag cannot be scheduled for execution again.

![image-20230713171109893](assets/execute_logic.png)
- After all tasks are executed, set the continuation status to false, which means that the tasks of the `dag` cannot be scheduled for execution again.
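
The steps above can be sketched in a few lines. This is a simplified, sequential illustration rather than the asynchronous `dagrs` engine: `SketchTask`, `Action`, `execute`, and the helper functions are hypothetical names, and only `execute_states` and `can_continue` are borrowed from the description. It walks the sequence A, B, C, D, E, F from the diagram, feeds each task the outputs of its predecessors, and stops scheduling once a task fails.

```rust
use std::collections::HashMap;

/// Task body in this sketch: predecessor outputs in, one output or an error out.
type Action = fn(&[usize]) -> Result<usize, String>;

/// Hypothetical stand-in for a task; not the `dagrs` `Task` trait.
struct SketchTask {
    name: &'static str,
    predecessors: Vec<&'static str>,
    action: Action,
}

fn task(name: &'static str, predecessors: &[&'static str], action: Action) -> SketchTask {
    SketchTask { name, predecessors: predecessors.to_vec(), action }
}

/// Adds 1 to the sum of all predecessor outputs.
fn sum_plus_one(inputs: &[usize]) -> Result<usize, String> {
    Ok(inputs.iter().sum::<usize>() + 1)
}

/// Always fails, to show how an error cancels the remaining tasks.
fn always_fail(_inputs: &[usize]) -> Result<usize, String> {
    Err("simulated failure".to_string())
}

/// Tasks A..F from the diagram, already in execution order; `c_action` is the body of C.
fn sample_sequence(c_action: Action) -> Vec<SketchTask> {
    vec![
        task("A", &[], sum_plus_one),
        task("B", &["A"], sum_plus_one),
        task("C", &["A"], c_action),
        task("D", &["B", "C"], sum_plus_one),
        task("E", &["C"], sum_plus_one),
        task("F", &["B", "D", "E"], sum_plus_one),
    ]
}

/// Runs tasks in the given (topological) order and returns every task's output.
fn execute(sequence: &[SketchTask]) -> Result<HashMap<&'static str, usize>, String> {
    let mut execute_states: HashMap<&'static str, usize> = HashMap::new();
    let mut can_continue = true;
    let mut error = None;
    for task in sequence {
        if !can_continue {
            break; // a predecessor failed: cancel the subsequent tasks
        }
        // Topological order guarantees that predecessor outputs are already stored.
        let inputs: Vec<usize> = task.predecessors.iter().map(|p| execute_states[p]).collect();
        match (task.action)(&inputs) {
            Ok(output) => {
                execute_states.insert(task.name, output);
            }
            Err(message) => {
                can_continue = false;
                error = Some(format!("task {} failed: {}", task.name, message));
            }
        }
    }
    match error {
        Some(message) => Err(message),
        None => Ok(execute_states),
    }
}
```

With `sum_plus_one` everywhere, `execute(&sample_sequence(sum_plus_one))` stores 1 for A, 2 for B and C, 5 for D, 3 for E, and 11 for F. Passing `always_fail` as the body of C returns an error, and D, E, and F never run.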

The task execution mode of `dagrs` is parallel. In the figure, the execution sequence is divided into four intervals by the vertical dividing line. During the overall execution of the task, it will go through four parallel execution stages. As shown in the figure: first task A is executed, and tasks B and C obtain the output of A as the input of their own tasks and start to execute in parallel; similarly, tasks D and E must wait until they obtain the output of their predecessors before starting to execute in parallel; finally, Task F must wait for the execution of tasks B, D, and E to complete before it can start executing.

```mermaid
gantt
dateFormat X
axisFormat %s
title Execution timing
section Step1
Task A:0,10
Task B:0,1
Task C:0,1
Task D:0,1
Task E:0,1
Task F:0,1
section Step2
Task B:10,19
Task C:10,19
section Step3
Task D:19,28
Task E:19,28
section Step4
Task F:28,37
```
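
The four stages in the chart can be derived from the dependency graph alone: a task's stage is one more than the latest stage among its predecessors. The sketch below illustrates that idea under stated assumptions and is not how `dagrs` schedules work internally: the function name `stages` is hypothetical, tasks are numbered `0..n`, and every edge `(from, to)` satisfies `from < to`, meaning the numbering is already a topological order.

```rust
/// Groups tasks into stages that could run in parallel.
/// Assumes every edge `(from, to)` satisfies `from < to`.
fn stages(n: usize, edges: &[(usize, usize)]) -> Vec<Vec<usize>> {
    debug_assert!(edges.iter().all(|&(from, to)| from < to && to < n));

    // level[t] = length of the longest dependency chain that ends at task t.
    let mut level = vec![0usize; n];
    for from in 0..n {
        // All predecessors of `from` have smaller indices, so level[from] is final here.
        for &(source, to) in edges {
            if source == from {
                level[to] = level[to].max(level[from] + 1);
            }
        }
    }

    // Bucket the tasks by level; an empty graph has no stages.
    let depth = level.iter().copied().max().map_or(0, |deepest| deepest + 1);
    let mut result: Vec<Vec<usize>> = vec![Vec::new(); depth];
    for (task, &task_level) in level.iter().enumerate() {
        result[task_level].push(task);
    }
    result
}
```

Numbering tasks A to F as 0 to 5, `stages(6, &[(0, 1), (0, 2), (1, 3), (1, 5), (2, 3), (2, 4), (3, 5), (4, 5)])` yields `[[0], [1, 2], [3, 4], [5]]`, which matches Step1 to Step4 in the chart. In a real run a task can start as soon as its own predecessors finish, so these stages are an upper bound on waiting rather than a strict barrier.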


The task execution mode of dagrs is parallel. In the figure, the execution sequence is divided into four intervals by the vertical dividing line. During the overall execution of the task, it will go through four parallel execution stages. As shown in the figure: first task A is executed, and tasks B and C obtain the output of A as the input of their own tasks and start to execute in parallel; similarly, tasks D and E must wait until they obtain the output of their predecessors before starting to execute in parallel; finally, Task F must wait for the execution of tasks B, D, and E to complete before it can start executing.

## The examples

@@ -256,7 +343,7 @@ The task execution mode of dagrs is parallel. In the figure, the execution seque

`examples/yaml_dag.rs`: Spawn multiple tasks with a given yaml configuration file.

`examples/use_macro.rs`: Use the `gen_task` macro provided by dagrs to generate multiple simple tasks.
`examples/use_macro.rs`: Use the `gen_task` macro provided by `dagrs` to generate multiple simple tasks.

`examples/engine.rs`: Using `Engine` to manage multiple dags with different task types.

@@ -270,7 +357,7 @@ The task execution mode of dagrs is parallel. In the figure, the execution seque

## Contribution

The dagrs project relies on community contributions and aims to simplify getting started. To develop dagrs, clone the repository, then install all dependencies, run the test suite and try it out locally. Pick an issue, make changes, and submit a pull request for community review.
The dagrs project relies on community contributions and aims to simplify getting started. To develop `dagrs`, clone the repository, then install all dependencies, run the test suite and try it out locally. Pick an issue, make changes, and submit a pull request for community review.

### What's the contribution

@@ -280,10 +367,10 @@ Here are some guidelines for contributing to this project:
2. Suggest enhancements: If you have an idea to enhance or improve this project, you can suggest it by creating an issue on the issue tracker. Explain your enhancement in detail along with its use cases and benefits. I appreciate well-thought-out enhancement suggestions.
3. Contribute code: If you want to develop and contribute code, follow these steps:
- Choose an issue to work on. Issues labeled `good first issue` are suitable for newcomers. You can also look for issues marked `help wanted`.
- Fork the dagrs repository and create a branch for your changes.
- Fork the `dagrs` repository and create a branch for your changes.
- Make your changes and commit them with a clear commit message. Sign the [Developer Certificate of Origin](https://developercertificate.org/) (DCO) by adding a `Signed-off-by` line to your commit messages. This certifies that you wrote or have the right to submit the code you are contributing to the project.
- Push your changes to GitHub and open a pull request.
- Respond to any feedback on your pull request. The dagrs maintainers will review your changes and may request modifications before merging. Please ensure your code is properly formatted and follows the same style as the existing codebase.
- Respond to any feedback on your pull request. The `dagrs` maintainers will review your changes and may request modifications before merging. Please ensure your code is properly formatted and follows the same style as the existing codebase.
- Once your pull request is merged, you will be listed as a contributor in the project repository and documentation.
4. Write tutorials/blog posts: You can contribute by writing tutorials or blog posts to help users get started with this project. Submit your posts on the issue tracker for review and inclusion. High quality posts that provide value to users are highly appreciated.
5. Improve documentation: If you find any gaps in the documentation or think any part can be improved, you can make changes to files in the documentation folder and submit a PR. Ensure the documentation is up-to-date with the latest changes.
Binary file removed assets/execute_logic.png
Binary file not shown.
Binary file removed assets/loop.png
Binary file not shown.
Binary file removed assets/tasks.png
Binary file not shown.
8 changes: 4 additions & 4 deletions examples/custom_parser.rs
@@ -90,17 +90,17 @@ impl ConfigParser {
fn parse_one(&self, item: String) -> MyTask {
let attr: Vec<&str> = item.split(",").collect();

let pres_item = attr.get(2).unwrap().clone();
let pres_item = *attr.get(2).unwrap();
let pres = if pres_item.eq("") {
Vec::new()
} else {
pres_item.split(" ").map(|pre| pre.to_string()).collect()
};

let id = attr.get(0).unwrap().clone();
let id = *attr.get(0).unwrap();
let name = attr.get(1).unwrap().to_string();
let script = attr.get(4).unwrap().clone();
let t_type = attr.get(3).unwrap().clone();
let script = *attr.get(4).unwrap();
let t_type = *attr.get(3).unwrap();
if t_type.eq("sh") {
MyTask::new(
id,
4 changes: 1 addition & 3 deletions src/engine/graph.rs
@@ -114,9 +114,7 @@ impl Graph {
})
.count();

while !queue.is_empty() {
let v = queue.pop().unwrap(); // This unwrap is ok since `queue` is not empty

while let Some(v) = queue.pop() {
sequence.push(v);
count += 1;

2 changes: 1 addition & 1 deletion src/task/mod.rs
@@ -118,7 +118,7 @@ use crate::utils::EnvVar;

pub use self::error::{RunningError,JavaScriptExecuteError,ShExecuteError};
pub use self::script::{JavaScript,ShScript};
pub use self::specific_task::{YamlTask};
pub use self::specific_task::YamlTask;
pub use self::state::{Output,Input};
pub(crate) use self::state::ExecState;
