diff --git a/madoop/__main__.py b/madoop/__main__.py index 713604c..90409c2 100644 --- a/madoop/__main__.py +++ b/madoop/__main__.py @@ -34,6 +34,10 @@ def main(): '-v', '--verbose', action='count', default=0, help="verbose output" ) + optional_args.add_argument( + '-numReduceTasks', dest='num_reducers', default=4, + help="max number of reducers" + ) required_args = parser.add_argument_group('required arguments') required_args.add_argument('-input', dest='input', required=True) required_args.add_argument('-output', dest='output', required=True) @@ -60,6 +64,7 @@ def main(): output_dir=args.output, map_exe=args.mapper, reduce_exe=args.reducer, + num_reducers=int(args.num_reducers) ) except MadoopError as err: sys.exit(f"Error: {err}") diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 26dfa38..6afb58b 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -19,14 +19,13 @@ MAX_INPUT_SPLIT_SIZE = 2**20 # 1 MB # The number of reducers is dynamically determined by the number of unique keys -# but will not be more than MAX_NUM_REDUCE -MAX_NUM_REDUCE = 4 +# but will not be more than num_reducers # Madoop logger LOGGER = logging.getLogger("madoop") -def mapreduce(input_path, output_dir, map_exe, reduce_exe): +def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers): """Madoop API.""" # Do not clobber existing output directory output_dir = pathlib.Path(output_dir) @@ -74,6 +73,7 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe): group_stage( input_dir=map_output_dir, output_dir=reduce_input_dir, + num_reducers=num_reducers ) # Run the reducing stage @@ -240,14 +240,19 @@ def keyhash(key): return int(hexdigest, base=16) -def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats): +def partition_keys( + inpath, + outpaths, + input_keys_stats, + output_keys_stats, + num_reducers): """Allocate lines of inpath among outpaths using hash of key. Update the data structures provided by the caller input_keys_stats and output_keys_stats. Both map a filename to a set of of keys. """ - assert len(outpaths) == MAX_NUM_REDUCE + assert len(outpaths) == num_reducers outparent = outpaths[0].parent assert all(i.parent == outparent for i in outpaths) with contextlib.ExitStack() as stack: @@ -255,13 +260,13 @@ def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats): for line in stack.enter_context(inpath.open()): key = line.partition('\t')[0] input_keys_stats[inpath].add(key) - reducer_idx = keyhash(key) % MAX_NUM_REDUCE + reducer_idx = keyhash(key) % num_reducers outfiles[reducer_idx].write(line) outpath = outpaths[reducer_idx] output_keys_stats[outpath].add(key) -def group_stage(input_dir, output_dir): +def group_stage(input_dir, output_dir, num_reducers): """Run group stage. Process each mapper output file, allocating lines to grouper output files @@ -269,8 +274,9 @@ def group_stage(input_dir, output_dir): """ # Compute output filenames + LOGGER.debug("%s reducers", num_reducers) outpaths = [] - for i in range(MAX_NUM_REDUCE): + for i in range(num_reducers): outpaths.append(output_dir/part_filename(i)) # Track keyspace stats, map filename -> set of keys @@ -279,7 +285,8 @@ def group_stage(input_dir, output_dir): # Partition input, appending to output files for inpath in sorted(input_dir.iterdir()): - partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats) + partition_keys(inpath, outpaths, input_keys_stats, + output_keys_stats, num_reducers) # Log input keyspace stats all_input_keys = set() diff --git a/tests/test_api.py b/tests/test_api.py index a75d711..8754970 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -13,6 +13,7 @@ def test_simple(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4 ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -20,6 +21,22 @@ def test_simple(tmpdir): ) +def test_2_reducers(tmpdir): + """Run a simple MapReduce job with 2 reducers.""" + with tmpdir.as_cwd(): + madoop.mapreduce( + input_path=TESTDATA_DIR/"word_count/input", + output_dir="output", + map_exe=TESTDATA_DIR/"word_count/map.py", + reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=2 + ) + utils.assert_dirs_eq( + TESTDATA_DIR/"word_count/correct/output-2-reducers", + tmpdir/"output", + ) + + def test_bash_executable(tmpdir): """Run a MapReduce job written in Bash.""" with tmpdir.as_cwd(): @@ -28,6 +45,7 @@ def test_bash_executable(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.sh", reduce_exe=TESTDATA_DIR/"word_count/reduce.sh", + num_reducers=4 ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -43,6 +61,7 @@ def test_bad_map_exe(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map_invalid.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4 ) @@ -54,6 +73,7 @@ def test_missing_shebang(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce_invalid.py", + num_reducers=4 ) @@ -65,6 +85,7 @@ def test_empty_inputs(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -80,6 +101,7 @@ def test_single_input_file(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -97,6 +119,7 @@ def test_ignores_subdirs(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4 ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", diff --git a/tests/test_cli.py b/tests/test_cli.py index 0d6c7f1..dab719e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -49,6 +49,27 @@ def test_simple(tmpdir): ) +def test_2_reducers(tmpdir): + """Run a simple MapReduce job with 2 reducers.""" + with tmpdir.as_cwd(): + subprocess.run( + [ + "madoop", + "-input", TESTDATA_DIR/"word_count/input", + "-output", "output", + "-mapper", TESTDATA_DIR/"word_count/map.py", + "-reducer", TESTDATA_DIR/"word_count/reduce.py", + "-numReduceTasks", "2", + ], + stdout=subprocess.PIPE, + check=True, + ) + utils.assert_dirs_eq( + TESTDATA_DIR/"word_count/correct/output-2-reducers", + tmpdir/"output", + ) + + def test_verbose(tmpdir): """Run a simple MapReduce job and verify the verbose stdout.""" with tmpdir.as_cwd(): diff --git a/tests/test_stages.py b/tests/test_stages.py index dad39f7..7ee82d0 100644 --- a/tests/test_stages.py +++ b/tests/test_stages.py @@ -23,6 +23,7 @@ def test_group_stage(tmpdir): group_stage( input_dir=TESTDATA_DIR/"word_count/correct/mapper-output", output_dir=Path(tmpdir), + num_reducers=4, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/grouper-output", @@ -30,6 +31,19 @@ def test_group_stage(tmpdir): ) +def test_group_stage_2_reducers(tmpdir): + """Test group stage using word count example with 2 reducers.""" + group_stage( + input_dir=TESTDATA_DIR/"word_count/correct/mapper-output", + output_dir=Path(tmpdir), + num_reducers=2, + ) + utils.assert_dirs_eq( + TESTDATA_DIR/"word_count/correct/grouper-output-2-reducers", + tmpdir, + ) + + def test_reduce_stage(tmpdir): """Test reduce stage using word count example.""" reduce_stage( @@ -41,3 +55,16 @@ def test_reduce_stage(tmpdir): TESTDATA_DIR/"word_count/correct/reducer-output", tmpdir, ) + + +def test_reduce_stage_2_reducers(tmpdir): + """Test reduce stage using word count example with 2 reducers.""" + reduce_stage( + exe=TESTDATA_DIR/"word_count/reduce.py", + input_dir=TESTDATA_DIR/"word_count/correct/grouper-output-2-reducers", + output_dir=Path(tmpdir), + ) + utils.assert_dirs_eq( + TESTDATA_DIR/"word_count/correct/reducer-output-2-reducers", + tmpdir, + ) diff --git a/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00000 b/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00000 new file mode 100644 index 0000000..48e2fd5 --- /dev/null +++ b/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00000 @@ -0,0 +1,6 @@ +Bye 1 +Goodbye 1 +Hadoop 1 +Hadoop 1 +World 1 +World 1 diff --git a/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00001 b/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00001 new file mode 100644 index 0000000..f7fef0c --- /dev/null +++ b/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00001 @@ -0,0 +1,2 @@ +Hello 1 +Hello 1 diff --git a/tests/testdata/word_count/correct/output-2-reducers/part-00000 b/tests/testdata/word_count/correct/output-2-reducers/part-00000 new file mode 100644 index 0000000..369bdd3 --- /dev/null +++ b/tests/testdata/word_count/correct/output-2-reducers/part-00000 @@ -0,0 +1,4 @@ +Bye 1 +Goodbye 1 +Hadoop 2 +World 2 diff --git a/tests/testdata/word_count/correct/output-2-reducers/part-00001 b/tests/testdata/word_count/correct/output-2-reducers/part-00001 new file mode 100644 index 0000000..30f4be7 --- /dev/null +++ b/tests/testdata/word_count/correct/output-2-reducers/part-00001 @@ -0,0 +1 @@ +Hello 2 diff --git a/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00000 b/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00000 new file mode 100644 index 0000000..369bdd3 --- /dev/null +++ b/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00000 @@ -0,0 +1,4 @@ +Bye 1 +Goodbye 1 +Hadoop 2 +World 2 diff --git a/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00001 b/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00001 new file mode 100644 index 0000000..30f4be7 --- /dev/null +++ b/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00001 @@ -0,0 +1 @@ +Hello 2