This tutorial shows how to write MapReduce programs in Python that are compatible with Hadoop Streaming. We'll use Python's itertools.groupby()
function to simplify our code.
Install Madoop, a lightweight MapReduce framework for education. Madoop implements the Hadoop Streaming interface.
$ pip install madoop
We'll use an example MapReduce program and input files provided by Madoop.
$ madoop --example
$ tree example
example
├── input
│ ├── input01.txt
│ └── input02.txt
├── map.py
└── reduce.py
Execute the example MapReduce program using Madoop and show the output.
$ madoop \
    -input example/input \
    -output example/output \
    -mapper example/map.py \
    -reducer example/reduce.py
$ cat example/output/part-*
Goodbye 1
Bye 1
Hadoop 2
World 2
Hello 2
Hadoop Streaming is a MapReduce API that works with any programming language. The mapper and the reducer are executables that read input from stdin and write output to stdout.
The MapReduce framework begins by partitioning (splitting) the input. If the input size is large, a real MapReduce framework will break it up into smaller chunks. Each Map execution will process one input partition. In this tutorial, we're faking MapReduce at the command line with a single mapper, so we'll skip the partition step.
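For intuition, here's a minimal sketch of input partitioning (illustrative only, not Madoop's actual implementation; the function name partition_input and the default chunk size are made up):
def partition_input(path, lines_per_partition=2):
    """Yield lists of lines; each list stands in for one map execution's input."""
    partition = []
    with open(path, encoding="utf-8") as infile:
        for line in infile:
            partition.append(line)
            if len(partition) == lines_per_partition:
                yield partition
                partition = []
    if partition:  # Leftover lines form the final, smaller partition
        yield partition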
The mapper is an executable that reads input from stdin and writes output to stdout. Here's an example map.py
which is part of a word count MapReduce program.
#!/usr/bin/env python3
"""Word count mapper."""
import sys
for line in sys.stdin:
    words = line.split()
    for word in words:
        print(f"{word}\t1")
The map input format is up to the programmer. For example:
$ cat input/input01.txt
Hello World
Bye World
$ cat input/input02.txt
Hello Hadoop
Goodbye Hadoop
Each map output line contains one key-value pair separated by a TAB character. We'll fake the map stage at the command line. While a real MapReduce framework may use multiple map executions, our example runs one.
$ cat input/input* | python3 map.py
Hello 1
World 1
Bye 1
World 1
Hello 1
Hadoop 1
Goodbye 1
Hadoop 1
The MapReduce framework provides the Group functionality. You can fake the group stage with the sort
command:
$ cat input/input* | python3 map.py | sort
Bye 1
Goodbye 1
Hadoop 1
Hadoop 1
Hello 1
Hello 1
World 1
World 1
The reducer is an executable that reads input from stdin and writes output to stdout. Here's a simplified reduce.py
which is part of a word count MapReduce program. This is a bad example because it's inefficient and processes multiple groups all at once.
#!/usr/bin/env python3
"""Word count reducer.
BAD EXAMPLE: it's inefficient and processes multiple groups all at once.
"""
import sys
import collections
def main():
    """Reduce multiple groups."""
    word_count = collections.defaultdict(int)
    for line in sys.stdin:
        word, _, count = line.partition("\t")
        word_count[word] += int(count)
    for word, count in word_count.items():
        print(f"{word} {count}")

if __name__ == "__main__":
    main()
Each reduce input line contains one key-value pair separated by a TAB character. For example:
Hello 1
Hello 1
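A reducer can split the key from the value with str.partition(), the same approach the reduce.py examples in this tutorial use:
>>> key, _, value = "Hello\t1".partition("\t")
>>> key
'Hello'
>>> int(value)
1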
One group never spans multiple reducer executions: a single reducer execution receives every line that shares a key. For example, it's impossible for these two lines to end up in different reducer executions:
Hello 1
Hello 1
Lines are supplied to each reducer execution in sorted order, and the sort uses the entire line, not just the key. For example, the reducer will never see these unsorted lines:
Hadoop 1
Bye 1
Hadoop 1
Goodbye 1
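As a quick illustrative check (not part of the example program), Python's sorted() over whole lines produces the same order for these ASCII keys that the sort command does:
>>> sorted(["Hadoop\t1\n", "Bye\t1\n", "Hadoop\t1\n", "Goodbye\t1\n"])
['Bye\t1\n', 'Goodbye\t1\n', 'Hadoop\t1\n', 'Hadoop\t1\n']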
One reducer execution may receive multiple groups. For example:
Bye 1
Goodbye 1
Hadoop 1
Hadoop 1
The reduce output format is up to the programmer. Here's how to run the whole word count MapReduce program at the command line.
$ cat input/input* | python3 map.py | sort | python3 reduce.py
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
In this section, we'll simplify our reducer code by using itertools.groupby()
to separate the input into groups of lines that share a key.
Our earlier reduce.py was a bad example for two reasons:
- It's inefficient. It put everything in a big dictionary, word_count.
- It's complicated. It processed multiple groups. This strategy becomes more of a problem in more complicated MapReduce programs.
# Bad example: Inefficient and complicated
def main():
    """Reduce multiple groups."""
    word_count = collections.defaultdict(int)
    for line in sys.stdin:
        word, _, count = line.partition("\t")
        word_count[word] += int(count)
    for word, count in word_count.items():
        print(f"{word} {count}")
If one reducer execution received one group, we could simplify the reducer and use only O(1) memory.
# Good example: Efficient and simple
def reduce_one_group(key, group):
    """Reduce one group."""
    word_count = 0
    for line in group:
        count = line.partition("\t")[2]
        word_count += int(count)
    print(f"{key} {word_count}")
If one reducer execution's input contains multiple groups, how can we process one group at a time? We'll use itertools.groupby().
itertools.groupby()
separates input into groups that share a key. If the input to groupby()
looks like this:
Bye 1
Goodbye 1
Hadoop 1
Hadoop 1
The output will be an iterator over three groups, like this:
Bye 1
---------
Goodbye 1
---------
Hadoop 1
Hadoop 1
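Here's a quick interactive check of that grouping (illustrative only; the key function is written inline as a lambda, equivalent to the keyfunc defined below):
>>> import itertools
>>> lines = ["Bye\t1\n", "Goodbye\t1\n", "Hadoop\t1\n", "Hadoop\t1\n"]
>>> for key, group in itertools.groupby(lines, lambda line: line.partition("\t")[0]):
...     print(key, list(group))
Bye ['Bye\t1\n']
Goodbye ['Goodbye\t1\n']
Hadoop ['Hadoop\t1\n', 'Hadoop\t1\n']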
The reducer's main function will divide the input into groups of lines and call a function reduce_one_group()
on each group. groupby()
assumes that the input is sorted. Hadoop Streaming provides sorted input to the reducer.
def main():
    """Divide sorted lines into groups that share a key."""
    for key, group in itertools.groupby(sys.stdin, keyfunc):
        reduce_one_group(key, group)
The keyfunc
function extracts the key. When lines share a key, they share a group.
def keyfunc(line):
    """Return the key from a TAB-delimited key-value pair."""
    return line.partition("\t")[0]
For example:
>>> keyfunc("Hello\t1")
'Hello'
We can process one group at a time with reduce_one_group()
. The key
will be one word in our word count example. The group
is an iterator over lines of input that start with key
.
def reduce_one_group(key, group):
    """Reduce one group."""
    word_count = 0
    for line in group:
        count = line.partition("\t")[2]
        word_count += int(count)
    print(f"{key} {word_count}")
Finally, we can run our entire MapReduce program.
$ cat input/* | ./map.py | sort | ./reduce.py
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
Here's a template you can copy-paste to get started on a reducer. The only part you need to edit is the "IMPLEMENT ME"
line.
#!/usr/bin/env python3
"""
Template reducer.
https://github.com/eecs485staff/madoop/blob/main/README_Hadoop_Streaming.md
"""
import sys
import itertools
def reduce_one_group(key, group):
    """Reduce one group."""
    assert False, "IMPLEMENT ME"

def keyfunc(line):
    """Return the key from a TAB-delimited key-value pair."""
    return line.partition("\t")[0]

def main():
    """Divide sorted lines into groups that share a key."""
    for key, group in itertools.groupby(sys.stdin, keyfunc):
        reduce_one_group(key, group)

if __name__ == "__main__":
    main()
These are some pro-tips for working with MapReduce programs written in Python for the Hadoop Streaming interface.
We encounter a problem if we add a breakpoint()
in map.py
.
for line in sys.stdin:
    breakpoint()  # Added this line
    words = line.split()
    for word in words:
        print(f"{word}\t1")
PDB/PDB++ mistakes the stdin piped in from the input file for user input, so we get these errors:
$ cat input/input* | ./map.py
...
(Pdb++) *** SyntaxError: invalid syntax
(Pdb++) *** SyntaxError: invalid syntax
(Pdb++) *** SyntaxError: invalid syntax
...
To solve this problem, temporarily refactor your program to read every line of stdin into a variable first and then reattach stdin to the terminal, as suggested in this SO post.
lines = sys.stdin.readlines() # Temporary addition
sys.stdin = open("/dev/tty") # Temporary addition
for line in lines: # Temporary modification
    breakpoint()
    words = line.split()
    for word in words:
        print(f"{word}\t1")
Now our debugger works correctly.
$ cat input/input* | ./map.py
...
(Pdb++)
Don't forget to remove your temporary changes!
To iterate over the same group twice, convert it to a list. By default, a group is an iterator, which supports only one pass.
def reduce_one_group(key, group):
    group = list(group)  # iterable to list
    for line in group:
        pass  # Do something
    for line in group:
        pass  # Do something
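Here's a quick interactive illustration (not part of the example program) of the one-pass behavior that makes the list() conversion necessary:
>>> import itertools
>>> lines = ["Hadoop\t1\n", "Hadoop\t1\n"]
>>> key, group = next(itertools.groupby(lines, lambda line: line.partition("\t")[0]))
>>> len(list(group))  # First pass consumes the iterator
2
>>> len(list(group))  # Second pass finds nothing left
0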
If you need to specify which key-value pairs are sent to which reducers, you can create a custom partitioner. Here's a sample which works with our word count example.
#!/usr/bin/env -S python3 -u
"""Word count partitioner."""
import sys
num_reducers = int(sys.argv[1])
for line in sys.stdin:
    key, value = line.split("\t")
    if key[0] <= "G":
        print(0 % num_reducers)
    else:
        print(1 % num_reducers)
Each line of output from the mappers is streamed to this partitioner, and the number of reducers is passed as the first command-line argument (sys.argv[1]). For each line, the partitioner compares the first letter of the key to "G": if it's less than or equal to "G", the line goes to the first reducer (partition 0); otherwise it goes to the second reducer (partition 1).
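Assuming two reducers, here's a quick interactive check (not part of the Madoop example) of which partition each key from our word count example would be assigned:
>>> for key in ["Bye", "Goodbye", "Hadoop", "Hello", "World"]:
...     print(key, 0 if key[0] <= "G" else 1)
Bye 0
Goodbye 0
Hadoop 1
Hello 1
World 1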
Use the -partitioner
command-line argument to tell Madoop to use this partitioner.
$ madoop \
    -input example/input \
    -output example/output \
    -mapper example/map.py \
    -reducer example/reduce.py \
    -partitioner example/partition.py
This feature is similar to Hadoop's Partitioner
class, although it is not directly compatible with Hadoop. The main difference is that Hadoop only allows partitioners to be a Java class, while Madoop allows any executable that reads from stdin
and writes to stdout
.
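If you simply want keys spread evenly across reducers rather than routed by specific rules, a hash-based partitioner in the spirit of Hadoop's default HashPartitioner is a common choice. Here's a sketch (not part of the Madoop example); it uses hashlib because Python's built-in hash() is randomized between runs for strings:
#!/usr/bin/env -S python3 -u
"""Hash partitioner sketch: assign each key to a reducer by a stable hash."""
import hashlib
import sys

num_reducers = int(sys.argv[1])

for line in sys.stdin:
    key = line.partition("\t")[0]
    # Stable across runs, unlike hash(), which is randomized for strings
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    print(int(digest, 16) % num_reducers)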