diff --git a/madoop/__main__.py b/madoop/__main__.py index 90409c2..f1482d4 100644 --- a/madoop/__main__.py +++ b/madoop/__main__.py @@ -38,6 +38,11 @@ def main(): '-numReduceTasks', dest='num_reducers', default=4, help="max number of reducers" ) + optional_args.add_argument( + '-partitioner', dest='partitioner', default=None, + help=("executable that computes a partition for each key-value pair of" + "map output: default is hash(key) %% num_reducers"), + ) required_args = parser.add_argument_group('required arguments') required_args.add_argument('-input', dest='input', required=True) required_args.add_argument('-output', dest='output', required=True) @@ -64,7 +69,8 @@ def main(): output_dir=args.output, map_exe=args.mapper, reduce_exe=args.reducer, - num_reducers=int(args.num_reducers) + num_reducers=int(args.num_reducers), + partitioner=args.partitioner, ) except MadoopError as err: sys.exit(f"Error: {err}") diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 6afb58b..9f0772e 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -25,8 +25,16 @@ LOGGER = logging.getLogger("madoop") -def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers): +def mapreduce( + input_path, + output_dir, + map_exe, + reduce_exe, + num_reducers, + partitioner, +): """Madoop API.""" + # pylint: disable=too-many-arguments # Do not clobber existing output directory output_dir = pathlib.Path(output_dir) if output_dir.exists(): @@ -73,7 +81,8 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers): group_stage( input_dir=map_output_dir, output_dir=reduce_input_dir, - num_reducers=num_reducers + num_reducers=num_reducers, + partitioner=partitioner, ) # Run the reducing stage @@ -240,7 +249,7 @@ def keyhash(key): return int(hexdigest, base=16) -def partition_keys( +def partition_keys_default( inpath, outpaths, input_keys_stats, @@ -250,7 +259,6 @@ def partition_keys( Update the data structures provided by the caller input_keys_stats and 
def partition_keys_custom(
    inpath,
    outpaths,
    input_keys_stats,
    output_keys_stats,
    num_reducers,
    partitioner,
):
    """Allocate lines of inpath among outpaths using a custom partitioner.

    The partitioner executable is started with num_reducers as its only
    argument and with inpath as its stdin.  Line N of its stdout is parsed as
    an integer and selects the output file that line N of inpath is appended
    to.

    Update the data structures provided by the caller input_keys_stats and
    output_keys_stats.  Both map a filename to a set of keys.
    NOTE(review): both are indexed without being initialised here, so they are
    presumably defaultdict(set) instances -- confirm against the caller.

    Raise MadoopError if the partitioner prints a non-integer value, prints a
    partition outside [0, num_reducers), prints fewer lines than inpath
    contains, or exits with a non-zero status.
    """
    # pylint: disable=too-many-arguments,too-many-locals
    assert len(outpaths) == num_reducers
    outparent = outpaths[0].parent
    assert all(i.parent == outparent for i in outpaths)

    # Set when the partitioner's output ends before the input does.  The
    # error is raised after the exit status has been checked, so that a
    # crashed partitioner is reported as a crash rather than as short output.
    ran_short = False

    with contextlib.ExitStack() as stack:
        outfiles = [stack.enter_context(p.open("a")) for p in outpaths]
        # The input file is opened twice on purpose: one handle feeds the
        # partitioner's stdin, the other is read here in lock step with the
        # partitioner's output.
        process = stack.enter_context(subprocess.Popen(
            [partitioner, str(num_reducers)],
            stdin=stack.enter_context(inpath.open()),
            stdout=subprocess.PIPE,
            text=True,
        ))
        partitions = stack.enter_context(process.stdout)
        for line in stack.enter_context(inpath.open()):
            raw_partition = partitions.readline()
            if not raw_partition:
                # EOF on the partitioner's stdout while input lines remain.
                # zip() would stop silently here and drop the rest.
                ran_short = True
                break
            try:
                partition = int(raw_partition)
            except ValueError as err:
                raise MadoopError(
                    "Partition executable returned non-integer value: "
                    f"{raw_partition}."
                ) from err
            # Reject out-of-range values explicitly.  A negative index would
            # otherwise wrap around to a different reducer's file, and a
            # too-large one would surface as a bare IndexError.
            if not 0 <= partition < num_reducers:
                raise MadoopError(
                    "Partition executable returned out-of-range value: "
                    f"{partition} (expected 0 <= partition < {num_reducers})."
                )
            key = line.partition('\t')[0]
            input_keys_stats[inpath].add(key)
            outfiles[partition].write(line)
            outpath = outpaths[partition]
            output_keys_stats[outpath].add(key)

    return_code = process.wait()
    if return_code:
        raise MadoopError(
            f"Partition executable returned non-zero: {str(partitioner)}"
        )
    if ran_short:
        raise MadoopError(
            "Partition executable printed fewer partitions than input "
            f"lines: {str(partitioner)}"
        )


