Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Flexible num reducers #66

Merged
merged 9 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions madoop/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def main():
'-v', '--verbose', action='count', default=0,
help="verbose output"
)
optional_args.add_argument(
'-numReduceTasks', dest='num_reducers', default=4,
help="max number of reducers"
)
required_args = parser.add_argument_group('required arguments')
required_args.add_argument('-input', dest='input', required=True)
required_args.add_argument('-output', dest='output', required=True)
Expand All @@ -60,6 +64,7 @@ def main():
output_dir=args.output,
map_exe=args.mapper,
reduce_exe=args.reducer,
num_reducers=int(args.num_reducers)
)
except MadoopError as err:
sys.exit(f"Error: {err}")
Expand Down
25 changes: 16 additions & 9 deletions madoop/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
MAX_INPUT_SPLIT_SIZE = 2**20 # 1 MB

# The number of reducers is dynamically determined by the number of unique keys
# but will not be more than MAX_NUM_REDUCE
MAX_NUM_REDUCE = 4
# but will not be more than num_reducers

# Madoop logger
LOGGER = logging.getLogger("madoop")


def mapreduce(input_path, output_dir, map_exe, reduce_exe):
def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers):
"""Madoop API."""
# Do not clobber existing output directory
output_dir = pathlib.Path(output_dir)
Expand Down Expand Up @@ -74,6 +73,7 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe):
group_stage(
input_dir=map_output_dir,
output_dir=reduce_input_dir,
num_reducers=num_reducers
)

# Run the reducing stage
Expand Down Expand Up @@ -240,37 +240,43 @@ def keyhash(key):
return int(hexdigest, base=16)


def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats):
def partition_keys(
        inpath,
        outpaths,
        input_keys_stats,
        output_keys_stats,
        num_reducers):
    """Allocate lines of inpath among outpaths using hash of key.

    Update the data structures provided by the caller input_keys_stats and
    output_keys_stats.  Both map a filename to a set of keys.

    """
    # One output file per reducer partition; all partitions must live in
    # the same output directory.
    assert len(outpaths) == num_reducers
    outparent = outpaths[0].parent
    assert all(i.parent == outparent for i in outpaths)
    with contextlib.ExitStack() as stack:
        # Open in append mode: this function is called once per mapper
        # output file, so each call accumulates lines into the same set of
        # partition files.
        outfiles = [stack.enter_context(p.open("a")) for p in outpaths]
        for line in stack.enter_context(inpath.open()):
            # The key is everything before the first tab (streaming format).
            key = line.partition('\t')[0]
            input_keys_stats[inpath].add(key)
            # Hash-partition so that equal keys always land in the same
            # reducer partition, regardless of which mapper emitted them.
            reducer_idx = keyhash(key) % num_reducers
            outfiles[reducer_idx].write(line)
            outpath = outpaths[reducer_idx]
            output_keys_stats[outpath].add(key)


def group_stage(input_dir, output_dir):
def group_stage(input_dir, output_dir, num_reducers):
"""Run group stage.

Process each mapper output file, allocating lines to grouper output files
using the hash and modulo of the key.

"""
# Compute output filenames
LOGGER.debug("%s reducers", num_reducers)
outpaths = []
for i in range(MAX_NUM_REDUCE):
for i in range(num_reducers):
outpaths.append(output_dir/part_filename(i))

# Track keyspace stats, map filename -> set of keys
Expand All @@ -279,7 +285,8 @@ def group_stage(input_dir, output_dir):

# Partition input, appending to output files
for inpath in sorted(input_dir.iterdir()):
partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats)
partition_keys(inpath, outpaths, input_keys_stats,
output_keys_stats, num_reducers)

# Log input keyspace stats
all_input_keys = set()
Expand Down
23 changes: 23 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,30 @@ def test_simple(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
tmpdir/"output",
)


def test_2_reducers(tmpdir):
    """Run the word count example capped at two reducers via the API."""
    example = TESTDATA_DIR/"word_count"
    with tmpdir.as_cwd():
        madoop.mapreduce(
            input_path=example/"input",
            output_dir="output",
            map_exe=example/"map.py",
            reduce_exe=example/"reduce.py",
            num_reducers=2,
        )
    utils.assert_dirs_eq(
        example/"correct/output-2-reducers",
        tmpdir/"output",
    )


def test_bash_executable(tmpdir):
"""Run a MapReduce job written in Bash."""
with tmpdir.as_cwd():
Expand All @@ -28,6 +45,7 @@ def test_bash_executable(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.sh",
reduce_exe=TESTDATA_DIR/"word_count/reduce.sh",
num_reducers=4
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
Expand All @@ -43,6 +61,7 @@ def test_bad_map_exe(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map_invalid.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4
)


Expand All @@ -54,6 +73,7 @@ def test_missing_shebang(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce_invalid.py",
num_reducers=4
)


Expand All @@ -65,6 +85,7 @@ def test_empty_inputs(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
Expand All @@ -80,6 +101,7 @@ def test_single_input_file(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
Expand All @@ -97,6 +119,7 @@ def test_ignores_subdirs(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
Expand Down
21 changes: 21 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,27 @@ def test_simple(tmpdir):
)


def test_2_reducers(tmpdir):
    """Run a simple MapReduce job with 2 reducers via the CLI."""
    example = TESTDATA_DIR/"word_count"
    # Build the command line up front so the invocation reads cleanly.
    cmd = [
        "madoop",
        "-input", example/"input",
        "-output", "output",
        "-mapper", example/"map.py",
        "-reducer", example/"reduce.py",
        "-numReduceTasks", "2",
    ]
    with tmpdir.as_cwd():
        subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
    utils.assert_dirs_eq(
        example/"correct/output-2-reducers",
        tmpdir/"output",
    )


def test_verbose(tmpdir):
"""Run a simple MapReduce job and verify the verbose stdout."""
with tmpdir.as_cwd():
Expand Down
27 changes: 27 additions & 0 deletions tests/test_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,27 @@ def test_group_stage(tmpdir):
group_stage(
input_dir=TESTDATA_DIR/"word_count/correct/mapper-output",
output_dir=Path(tmpdir),
num_reducers=4,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/grouper-output",
tmpdir,
)


def test_group_stage_2_reducers(tmpdir):
    """Group stage should split word count keys across 2 partitions."""
    correct = TESTDATA_DIR/"word_count/correct"
    group_stage(
        input_dir=correct/"mapper-output",
        output_dir=Path(tmpdir),
        num_reducers=2,
    )
    utils.assert_dirs_eq(
        correct/"grouper-output-2-reducers",
        tmpdir,
    )


def test_reduce_stage(tmpdir):
"""Test reduce stage using word count example."""
reduce_stage(
Expand All @@ -41,3 +55,16 @@ def test_reduce_stage(tmpdir):
TESTDATA_DIR/"word_count/correct/reducer-output",
tmpdir,
)


def test_reduce_stage_2_reducers(tmpdir):
    """Reduce stage should produce correct output from 2 partitions."""
    correct = TESTDATA_DIR/"word_count/correct"
    reduce_stage(
        exe=TESTDATA_DIR/"word_count/reduce.py",
        input_dir=correct/"grouper-output-2-reducers",
        output_dir=Path(tmpdir),
    )
    utils.assert_dirs_eq(
        correct/"reducer-output-2-reducers",
        tmpdir,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Bye 1
Goodbye 1
Hadoop 1
Hadoop 1
World 1
World 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Hello 1
Hello 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Bye 1
Goodbye 1
Hadoop 2
World 2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Bye 1
Goodbye 1
Hadoop 2
World 2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello 2
Loading