-
Notifications
You must be signed in to change notification settings - Fork 7
/
shuffle.rs
124 lines (105 loc) · 4.28 KB
/
shuffle.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
//! This example demonstrates the MergeShuffle algorithm by Axel Bacher et al.
//! and compares its running time against a sequential Fisher–Yates shuffle.
//!
//! See the paper _MergeShuffle: A Very Fast, Parallel Random Permutation Algorithm_
//! (<https://arxiv.org/abs/1508.03167>) for a description of the algorithm.
use concurrent_slice::{Chunk, ConcurrentSlice};
use futures::{stream, stream::StreamExt as _};
use par_stream::prelude::*;
use rand::prelude::*;
use std::{mem, time::Instant};
const LEN: usize = 100_000_000;
const MIN_CHUNK_LEN: usize = 25_000_000;
fn main() {
par_stream::rt::block_on_executor(async move {
let array: Vec<_> = (0..LEN).collect();
// benchmark Fisher-Yates shuffling
let array_single = array.clone();
let instant = Instant::now();
let _array_single = {
let mut array = array_single;
fisher_yates(&mut array);
array
};
eprintln!("single:\t{:?}", instant.elapsed());
// benchmark parallel algorithm
let instant = Instant::now();
let _array_concurrent = {
// shuffle each chunk locally
let mut chunks: Vec<_> = stream::iter(array.concurrent_chunks(MIN_CHUNK_LEN))
.par_map(None, |mut chunk| {
move || {
fisher_yates(&mut chunk);
chunk
}
})
.collect()
.await;
// merge shuffle
while chunks.len() > 1 {
let chunks_: Vec<_> = stream::iter(chunks)
.chunks(2)
.par_map(None, |pair| {
move || {
let mut pair = pair.into_iter();
let lchunk = pair.next().unwrap();
let rchunk = match pair.next() {
Some(rchunk) => rchunk, // non-tail case
None => {
// tail case
return lchunk;
}
};
let llen = lchunk.len();
// merge chunk pair into one chunk
let mut chunk = Chunk::cat(vec![lchunk, rchunk]);
let len = chunk.len();
let mut rng = rand::thread_rng();
let mut lidx = 0;
let mut ridx = llen;
while lidx < ridx && ridx < len {
if rng.gen() {
let (lslice, rslice) = chunk.split_at_mut(ridx);
mem::swap(&mut lslice[lidx], &mut rslice[0]);
ridx += 1;
}
lidx += 1;
}
while lidx < len {
let oidx = rng.gen_range(0..=lidx);
if oidx != lidx {
let (lslice, rslice) = chunk.split_at_mut(oidx + 1);
mem::swap(
lslice.last_mut().unwrap(),
&mut rslice[lidx - oidx - 1],
);
}
lidx += 1;
}
chunk
}
})
.collect()
.await;
// swap even and odd chunks
chunks = chunks_;
}
// merge chunks back to array
let guard = chunks[0].guard();
drop(chunks);
guard.try_unwrap().unwrap()
};
eprintln!("parallel:\t{:?}", instant.elapsed());
});
}
fn fisher_yates<T>(array: &mut [T]) {
let mut rng = rand::thread_rng();
let len = array.len();
(0..len).for_each(|lidx| {
let ridx = rng.gen_range(lidx..len);
if lidx != ridx {
let (larray, rarray) = array.split_at_mut(lidx + 1);
mem::swap(larray.last_mut().unwrap(), &mut rarray[ridx - lidx - 1]);
}
});
}