-
Notifications
You must be signed in to change notification settings - Fork 7
/
shared_stream.rs
123 lines (103 loc) · 3.29 KB
/
shared_stream.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use futures::{
future,
stream::{self, StreamExt as _},
};
use par_stream::{rt, Shared};
use rand::{prelude::*, rngs::OsRng};
use std::time::Duration;
use structopt::StructOpt;
// Command-line options for the shared-stream benchmark, parsed via `structopt`.
//
// Plain `//` comments are used here on purpose: `structopt` turns `///` doc
// comments into help text, which would change the program's `--help` output.
#[derive(StructOpt)]
struct Opts {
// Number of random `u64` items each test generates (`take(opts.num_jobs)`).
pub num_jobs: usize,
// Number of worker tasks spawned by each test.
pub num_workers: usize,
// Capacity of the bounded input channel; only read by
// `shared_stream_by_channel_test`.
pub in_buf_size: usize,
// Capacity of the bounded output channel that collects worker results.
pub out_buf_size: usize,
// Number of wrapping multiplications `compute` performs per item.
pub pow: u32,
// When set (`--spawn`), `task` runs `compute` through `rt::spawn_blocking`
// instead of calling it inline.
#[structopt(long)]
pub spawn: bool,
}
/// Entry point: parses the command-line options, runs the two benchmark
/// variants one after the other, and prints the elapsed time of each in
/// milliseconds.
fn main() {
    par_stream::rt::block_on_executor(async move {
        let options = Opts::from_args();

        // Variant 1: workers pull from clones of a `Shared` stream.
        let notifier_ms = shared_stream_by_notifier_test(&options)
            .await
            .as_millis();
        println!("elapsed for notifier\t{:?}ms", notifier_ms);

        // Variant 2: workers pull from clones of a bounded channel receiver.
        let channel_ms = shared_stream_by_channel_test(&options)
            .await
            .as_millis();
        println!("elapsed for channel\t{:?}ms", channel_ms);
    });
}
/// Benchmarks fan-out through a `Shared` stream.
///
/// `opts.num_jobs` random `u64` values are produced by a stream wrapped in
/// `Shared`; each of the `opts.num_workers` worker tasks receives its own
/// clone of that wrapper, runs `task` on every item it obtains, and forwards
/// the results into a bounded output channel of capacity `opts.out_buf_size`.
/// A collector task folds the results into a wrapping sum (the sum itself is
/// discarded).
///
/// Returns the time elapsed between reading the clock (just before the
/// workers are spawned and everything is joined) and completion of all tasks.
async fn shared_stream_by_notifier_test(opts: &Opts) -> Duration {
    let (pow, spawn) = (opts.pow, opts.spawn);

    let source = Shared::new(
        stream::repeat(())
            .take(opts.num_jobs)
            .map(|()| OsRng.gen::<u64>()),
    );
    let (result_tx, result_rx) = flume::bounded::<u64>(opts.out_buf_size);

    // Deliberately a lazy iterator: the workers are only spawned once
    // `join_all` consumes it, i.e. after the clock has been read. The iterator
    // owns the original sender and `Shared` handle, so both are dropped as
    // soon as the workers have been created.
    let workers = (0..opts.num_workers).map(move |_| {
        let results = result_tx.clone();
        let items = source.clone();

        rt::spawn(async move {
            // Best effort: a failed send is ignored.
            let _ = items
                .then(move |item| task(item, pow, spawn))
                .map(Ok)
                .forward(results.into_sink())
                .await;
        })
    });

    // Drains the output channel until every sender is gone.
    let collector = rt::spawn(async move {
        result_rx
            .into_stream()
            .fold(0u64, |acc, item| future::ready(acc.wrapping_add(item)))
            .await
    });

    let started = std::time::Instant::now();
    future::join(collector, future::join_all(workers)).await;
    started.elapsed()
}
/// Benchmarks fan-out through a bounded channel.
///
/// A producer task forwards `opts.num_jobs` random `u64` values into an input
/// channel of capacity `opts.in_buf_size`. Each of the `opts.num_workers`
/// worker tasks reads from a clone of the receiver, runs `task` on every item,
/// and forwards the results into an output channel of capacity
/// `opts.out_buf_size`. A collector task folds the results into a wrapping sum
/// (the sum itself is discarded).
///
/// Returns the time elapsed between reading the clock (just before the
/// workers are spawned and everything is joined) and completion of all tasks.
///
/// NOTE(review): the producer and collector are handed to `rt::spawn` before
/// the clock is read, so the producer may already have queued items when
/// timing starts — confirm whether that head start is intended.
async fn shared_stream_by_channel_test(opts: &Opts) -> Duration {
    let (pow, spawn) = (opts.pow, opts.spawn);

    let source = stream::repeat(())
        .take(opts.num_jobs)
        .map(|()| OsRng.gen::<u64>());

    let (job_tx, job_rx) = flume::bounded::<u64>(opts.in_buf_size);
    let (result_tx, result_rx) = flume::bounded::<u64>(opts.out_buf_size);

    // Feeds the generated values into the input channel; a failed send is
    // ignored.
    let producer = rt::spawn(async move {
        let _ = source.map(Ok).forward(job_tx.into_sink()).await;
    });

    // Deliberately a lazy iterator: the workers are only spawned once
    // `join_all` consumes it, i.e. after the clock has been read. The iterator
    // owns the original receiver and sender, so both are dropped as soon as
    // the workers have been created.
    let workers = (0..opts.num_workers).map(move |_| {
        let jobs = job_rx.clone();
        let results = result_tx.clone();

        rt::spawn(async move {
            // Best effort: a failed send is ignored.
            let _ = jobs
                .into_stream()
                .then(move |item| task(item, pow, spawn))
                .map(Ok)
                .forward(results.into_sink())
                .await;
        })
    });

    // Drains the output channel until every sender is gone.
    let collector = rt::spawn(async move {
        result_rx
            .into_stream()
            .fold(0u64, |acc, item| future::ready(acc.wrapping_add(item)))
            .await
    });

    let started = std::time::Instant::now();
    future::join3(producer, collector, future::join_all(workers)).await;
    started.elapsed()
}
/// Runs `compute(input, pow)` for a single item.
///
/// When `spawn` is `false` the computation runs inline in the calling task;
/// when it is `true` the computation is handed to `rt::spawn_blocking` and its
/// result is awaited.
async fn task(input: u64, pow: u32, spawn: bool) -> u64 {
    if !spawn {
        return compute(input, pow);
    }

    rt::spawn_blocking(move || compute(input, pow)).await
}
/// Raises `input` to the power `pow` with wrapping (modulo 2^64) arithmetic,
/// using one multiplication per unit of `pow`.
///
/// Returns `1` when `pow` is `0`. The linear multiply chain is kept on
/// purpose: it is the CPU workload of the benchmark.
fn compute(input: u64, pow: u32) -> u64 {
    let mut product = 1u64;
    for _ in 0..pow {
        product = product.wrapping_mul(input);
    }
    product
}