From d1c2ee4d1e3fb1a70714b18e54851a42fbeaeba2 Mon Sep 17 00:00:00 2001 From: Melina O'Dell Date: Fri, 24 Mar 2023 09:39:25 -0400 Subject: [PATCH 1/7] Add numReducers argument --- madoop/__main__.py | 5 +++++ madoop/mapreduce.py | 19 ++++++++++--------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/madoop/__main__.py b/madoop/__main__.py index 4559d07..b777d86 100644 --- a/madoop/__main__.py +++ b/madoop/__main__.py @@ -34,6 +34,10 @@ def main(): '-v', '--verbose', action='count', default=0, help="verbose output" ) + optional_args.add_argument( + '-numReduceTasks', dest='N', default=4, + help="specify the number of reducers" + ) required_args = parser.add_argument_group('required arguments') required_args.add_argument('-input', dest='input', required=True) required_args.add_argument('-output', dest='output', required=True) @@ -60,6 +64,7 @@ def main(): output_dir=args.output, map_exe=args.mapper, reduce_exe=args.reducer, + num_reducers=int(args.N) ) except MadoopError as err: sys.exit(f"Error: {err}") diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 0d540ec..24f7e82 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -19,14 +19,13 @@ MAX_INPUT_SPLIT_SIZE = 2**20 # 1 MB # The number of reducers is dynamically determined by the number of unique keys -# but will not be more than MAX_NUM_REDUCE -MAX_NUM_REDUCE = 4 +# but will not be more than num_reducers # Madoop logger LOGGER = logging.getLogger("madoop") -def mapreduce(input_dir, output_dir, map_exe, reduce_exe): +def mapreduce(input_dir, output_dir, map_exe, reduce_exe, num_reducers): """Madoop API.""" # Do not clobber existing output directory output_dir = pathlib.Path(output_dir) @@ -74,6 +73,7 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe): group_stage( input_dir=map_output_dir, output_dir=reduce_input_dir, + num_reducers=num_reducers ) # Run the reducing stage @@ -222,14 +222,14 @@ def keyhash(key): return int(hexdigest, base=16) -def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats): +def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats, num_reducers): """Allocate lines of inpath among outpaths using hash of key. Update the data structures provided by the caller input_keys_stats and output_keys_stats. Both map a filename to a set of of keys. """ - assert len(outpaths) == MAX_NUM_REDUCE + assert len(outpaths) == num_reducers outparent = outpaths[0].parent assert all(i.parent == outparent for i in outpaths) with contextlib.ExitStack() as stack: @@ -237,13 +237,13 @@ def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats): for line in stack.enter_context(inpath.open()): key = line.partition('\t')[0] input_keys_stats[inpath].add(key) - reducer_idx = keyhash(key) % MAX_NUM_REDUCE + reducer_idx = keyhash(key) % num_reducers outfiles[reducer_idx].write(line) outpath = outpaths[reducer_idx] output_keys_stats[outpath].add(key) -def group_stage(input_dir, output_dir): +def group_stage(input_dir, output_dir, num_reducers): """Run group stage. Process each mapper output file, allocating lines to grouper output files @@ -251,8 +251,9 @@ def group_stage(input_dir, output_dir): """ # Compute output filenames + LOGGER.debug("%s reducers", num_reducers) outpaths = [] - for i in range(MAX_NUM_REDUCE): + for i in range(num_reducers): outpaths.append(output_dir/part_filename(i)) # Track keyspace stats, map filename -> set of keys @@ -261,7 +262,7 @@ def group_stage(input_dir, output_dir): # Partition input, appending to output files for inpath in sorted(input_dir.iterdir()): - partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats) + partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats, num_reducers) # Log input keyspace stats all_input_keys = set() From 51016d12dfb55a652e1cddc56a806505c5f9ac74 Mon Sep 17 00:00:00 2001 From: Matt Mayfield Date: Thu, 2 Nov 2023 14:30:23 -0400 Subject: [PATCH 2/7] Fix style errors and update tests --- madoop/mapreduce.py | 10 ++++++++-- tests/test_api.py | 5 +++++ tests/test_stages.py | 1 + 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 24f7e82..61e7b88 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -222,7 +222,12 @@ def keyhash(key): return int(hexdigest, base=16) -def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats, num_reducers): +def partition_keys( + inpath, + outpaths, + input_keys_stats, + output_keys_stats, + num_reducers): """Allocate lines of inpath among outpaths using hash of key. Update the data structures provided by the caller input_keys_stats and @@ -262,7 +267,8 @@ def group_stage(input_dir, output_dir, num_reducers): # Partition input, appending to output files for inpath in sorted(input_dir.iterdir()): - partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats, num_reducers) + partition_keys(inpath, outpaths, input_keys_stats, + output_keys_stats, num_reducers) # Log input keyspace stats all_input_keys = set() diff --git a/tests/test_api.py b/tests/test_api.py index d222a62..76f65e5 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -13,6 +13,7 @@ def test_simple(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4 ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -28,6 +29,7 @@ def test_bash_executable(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.sh", reduce_exe=TESTDATA_DIR/"word_count/reduce.sh", + num_reducers=4 ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -43,6 +45,7 @@ def test_bad_map_exe(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map_invalid.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4 ) @@ -54,6 +57,7 @@ def test_missing_shebang(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce_invalid.py", + num_reducers=4 ) @@ -65,6 +69,7 @@ def test_empty_inputs(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4 ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", diff --git a/tests/test_stages.py b/tests/test_stages.py index dad39f7..3087060 100644 --- a/tests/test_stages.py +++ b/tests/test_stages.py @@ -23,6 +23,7 @@ def test_group_stage(tmpdir): group_stage( input_dir=TESTDATA_DIR/"word_count/correct/mapper-output", output_dir=Path(tmpdir), + num_reducers=4, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/grouper-output", From e990cd28d3c704cdbbf7e72a9269bd0b24d1505c Mon Sep 17 00:00:00 2001 From: Matt Mayfield Date: Thu, 2 Nov 2023 16:59:55 -0400 Subject: [PATCH 3/7] Add tests that use numReduceTasks=2 --- tests/test_api.py | 16 ++++++++++++ tests/test_cli.py | 21 +++++++++++++++ tests/test_stages.py | 26 +++++++++++++++++++ .../grouper-output-2-reducers/part-00000 | 6 +++++ .../grouper-output-2-reducers/part-00001 | 2 ++ .../correct/output-2-reducers/part-00000 | 4 +++ .../correct/output-2-reducers/part-00001 | 1 + .../reducer-output-2-reducers/part-00000 | 4 +++ .../reducer-output-2-reducers/part-00001 | 1 + 9 files changed, 81 insertions(+) create mode 100644 tests/testdata/word_count/correct/grouper-output-2-reducers/part-00000 create mode 100644 tests/testdata/word_count/correct/grouper-output-2-reducers/part-00001 create mode 100644 tests/testdata/word_count/correct/output-2-reducers/part-00000 create mode 100644 tests/testdata/word_count/correct/output-2-reducers/part-00001 create mode 100644 tests/testdata/word_count/correct/reducer-output-2-reducers/part-00000 create mode 100644 tests/testdata/word_count/correct/reducer-output-2-reducers/part-00001 diff --git a/tests/test_api.py b/tests/test_api.py index 76f65e5..03495d2 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -21,6 +21,22 @@ def test_simple(tmpdir): ) +def test_2_reducers(tmpdir): + """Run a simple MapReduce job with 2 reducers.""" + with tmpdir.as_cwd(): + madoop.mapreduce( + input_dir=TESTDATA_DIR/"word_count/input", + output_dir="output", + map_exe=TESTDATA_DIR/"word_count/map.py", + reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=2 + ) + utils.assert_dirs_eq( + TESTDATA_DIR/"word_count/correct/output-2-reducers", + tmpdir/"output", + ) + + def test_bash_executable(tmpdir): """Run a MapReduce job written in Bash.""" with tmpdir.as_cwd(): diff --git a/tests/test_cli.py b/tests/test_cli.py index 0d6c7f1..dab719e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -49,6 +49,27 @@ def test_simple(tmpdir): ) +def test_2_reducers(tmpdir): + """Run a simple MapReduce job with 2 reducers.""" + with tmpdir.as_cwd(): + subprocess.run( + [ + "madoop", + "-input", TESTDATA_DIR/"word_count/input", + "-output", "output", + "-mapper", TESTDATA_DIR/"word_count/map.py", + "-reducer", TESTDATA_DIR/"word_count/reduce.py", + "-numReduceTasks", "2", + ], + stdout=subprocess.PIPE, + check=True, + ) + utils.assert_dirs_eq( + TESTDATA_DIR/"word_count/correct/output-2-reducers", + tmpdir/"output", + ) + + def test_verbose(tmpdir): """Run a simple MapReduce job and verify the verbose stdout.""" with tmpdir.as_cwd(): diff --git a/tests/test_stages.py b/tests/test_stages.py index 3087060..7ee82d0 100644 --- a/tests/test_stages.py +++ b/tests/test_stages.py @@ -31,6 +31,19 @@ def test_group_stage(tmpdir): ) +def test_group_stage_2_reducers(tmpdir): + """Test group stage using word count example with 2 reducers.""" + group_stage( + input_dir=TESTDATA_DIR/"word_count/correct/mapper-output", + output_dir=Path(tmpdir), + num_reducers=2, + ) + utils.assert_dirs_eq( + TESTDATA_DIR/"word_count/correct/grouper-output-2-reducers", + tmpdir, + ) + + def test_reduce_stage(tmpdir): """Test reduce stage using word count example.""" reduce_stage( @@ -42,3 +55,16 @@ def test_reduce_stage(tmpdir): TESTDATA_DIR/"word_count/correct/reducer-output", tmpdir, ) + + +def test_reduce_stage_2_reducers(tmpdir): + """Test reduce stage using word count example with 2 reducers.""" + reduce_stage( + exe=TESTDATA_DIR/"word_count/reduce.py", + input_dir=TESTDATA_DIR/"word_count/correct/grouper-output-2-reducers", + output_dir=Path(tmpdir), + ) + utils.assert_dirs_eq( + TESTDATA_DIR/"word_count/correct/reducer-output-2-reducers", + tmpdir, + ) diff --git a/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00000 b/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00000 new file mode 100644 index 0000000..48e2fd5 --- /dev/null +++ b/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00000 @@ -0,0 +1,6 @@ +Bye 1 +Goodbye 1 +Hadoop 1 +Hadoop 1 +World 1 +World 1 diff --git a/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00001 b/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00001 new file mode 100644 index 0000000..f7fef0c --- /dev/null +++ b/tests/testdata/word_count/correct/grouper-output-2-reducers/part-00001 @@ -0,0 +1,2 @@ +Hello 1 +Hello 1 diff --git a/tests/testdata/word_count/correct/output-2-reducers/part-00000 b/tests/testdata/word_count/correct/output-2-reducers/part-00000 new file mode 100644 index 0000000..369bdd3 --- /dev/null +++ b/tests/testdata/word_count/correct/output-2-reducers/part-00000 @@ -0,0 +1,4 @@ +Bye 1 +Goodbye 1 +Hadoop 2 +World 2 diff --git a/tests/testdata/word_count/correct/output-2-reducers/part-00001 b/tests/testdata/word_count/correct/output-2-reducers/part-00001 new file mode 100644 index 0000000..30f4be7 --- /dev/null +++ b/tests/testdata/word_count/correct/output-2-reducers/part-00001 @@ -0,0 +1 @@ +Hello 2 diff --git a/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00000 b/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00000 new file mode 100644 index 0000000..369bdd3 --- /dev/null +++ b/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00000 @@ -0,0 +1,4 @@ +Bye 1 +Goodbye 1 +Hadoop 2 +World 2 diff --git a/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00001 b/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00001 new file mode 100644 index 0000000..30f4be7 --- /dev/null +++ b/tests/testdata/word_count/correct/reducer-output-2-reducers/part-00001 @@ -0,0 +1 @@ +Hello 2 From b3ce74e22e377f9e41ad6f876bd70e563994e6f2 Mon Sep 17 00:00:00 2001 From: Andrew DeOrio Date: Sat, 4 Nov 2023 10:42:41 -0400 Subject: [PATCH 4/7] fix tests --- tests/test_api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_api.py b/tests/test_api.py index f6b9139..8754970 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -25,7 +25,7 @@ def test_2_reducers(tmpdir): """Run a simple MapReduce job with 2 reducers.""" with tmpdir.as_cwd(): madoop.mapreduce( - input_dir=TESTDATA_DIR/"word_count/input", + input_path=TESTDATA_DIR/"word_count/input", output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", @@ -85,6 +85,7 @@ def test_empty_inputs(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -100,6 +101,7 @@ def test_single_input_file(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", From eb8b3e4e2a8c4978ab22e214d53d6a4edb0c851b Mon Sep 17 00:00:00 2001 From: Andrew DeOrio Date: Sat, 4 Nov 2023 10:43:02 -0400 Subject: [PATCH 5/7] N -> num_reducers --- madoop/__main__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/madoop/__main__.py b/madoop/__main__.py index 822fd32..f46adef 100644 --- a/madoop/__main__.py +++ b/madoop/__main__.py @@ -35,7 +35,7 @@ def main(): help="verbose output" ) optional_args.add_argument( - '-numReduceTasks', dest='N', default=4, + '-numReduceTasks', dest='num_reducers', default=4, help="specify the number of reducers" ) required_args = parser.add_argument_group('required arguments') @@ -64,7 +64,7 @@ def main(): output_dir=args.output, map_exe=args.mapper, reduce_exe=args.reducer, - num_reducers=int(args.N) + num_reducers=int(args.num_reducers) ) except MadoopError as err: sys.exit(f"Error: {err}") From d93cf47ade2023c2c94165a1ebc136335dc4c316 Mon Sep 17 00:00:00 2001 From: Andrew DeOrio Date: Sat, 4 Nov 2023 10:48:49 -0400 Subject: [PATCH 6/7] specify -> hint --- madoop/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/madoop/__main__.py b/madoop/__main__.py index f46adef..16844ee 100644 --- a/madoop/__main__.py +++ b/madoop/__main__.py @@ -36,7 +36,7 @@ def main(): ) optional_args.add_argument( '-numReduceTasks', dest='num_reducers', default=4, - help="specify the number of reducers" + help="hint the number of reducers" ) required_args = parser.add_argument_group('required arguments') required_args.add_argument('-input', dest='input', required=True) From 28b2733a89d01f8b62c494af35959a926cd0963b Mon Sep 17 00:00:00 2001 From: Andrew DeOrio Date: Sat, 4 Nov 2023 10:49:37 -0400 Subject: [PATCH 7/7] hint -> max --- madoop/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/madoop/__main__.py b/madoop/__main__.py index 16844ee..90409c2 100644 --- a/madoop/__main__.py +++ b/madoop/__main__.py @@ -36,7 +36,7 @@ def main(): ) optional_args.add_argument( '-numReduceTasks', dest='num_reducers', default=4, - help="hint the number of reducers" + help="max number of reducers" ) required_args = parser.add_argument_group('required arguments') required_args.add_argument('-input', dest='input', required=True)