Commit 48e1ba8

Merge remote-tracking branch 'origin/develop' into performance-optimizations

awdeorio committed Nov 6, 2024
2 parents f0771ac + 19b4177
Showing 16 changed files with 278 additions and 30 deletions.
12 changes: 6 additions & 6 deletions CONTRIBUTING.md
@@ -44,7 +44,7 @@ $ tox -e py3
Update your local `develop` branch. Make sure it's clean.
```console
$ git fetch
$ git checkout develop
$ git switch develop
$ git rebase
$ git status
```
@@ -56,24 +56,24 @@ $ tox -e py3

Update version
```console
$ $EDITOR setup.py
$ git commit -m "version bump" setup.py
$ $EDITOR pyproject.toml
$ git commit -m "version bump" pyproject.toml
$ git push origin develop
```

Update main branch
```console
$ git fetch
$ git checkout main
$ git switch main
$ git rebase
$ git merge --no-ff origin/develop
```

Tag a release
```console
$ git tag -a X.Y.Z
$ grep version= setup.py
version="X.Y.Z",
$ grep version pyproject.toml
version = "X.Y.Z"
$ git describe
X.Y.Z
$ git push --tags origin main
34 changes: 34 additions & 0 deletions README_Hadoop_Streaming.md
@@ -340,3 +340,37 @@ def reduce_one_group(key, group):
    for line in group:
        pass  # Do something
```

## Custom partitioner
If you need to control which key-value pairs are sent to which reducers, you can provide a custom partitioner. Here's a sample that works with the word count example.
```python
#!/usr/bin/env -S python3 -u
"""Word count partitioner."""
import sys


num_reducers = int(sys.argv[1])


for line in sys.stdin:
    key = line.partition("\t")[0]  # the value isn't needed to pick a partition
    if key[0] <= "G":
        print(0 % num_reducers)  # modulo guards the single-reducer case
    else:
        print(1 % num_reducers)
```

Each line of mapper output is streamed to this partitioner, which receives the number of reducers as its first command-line argument (`sys.argv[1]`). For each line, the partitioner prints a partition number: if the first letter of the key is less than or equal to "G", it prints `0` and the line goes to the first reducer; otherwise it prints `1` and the line goes to the second reducer. Taking the result modulo `num_reducers` guards against a configuration with fewer reducers than the partitioner expects.
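To sanity-check the partitioner on its own, you can pipe a few lines of mapper-style output (`key<TAB>value`) into it by hand. A quick sketch, assuming the sample above is saved as `example/partition.py` and marked executable:

```console
$ printf 'Goodbye\t1\nhello\t1\n' | example/partition.py 2
0
1
```

`Goodbye` begins with "G", which satisfies `key[0] <= "G"`, so it lands in partition 0; `hello` begins with "h", which sorts after "G", so it lands in partition 1.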

Use the `-partitioner` command-line argument to tell Madoop to use this partitioner.

```console
$ madoop \
-input example/input \
-output example/output \
-mapper example/map.py \
-reducer example/reduce.py \
-partitioner example/partition.py
```

This feature is similar to Hadoop's [`Partitioner` class](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/Partitioner.html), although it is not directly compatible with Hadoop. The main difference is that Hadoop requires a partitioner to be a Java class, while Madoop accepts any executable that reads from `stdin` and writes to `stdout`.
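For comparison, when `-partitioner` is omitted, Madoop assigns each key-value pair to partition `hash(key) % num_reducers`. Below is a minimal sketch of an executable that mimics that default behavior; the choice of MD5 is an illustrative assumption (any stable hash of the key works), not a guarantee about Madoop's internal key hash.

```python
#!/usr/bin/env -S python3 -u
"""Sketch of a default-style partitioner: hash(key) % num_reducers."""
import hashlib
import sys

num_reducers = int(sys.argv[1])

for line in sys.stdin:
    key = line.partition("\t")[0]
    # A stable hash keeps identical keys in the same partition across runs,
    # unlike Python's built-in hash(), which is randomized per process.
    hexdigest = hashlib.md5(key.encode()).hexdigest()
    print(int(hexdigest, base=16) % num_reducers)
```

Writing the default rule as a standalone executable like this makes a convenient starting point for experimenting with other partitioning schemes.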
14 changes: 10 additions & 4 deletions madoop/__main__.py
@@ -4,12 +4,12 @@
"""
import argparse
import importlib.metadata
import logging
import pathlib
import shutil
import sys
import textwrap
import pkg_resources
from .mapreduce import mapreduce
from .exceptions import MadoopError

@@ -21,10 +21,10 @@ def main():
    )

    optional_args = parser.add_argument_group('optional arguments')
    version = pkg_resources.get_distribution("madoop").version

    optional_args.add_argument(
        '--version', action='version',
        version=f'Madoop {version}'
        version=f'Madoop {importlib.metadata.version("madoop")}'
    )
    optional_args.add_argument(
        '--example', action=ExampleAction, nargs=0,
@@ -38,6 +38,11 @@ def main():
        '-numReduceTasks', dest='num_reducers', default=4,
        help="max number of reducers"
    )
    optional_args.add_argument(
        '-partitioner', dest='partitioner', default=None,
        help=("executable that computes a partition for each key-value pair "
              "of map output: default is hash(key) %% num_reducers"),
    )
    required_args = parser.add_argument_group('required arguments')
    required_args.add_argument('-input', dest='input', required=True)
    required_args.add_argument('-output', dest='output', required=True)
@@ -64,7 +69,8 @@ def main():
            output_dir=args.output,
            map_exe=args.mapper,
            reduce_exe=args.reducer,
            num_reducers=int(args.num_reducers)
            num_reducers=int(args.num_reducers),
            partitioner=args.partitioner,
        )
    except MadoopError as err:
        sys.exit(f"Error: {err}")
Expand Down
94 changes: 81 additions & 13 deletions madoop/mapreduce.py
@@ -19,15 +19,25 @@
# Large input files are automatically split
MAX_INPUT_SPLIT_SIZE = 10 * 1024 * 1024 # 10 MB

# The number of reducers is dynamically determined by the number of unique keys
# but will not be more than num_reducers

# Madoop logger
LOGGER = logging.getLogger("madoop")


def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers):
"""Madoop API."""
def mapreduce(
input_path,
output_dir,
map_exe,
reduce_exe,
num_reducers,
partitioner=None,
):
"""Madoop API.
The number of reducers is dynamically determined by the number of unique
keys but will not be more than num_reducers
"""
# pylint: disable=too-many-arguments
# Do not clobber existing output directory
output_dir = pathlib.Path(output_dir)
if output_dir.exists():
@@ -71,7 +81,8 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers):
    group_stage(
        input_dir=map_output_dir,
        output_dir=reduce_input_dir,
        num_reducers=num_reducers
        num_reducers=num_reducers,
        partitioner=partitioner,
    )

    # Run the reducing stage
@@ -160,13 +171,13 @@ def is_executable(exe):
    try:
        subprocess.run(
            str(exe),
            shell=True,
            shell=False,
            input="".encode(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
    except subprocess.CalledProcessError as err:
    except (subprocess.CalledProcessError, OSError) as err:
        raise MadoopError(f"Failed executable test: {err}") from err


@@ -243,7 +254,7 @@ def keyhash(key):
    return int(hexdigest, base=16)


def partition_keys(
def partition_keys_default(
    inpath,
    outpaths,
    input_keys_stats,
@@ -253,7 +264,6 @@
    Update the data structures provided by the caller input_keys_stats and
    output_keys_stats. Both map a filename to a set of keys.
    """
    assert len(outpaths) == num_reducers
    outparent = outpaths[0].parent
@@ -269,6 +279,60 @@
            output_keys_stats[outpath].add(key)


def partition_keys_custom(
    inpath,
    outpaths,
    input_keys_stats,
    output_keys_stats,
    num_reducers,
    partitioner,
):
    """Allocate lines of inpath among outpaths using a custom partitioner.

    Update the data structures provided by the caller input_keys_stats and
    output_keys_stats. Both map a filename to a set of keys.

    """
    # pylint: disable=too-many-arguments,too-many-locals
    assert len(outpaths) == num_reducers
    outparent = outpaths[0].parent
    assert all(i.parent == outparent for i in outpaths)
    with contextlib.ExitStack() as stack:
        outfiles = [stack.enter_context(p.open("a")) for p in outpaths]
        process = stack.enter_context(subprocess.Popen(
            [partitioner, str(num_reducers)],
            stdin=stack.enter_context(inpath.open()),
            stdout=subprocess.PIPE,
            text=True,
        ))
        for line, partition in zip(
            stack.enter_context(inpath.open()),
            stack.enter_context(process.stdout)
        ):
            try:
                partition = int(partition)
            except ValueError as err:
                raise MadoopError(
                    "Partition executable returned non-integer value: "
                    f"{partition} for line '{line}'."
                ) from err
            if not 0 <= partition < num_reducers:
                raise MadoopError(
                    "Partition executable returned invalid value: "
                    f"0 <= {partition} < {num_reducers} for line '{line}'."
                )
            key = line.partition('\t')[0]
            input_keys_stats[inpath].add(key)
            outfiles[partition].write(line)
            outpath = outpaths[partition]
            output_keys_stats[outpath].add(key)

    return_code = process.wait()
    if return_code:
        raise MadoopError(
            f"Partition executable returned non-zero: {str(partitioner)}"
        )


def log_input_key_stats(input_keys_stats, input_dir):
"""Log input key stats."""
all_input_keys = set()
@@ -288,7 +352,7 @@ def log_output_key_stats(output_keys_stats, output_dir):
len(all_output_keys))


def group_stage(input_dir, output_dir, num_reducers):
def group_stage(input_dir, output_dir, num_reducers, partitioner):
"""Run group stage.
    Process each mapper output file, allocating lines to grouper output files
@@ -307,8 +371,12 @@

    # Partition input, appending to output files
    for inpath in sorted(input_dir.iterdir()):
        partition_keys(inpath, outpaths, input_keys_stats,
                       output_keys_stats, num_reducers)
        if not partitioner:
            partition_keys_default(inpath, outpaths, input_keys_stats,
                                   output_keys_stats, num_reducers)
        else:
            partition_keys_custom(inpath, outpaths, input_keys_stats,
                                  output_keys_stats, num_reducers, partitioner)

    log_input_key_stats(input_keys_stats, input_dir)

3 changes: 1 addition & 2 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "madoop"
version = "1.1.0"
version = "1.2.2"
description="A light weight MapReduce framework for education."
license = {file = "LICENSE"}
authors = [
@@ -25,7 +25,6 @@ madoop = "madoop.__main__:main"

[project.optional-dependencies]
dev = [
"pdbpp",
"build",
"twine",
"tox",