Skip to content

Commit

Permalink
Add support for custom partitioner
Browse files Browse the repository at this point in the history
  • Loading branch information
noah-weingarden committed Nov 26, 2023
1 parent 5408a7d commit 09d275f
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 27 deletions.
8 changes: 7 additions & 1 deletion madoop/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def main():
'-numReduceTasks', dest='num_reducers', default=4,
help="max number of reducers"
)
optional_args.add_argument(
    '-partitioner', dest='partitioner', default=None,
    # Adjacent string literals are concatenated verbatim, so the first piece
    # must end with a space to keep "pair of map output" readable.  The
    # doubled percent sign is argparse's escape for a literal "%".
    help=("executable that computes a partition for each key-value pair "
          "of map output: default is hash(key) %% num_reducers"),
)
required_args = parser.add_argument_group('required arguments')
required_args.add_argument('-input', dest='input', required=True)
required_args.add_argument('-output', dest='output', required=True)
Expand All @@ -64,7 +69,8 @@ def main():
output_dir=args.output,
map_exe=args.mapper,
reduce_exe=args.reducer,
num_reducers=int(args.num_reducers)
num_reducers=int(args.num_reducers),
partitioner=args.partitioner,
)
except MadoopError as err:
sys.exit(f"Error: {err}")
Expand Down
109 changes: 89 additions & 20 deletions madoop/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,16 @@
LOGGER = logging.getLogger("madoop")


def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers):
def mapreduce(
input_path,
output_dir,
map_exe,
reduce_exe,
num_reducers,
partitioner,
):
"""Madoop API."""
# pylint: disable=too-many-arguments
# Do not clobber existing output directory
output_dir = pathlib.Path(output_dir)
if output_dir.exists():
Expand Down Expand Up @@ -73,7 +81,8 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers):
group_stage(
input_dir=map_output_dir,
output_dir=reduce_input_dir,
num_reducers=num_reducers
num_reducers=num_reducers,
partitioner=partitioner,
)

# Run the reducing stage
Expand Down Expand Up @@ -240,7 +249,7 @@ def keyhash(key):
return int(hexdigest, base=16)