def log_input_key_stats(input_keys_stats, input_dir):
    """Log input key stats.

    Emit one debug line per input file with its number of unique keys, then
    one debug line with the number of unique keys across all input files.
    """
    all_input_keys = set()
    for inpath, keys in sorted(input_keys_stats.items()):
        all_input_keys.update(keys)
        LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys))
    LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_input_keys))


def log_output_key_stats(output_keys_stats, output_dir):
    """Log output keyspace stats.

    Emit one debug line per output file with its number of unique keys, then
    one debug line with the number of unique keys across all output files.
    """
    all_output_keys = set()
    for outpath, keys in sorted(output_keys_stats.items()):
        all_output_keys.update(keys)
        LOGGER.debug("%s unique_keys=%s", last_two(outpath), len(keys))
    LOGGER.debug("%s all_unique_keys=%s", output_dir.name,
                 len(all_output_keys))
Process each mapper output file, allocating lines to grouper output files @@ -285,15 +361,14 @@ def group_stage(input_dir, output_dir, num_reducers): # Partition input, appending to output files for inpath in sorted(input_dir.iterdir()): - partition_keys(inpath, outpaths, input_keys_stats, - output_keys_stats, num_reducers) + if not partitioner: + partition_keys_default(inpath, outpaths, input_keys_stats, + output_keys_stats, num_reducers) + else: + partition_keys_custom(inpath, outpaths, input_keys_stats, + output_keys_stats, num_reducers, partitioner) - # Log input keyspace stats - all_input_keys = set() - for inpath, keys in sorted(input_keys_stats.items()): - all_input_keys.update(keys) - LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys)) - LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_input_keys)) + log_input_key_stats(input_keys_stats, input_dir) # Log partition input and output filenames outnames = [i.name for i in outpaths] @@ -315,13 +390,7 @@ def group_stage(input_dir, output_dir, num_reducers): for path in sorted(output_dir.iterdir()): sort_file(path) - # Log output keyspace stats - all_output_keys = set() - for outpath, keys in sorted(output_keys_stats.items()): - all_output_keys.update(keys) - LOGGER.debug("%s unique_keys=%s", last_two(outpath), len(keys)) - LOGGER.debug("%s all_unique_keys=%s", output_dir.name, - len(all_output_keys)) + log_output_key_stats(output_keys_stats, output_dir) def reduce_stage(exe, input_dir, output_dir): diff --git a/tests/test_api.py b/tests/test_api.py index 8754970..d9a8658 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -13,7 +13,8 @@ def test_simple(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", - num_reducers=4 + num_reducers=4, + partitioner=None, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -29,7 +30,8 @@ def test_2_reducers(tmpdir): output_dir="output", 
map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", - num_reducers=2 + num_reducers=2, + partitioner=None, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output-2-reducers", @@ -45,7 +47,8 @@ def test_bash_executable(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.sh", reduce_exe=TESTDATA_DIR/"word_count/reduce.sh", - num_reducers=4 + num_reducers=4, + partitioner=None, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -61,7 +64,34 @@ def test_bad_map_exe(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map_invalid.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", - num_reducers=4 + num_reducers=4, + partitioner=None, + ) + + +def test_bad_partition_exe(tmpdir): + """Partition exe returns non-zero should produce an error message.""" + with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError): + madoop.mapreduce( + input_path=TESTDATA_DIR/"word_count/input", + output_dir="output", + map_exe=TESTDATA_DIR/"word_count/map.py", + reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4, + partitioner=TESTDATA_DIR/"word_count/partition_invalid.py", + ) + + +def test_noninteger_partition_exe(tmpdir): + """Partition exe prints non-integer should produce an error message.""" + with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError): + madoop.mapreduce( + input_path=TESTDATA_DIR/"word_count/input", + output_dir="output", + map_exe=TESTDATA_DIR/"word_count/map.py", + reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=4, + partitioner=TESTDATA_DIR/"word_count/partition_noninteger.py", ) @@ -73,7 +103,8 @@ def test_missing_shebang(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce_invalid.py", - num_reducers=4 + num_reducers=4, + partitioner=None, ) @@ -86,6 +117,7 @@ def test_empty_inputs(tmpdir): map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", 
num_reducers=4, + partitioner=None, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -102,6 +134,7 @@ def test_single_input_file(tmpdir): map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", num_reducers=4, + partitioner=None, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", @@ -119,7 +152,8 @@ def test_ignores_subdirs(tmpdir): output_dir="output", map_exe=TESTDATA_DIR/"word_count/map.py", reduce_exe=TESTDATA_DIR/"word_count/reduce.py", - num_reducers=4 + num_reducers=4, + partitioner=None, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/output", diff --git a/tests/test_stages.py b/tests/test_stages.py index 7ee82d0..2bc58b0 100644 --- a/tests/test_stages.py +++ b/tests/test_stages.py @@ -24,6 +24,7 @@ def test_group_stage(tmpdir): input_dir=TESTDATA_DIR/"word_count/correct/mapper-output", output_dir=Path(tmpdir), num_reducers=4, + partitioner=None, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/grouper-output", @@ -37,6 +38,7 @@ def test_group_stage_2_reducers(tmpdir): input_dir=TESTDATA_DIR/"word_count/correct/mapper-output", output_dir=Path(tmpdir), num_reducers=2, + partitioner=None, ) utils.assert_dirs_eq( TESTDATA_DIR/"word_count/correct/grouper-output-2-reducers", @@ -44,6 +46,20 @@ def test_group_stage_2_reducers(tmpdir): ) +def test_group_stage_custom_partitioner(tmpdir): + """Test group stage using word count example with custom partitioner.""" + group_stage( + input_dir=TESTDATA_DIR/"word_count/correct/mapper-output", + output_dir=Path(tmpdir), + num_reducers=2, + partitioner=TESTDATA_DIR/"word_count/partition.py", + ) + utils.assert_dirs_eq( + TESTDATA_DIR/"word_count/correct/grouper-output-custom-partitioner", + tmpdir, + ) + + def test_reduce_stage(tmpdir): """Test reduce stage using word count example.""" reduce_stage( diff --git a/tests/testdata/word_count/correct/grouper-output-custom-partitioner/part-00000 
#!/usr/bin/env -S python3 -u
"""Word count partitioner.

Usage: partition.py NUM_REDUCERS < map_output

For every line read from stdin, print the partition number for that line's
key on its own line of stdout.  The key is the text before the first tab.
"""
import sys


def choose_partition(key, num_reducers):
    """Return the partition number for key.

    Keys whose first character compares <= "G" map to 0 and all other keys
    map to 1; the result is then reduced modulo num_reducers.  An empty key
    maps to 0 instead of raising IndexError.
    """
    # key[:1] is "" for an empty key, which compares <= "G".
    bucket = 0 if key[:1] <= "G" else 1
    return bucket % num_reducers


def partition_for_line(line, num_reducers):
    """Return the partition number for one line of map output.

    Only the first tab separates key from value, so a line with no tab or
    with tabs inside the value is still handled instead of raising
    ValueError.
    """
    key = line.partition("\t")[0]
    return choose_partition(key, num_reducers)


def main():
    """Read map output from stdin and print one partition per line."""
    num_reducers = int(sys.argv[1])
    for line in sys.stdin:
        print(partition_for_line(line, num_reducers))


if __name__ == "__main__":
    main()