#!/bin/bash
# bashreduce: mapreduce in bash (forked from rcrowley/bashreduce)
usage() {
  echo "Usage: $1 [-h '<host>[ <host>][...]'] [-b] [-f]" >&2
  echo "       [-m <map>] [-r <reduce>] [-M <merge>]" >&2
  echo "       [-i <input>] [-o <output>] [-c <column>] [-S <sort-mem-MB>]" >&2
  echo "       [-t <tmp>] [-?]" >&2
  if [ -n "$2" ] ; then
    echo "  -h  hosts to use; repeat hosts for multiple cores" >&2
    echo "      (defaults to contents of /etc/br.hosts)" >&2
    echo "  -b  get the hosts from the extended attributes," >&2
    echo "      if the input file is stored on BeeFS" >&2
    echo "  -f  send filenames, not data, over the network," >&2
    echo "      implies each host has a mirror of the dataset" >&2
    echo "  -m  map program (effectively defaults to cat)" >&2
    echo "  -r  reduce program" >&2
    echo "  -M  merge program" >&2
    echo "  -i  input file or directory (defaults to stdin)" >&2
    echo "  -o  output file (defaults to stdout)" >&2
    echo "  -c  column used by sort (defaults to 1)" >&2
    echo "  -S  memory to use for sort (defaults to 256M)" >&2
    echo "  -t  tmp directory (defaults to /tmp)" >&2
    echo "  -?  this help message" >&2
  fi
  exit 2
}
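# A typical invocation (hypothetical host names and paths): count duplicate
# lines in a large file across two worker slots, writing the merged, sorted
# counts locally:
#   br -h 'node1 node2' -r 'uniq -c' -i big.txt -o counts.txt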
hostnames() {
  FILENAME=$1
  hosts=""
  # The input file is expected to list one filename per line; query each
  # file's beefs.replicationGroup extended attribute and take its fifth
  # comma-separated field as the host that stores that file
  for input in $(cat "$FILENAME") ; do
    replicationGroup=$(getfattr --only-values -n beefs.replicationGroup "$input" 2> /dev/null)
    if [ -n "$replicationGroup" ] ; then
      hosts="$hosts $(echo $replicationGroup | cut -d, -f5)"
    fi
  done
  echo $hosts
}
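# For example, if getfattr printed an attribute value shaped like
# (hypothetical format) "grp1,3,rw,primary,node07", the fifth field,
# node07, would be taken as that file's host.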
# Defaults
hosts=
xattr=false
filenames=false
map=
reduce=
merge=
input=
output=
column=1
sort_mem=256M
tmp=/tmp
program=$(basename $0)
while getopts "h:bfm:r:M:i:o:c:S:t:?" name; do
  case "$name" in
    h) hosts=$OPTARG;;
    b)
      xattr=true
      filenames=true
      ;;
    f) filenames=true;;
    m) map=$OPTARG;;
    r) reduce=$OPTARG;;
    M) merge=$OPTARG;;
    i) input=$OPTARG;;
    o) output=$OPTARG;;
    c) column=$OPTARG;;
    S) sort_mem=$OPTARG;;
    t) tmp=$OPTARG;;
    ?) usage $program HELP;;
    *) usage $program;;
  esac
done
# If -h wasn't given but -b was, get the hosts from BeeFS extended attributes
if [[ -z "$hosts" && -f "$input" && $xattr == true ]] ; then
  hosts=$(hostnames $input)
fi
# If there are still no hosts, fall back to /etc/br.hosts
if [[ -z "$hosts" ]]; then
  if [[ -e /etc/br.hosts ]]; then
    hosts=$(cat /etc/br.hosts)
  else
    echo "$program: must specify -h or provide /etc/br.hosts" >&2
    usage $program
  fi
fi
# Start br_stderr from a clean slate
cp /dev/null $tmp/br_stderr
# Set up map and reduce as stages of the remote pipeline
[[ -n "$map" ]] && map="| $map 2>>$tmp/br_stderr"
[[ $filenames == true ]] && map="| xargs -n1 \
  sh -c 'zcat \$0 2>>$tmp/br_stderr || cat \$0 2>>$tmp/br_stderr' $map"
[[ -n "$reduce" ]] && reduce="| $reduce 2>>$tmp/br_stderr"
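# With, say, -m ./mapper -r ./reducer (hypothetical programs) and tmp=/tmp,
# this yields map='| ./mapper 2>>/tmp/br_stderr' and
# reduce='| ./reducer 2>>/tmp/br_stderr', which are spliced verbatim into
# the remote pipeline built below.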
jobid="$(uuidgen)"
jobpath="$tmp/br_job_$jobid"
nodepath="$tmp/br_node_$jobid"
mkdir -p $jobpath/{in,out}
port_in=8192
port_out=$(($port_in + 1))
host_idx=0
out_files=
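# Plumbing per worker, as set up below: the local out/ fifo is streamed by
# nc to the remote port_out listener, which spools it to a file; tail -f
# replays that file through sort (and map/reduce, if given), whose output
# is served on port_in and read back into the local in/ fifo:
#   out/<i> -> nc -> :port_out -> spool -> tail -f | sort $map $reduce -> :port_in -> nc -> in/<i>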
for host in $hosts; do
  mkfifo $jobpath/{in,out}/$host_idx
  # Listen for work and spool it to a file (remote)
  ssh -n $host "mkdir -p $nodepath/"
  pid=$(ssh -n $host "nc -l -p $port_out >$nodepath/$host_idx \
      2>>$tmp/br_stderr </dev/null & jobs -l" \
    | awk '{print $2}')
  # Do work and serve the results (remote)
  ssh -n $host "tail -s0.1 -f --pid=$pid $nodepath/$host_idx \
      2>>$tmp/br_stderr </dev/null \
    | LC_ALL='$LC_ALL' sort -S$sort_mem -T$tmp -k$column,$column \
      2>>$tmp/br_stderr \
      $map $reduce \
    | nc -q0 -l -p $port_in >>$tmp/br_stderr &"
  # Receive results (local)
  nc $host $port_in >$jobpath/in/$host_idx &
  # Send work (local)
  nc -q0 $host $port_out <$jobpath/out/$host_idx &
  out_files="$out_files $jobpath/out/$host_idx"
  # ++i
  port_in=$(($port_in + 2))
  port_out=$(($port_in + 1))
  host_idx=$(($host_idx + 1))
done
# Create the command to produce input
if [[ -d "$input" ]]; then
  input="find $input -type f |"
  [[ $filenames == false ]] && input="$input xargs -n1 \
    sh -c 'zcat \$0 2>>$tmp/br_stderr || cat \$0 2>>$tmp/br_stderr' |"
elif [[ -f "$input" ]]; then
  input="sh -c 'zcat $input 2>>$tmp/br_stderr \
    || cat $input 2>>$tmp/br_stderr' |"
else
  input=
fi
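# E.g. -i /data (a hypothetical directory) without -f expands to
#   input="find /data -type f | xargs -n1 sh -c 'zcat $0 ... || cat $0 ...' |"
# so compressed and plain files are transparently concatenated; with -f,
# only the filenames themselves are emitted.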
# Partition local input to the remote workers
if which brp >>$tmp/br_stderr; then
  BRP=brp
elif [[ -f brutils/brp ]]; then
  BRP=brutils/brp
fi
if [[ $xattr == true ]]; then
  # Assumes the number of workers is at least the number of files: hand each
  # worker the name of one file, which BeeFS stores on that worker's host
  x=0
  for filename in $(eval "$input cat") ; do
    echo $filename > "$jobpath/out/$x"
    x=$((x+1))
  done
  # TODO: handle the case where there are more files than workers
else
  if [[ -n "$BRP" ]]; then
    eval "$input $BRP - $(($column - 1)) $out_files"
  else
    # Use awk if we don't have brp. This relies on awk leaving its output
    # file handles open until it exits (believed to be universal behavior).
    # Seeding srand with the key column makes the partition deterministic,
    # so identical keys always land on the same worker. The END block writes
    # a zero-length string to every fifo so that workers whose partition
    # received no data still see EOF.
    eval "$input awk '{
      srand(\$$column);
      print \$0 >>\"$jobpath/out/\"int(rand() * $host_idx);
    }
    END {
      for (i = 0; i != $host_idx; ++i)
        printf \"\" >>\"$jobpath/out/\"i;
    }'"
  fi
fi
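# Quick local illustration of the seeded partition (hypothetical data):
#   printf '1 a\n2 b\n1 c\n' | awk '{srand($1); print $0 " -> worker " int(rand() * 2)}'
# rows sharing a key (here "1") always map to the same worker index.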
# Merge output from the hosts into one stream
# TODO: maybe (but not necessarily) re-reduce while merging
if which brm >>$tmp/br_stderr; then
  BRM=brm
elif [[ -f brutils/brm ]]; then
  BRM=brutils/brm
fi
if [[ -n "$merge" ]]; then
  # With -M the per-worker streams are simply concatenated, not merge-sorted;
  # the merge program is responsible for combining them
  eval "find $jobpath/in -type p | xargs cat \
    | $merge 2>>$tmp/br_stderr ${output:+| pv >$output}"
else
  if [[ -n "$BRM" ]]; then
    eval "$BRM - $(($column - 1)) $(find $jobpath/in/ -type p | xargs) \
      ${output:+| pv >$output}"
  else
    # Use sort -m if we don't have brm; unlike brm, sort -m creates tmp
    # files if too many input files are specified
    eval "sort -k$column,$column -S$sort_mem -m $jobpath/in/* \
      ${output:+| pv >$output}"
  fi
fi
# Cleanup
rm -rf $jobpath
for host in $hosts; do
  ssh $host "rm -rf $nodepath"
done
# TODO: is there a safe way to kill subprocesses upon failure?
# This seems to work: /bin/kill -- -$$