def partition_keys(
def partition_keys_default(
inpath,
outpaths,
input_keys_stats,
Expand All @@ -250,7 +259,6 @@ def partition_keys(
Update the data structures provided by the caller input_keys_stats and
output_keys_stats. Both map a filename to a set of of keys.
"""
assert len(outpaths) == num_reducers
outparent = outpaths[0].parent
Expand All @@ -266,7 +274,75 @@ def partition_keys(
output_keys_stats[outpath].add(key)


def group_stage(input_dir, output_dir, num_reducers):
def partition_keys_custom(
    inpath,
    outpaths,
    input_keys_stats,
    output_keys_stats,
    num_reducers,
    partitioner,
):
    """Allocate lines of inpath among outpaths using a custom partitioner.

    The partitioner executable is started with num_reducers as its only
    command-line argument and with inpath connected to its stdin.  It is
    expected to print one integer partition number per input line, in the
    same order; each line of inpath is appended to the output file at that
    index in outpaths.

    Update the data structures provided by the caller input_keys_stats and
    output_keys_stats.  Both map a filename to a set of keys.

    Raise MadoopError if the partitioner prints a non-integer value, prints a
    partition outside the range 0 <= partition < num_reducers, prints fewer
    lines than inpath contains, or exits with a non-zero status.
    """
    # pylint: disable=too-many-arguments,too-many-locals
    assert len(outpaths) == num_reducers
    outparent = outpaths[0].parent
    assert all(i.parent == outparent for i in outpaths)
    truncated = False
    with contextlib.ExitStack() as stack:
        outfiles = [stack.enter_context(p.open("a")) for p in outpaths]
        process = stack.enter_context(subprocess.Popen(
            [partitioner, str(num_reducers)],
            stdin=stack.enter_context(inpath.open()),
            stdout=subprocess.PIPE,
            text=True,
        ))
        # Read the input a second time, in lockstep with the partitioner's
        # answers.  Reading the answers explicitly (rather than with zip)
        # lets us notice when the partitioner stops printing early instead of
        # silently dropping the remaining input lines.
        for line in stack.enter_context(inpath.open()):
            answer = process.stdout.readline()
            if not answer:
                # Defer the error until the exit status is known so that a
                # crashed partitioner is reported as "non-zero" below.
                truncated = True
                break
            try:
                partition = int(answer)
            except ValueError as err:
                raise MadoopError(
                    "Partition executable returned non-integer value: "
                    f"{answer}."
                ) from err
            # Reject out-of-range values explicitly: a negative number would
            # otherwise wrap around through list indexing and a large one
            # would surface as a bare IndexError.
            if not 0 <= partition < num_reducers:
                raise MadoopError(
                    "Partition executable returned invalid value: "
                    f"expected 0 <= partition < {num_reducers}, "
                    f"got {partition}."
                )
            key = line.partition('\t')[0]
            input_keys_stats[inpath].add(key)
            outfiles[partition].write(line)
            outpath = outpaths[partition]
            output_keys_stats[outpath].add(key)

    return_code = process.wait()
    if return_code:
        raise MadoopError(
            f"Partition executable returned non-zero: {str(partitioner)}"
        )
    if truncated:
        raise MadoopError(
            "Partition executable printed fewer lines than its input: "
            f"{str(partitioner)}"
        )


def log_input_key_stats(input_keys_stats, input_dir):
    """Log the unique key count of each input file, then the overall count.

    input_keys_stats maps each input path to the keys seen in that file.
    Files are reported in sorted path order, followed by one summary line
    labelled with the name of input_dir.
    """
    seen_keys = set()
    for path in sorted(input_keys_stats):
        file_keys = input_keys_stats[path]
        seen_keys.update(file_keys)
        LOGGER.debug("%s unique_keys=%s", last_two(path), len(file_keys))
    total = len(seen_keys)
    LOGGER.debug("%s all_unique_keys=%s", input_dir.name, total)


def log_output_key_stats(output_keys_stats, output_dir):
    """Log the unique key count of each output file, then the overall count.

    output_keys_stats maps each output path to the keys written to that file.
    Files are reported in sorted path order, followed by one summary line
    labelled with the name of output_dir.
    """
    seen_keys = set()
    for path in sorted(output_keys_stats):
        file_keys = output_keys_stats[path]
        seen_keys.update(file_keys)
        LOGGER.debug("%s unique_keys=%s", last_two(path), len(file_keys))
    total = len(seen_keys)
    LOGGER.debug("%s all_unique_keys=%s", output_dir.name, total)


def group_stage(input_dir, output_dir, num_reducers, partitioner):
"""Run group stage.
Process each mapper output file, allocating lines to grouper output files
Expand All @@ -285,15 +361,14 @@ def group_stage(input_dir, output_dir, num_reducers):

# Partition input, appending to output files
for inpath in sorted(input_dir.iterdir()):
partition_keys(inpath, outpaths, input_keys_stats,
output_keys_stats, num_reducers)
if not partitioner:
partition_keys_default(inpath, outpaths, input_keys_stats,
output_keys_stats, num_reducers)
else:
partition_keys_custom(inpath, outpaths, input_keys_stats,
output_keys_stats, num_reducers, partitioner)

# Log input keyspace stats
all_input_keys = set()
for inpath, keys in sorted(input_keys_stats.items()):
all_input_keys.update(keys)
LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys))
LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_input_keys))
log_input_key_stats(input_keys_stats, input_dir)

# Log partition input and output filenames
outnames = [i.name for i in outpaths]
Expand All @@ -315,13 +390,7 @@ def group_stage(input_dir, output_dir, num_reducers):
for path in sorted(output_dir.iterdir()):
sort_file(path)

# Log output keyspace stats
all_output_keys = set()
for outpath, keys in sorted(output_keys_stats.items()):
all_output_keys.update(keys)
LOGGER.debug("%s unique_keys=%s", last_two(outpath), len(keys))
LOGGER.debug("%s all_unique_keys=%s", output_dir.name,
len(all_output_keys))
log_output_key_stats(output_keys_stats, output_dir)


def reduce_stage(exe, input_dir, output_dir):
Expand Down
46 changes: 40 additions & 6 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ def test_simple(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4
num_reducers=4,
partitioner=None,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
Expand All @@ -29,7 +30,8 @@ def test_2_reducers(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=2
num_reducers=2,
partitioner=None,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output-2-reducers",
Expand All @@ -45,7 +47,8 @@ def test_bash_executable(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.sh",
reduce_exe=TESTDATA_DIR/"word_count/reduce.sh",
num_reducers=4
num_reducers=4,
partitioner=None,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
Expand All @@ -61,7 +64,34 @@ def test_bad_map_exe(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map_invalid.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4
num_reducers=4,
partitioner=None,
)


def test_bad_partition_exe(tmpdir):
    """A partitioner that exits non-zero must surface as a MadoopError."""
    with tmpdir.as_cwd():
        with pytest.raises(madoop.MadoopError):
            madoop.mapreduce(
                input_path=TESTDATA_DIR/"word_count/input",
                output_dir="output",
                map_exe=TESTDATA_DIR/"word_count/map.py",
                reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
                num_reducers=4,
                partitioner=TESTDATA_DIR/"word_count/partition_invalid.py",
            )


def test_noninteger_partition_exe(tmpdir):
    """A partitioner that prints a non-integer must raise a MadoopError."""
    with tmpdir.as_cwd():
        with pytest.raises(madoop.MadoopError):
            madoop.mapreduce(
                input_path=TESTDATA_DIR/"word_count/input",
                output_dir="output",
                map_exe=TESTDATA_DIR/"word_count/map.py",
                reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
                num_reducers=4,
                partitioner=(
                    TESTDATA_DIR/"word_count/partition_noninteger.py"
                ),
            )


Expand All @@ -73,7 +103,8 @@ def test_missing_shebang(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce_invalid.py",
num_reducers=4
num_reducers=4,
partitioner=None,
)


Expand All @@ -86,6 +117,7 @@ def test_empty_inputs(tmpdir):
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4,
partitioner=None,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
Expand All @@ -102,6 +134,7 @@ def test_single_input_file(tmpdir):
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4,
partitioner=None,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
Expand All @@ -119,7 +152,8 @@ def test_ignores_subdirs(tmpdir):
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=4
num_reducers=4,
partitioner=None,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
Expand Down
16 changes: 16 additions & 0 deletions tests/test_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def test_group_stage(tmpdir):
input_dir=TESTDATA_DIR/"word_count/correct/mapper-output",
output_dir=Path(tmpdir),
num_reducers=4,
partitioner=None,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/grouper-output",
Expand All @@ -37,13 +38,28 @@ def test_group_stage_2_reducers(tmpdir):
input_dir=TESTDATA_DIR/"word_count/correct/mapper-output",
output_dir=Path(tmpdir),
num_reducers=2,
partitioner=None,
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/grouper-output-2-reducers",
tmpdir,
)


def test_group_stage_custom_partitioner(tmpdir):
    """Group stage with a custom partitioner matches the stored output."""
    expected_dir = (
        TESTDATA_DIR/"word_count/correct/grouper-output-custom-partitioner"
    )
    mapper_output = TESTDATA_DIR/"word_count/correct/mapper-output"
    group_stage(
        input_dir=mapper_output,
        output_dir=Path(tmpdir),
        num_reducers=2,
        partitioner=TESTDATA_DIR/"word_count/partition.py",
    )
    utils.assert_dirs_eq(expected_dir, tmpdir)


def test_reduce_stage(tmpdir):
"""Test reduce stage using word count example."""
reduce_stage(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Bye 1
Goodbye 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Hadoop 1
Hadoop 1
Hello 1
Hello 1
World 1
World 1
14 changes: 14 additions & 0 deletions tests/testdata/word_count/partition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env -S python3 -u
"""Word count partitioner.

Read tab-separated key-value lines from stdin and print one partition number
per line.  Keys whose first character compares less than or equal to "G" go
to partition 0 and all other keys go to partition 1; both numbers are taken
modulo the number of reducers, which is given as the first command-line
argument.
"""
import sys


num_reducers = int(sys.argv[1])


for line in sys.stdin:
    # Only the key is needed.  partition() also copes with values that
    # contain tabs and with lines that have no tab at all, where unpacking
    # the result of split() would raise ValueError.
    key = line.partition("\t")[0]
    # Slice rather than index so an empty key compares as "" instead of
    # raising IndexError.
    if key[:1] <= "G":
        print(0 % num_reducers)
    else:
        print(1 % num_reducers)
6 changes: 6 additions & 0 deletions tests/testdata/word_count/partition_invalid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env python3
"""Invalid partition executable returns non-zero."""

import sys

# Exit with a failure status right away, without reading stdin or printing
# any partition numbers.
sys.exit(1)
4 changes: 4 additions & 0 deletions tests/testdata/word_count/partition_noninteger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env python3
"""Invalid partition executable prints a non-integer value."""

# Print a single line that cannot be parsed as an integer; stdin is never
# read, so the output is the same for any input.
print("hello world")

0 comments on commit 09d275f

Please sign in to comment.