Optimizations #70

Merged
merged 18 commits on Nov 6, 2024
Changes from 14 commits
3 changes: 3 additions & 0 deletions .coveragerc
@@ -0,0 +1,3 @@
[run]
concurrency = thread,multiprocessing
parallel = true
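
For context (not part of the diff): concurrency = thread,multiprocessing tells coverage.py to also trace code running in worker threads and in multiprocessing child processes, and parallel = true makes each process write its own .coverage.* data file so the results can be merged afterwards with coverage combine. A minimal, hypothetical script of the kind these settings make measurable:

# sketch.py -- hypothetical standalone example, not part of this PR.
# With the .coveragerc above, coverage.py can credit the lines executed
# inside the Pool worker processes and merge the per-process data files.
import multiprocessing


def square(n):
    """Work executed in a child process."""
    return n * n


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(square, range(8)))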
2 changes: 2 additions & 0 deletions .gitignore
@@ -24,6 +24,8 @@ build/
/.tox/
/.coverage*
*,cover
# Make an exception for .coveragerc, which should be checked into version control.
!.coveragerc

# Text editors and IDEs
*~
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -9,6 +9,7 @@ graft madoop/example

# Avoid dev and binary files
exclude tox.ini
exclude .coveragerc
exclude .editorconfig
global-exclude *.pyc
global-exclude __pycache__
230 changes: 132 additions & 98 deletions madoop/mapreduce.py
@@ -7,33 +7,37 @@
import collections
import hashlib
import logging
import math
import pathlib
import shutil
import subprocess
import tempfile
import multiprocessing
import concurrent.futures
from .exceptions import MadoopError


# Large input files are automatically split
MAX_INPUT_SPLIT_SIZE = 2**21 # 2 MB

# The number of reducers is dynamically determined by the number of unique keys
# but will not be more than num_reducers
MAX_INPUT_SPLIT_SIZE = 10 * 1024 * 1024 # 10 MB
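
A rough sizing note (hypothetical 64 MiB input; the actual chunk count can differ slightly because split_file respects line boundaries): the new 10 MB split size yields about 7 map chunks where the old 2 MB setting yielded 32.

# Back-of-the-envelope chunk counts; the 64 MiB input size is made up.
import math

input_size = 64 * 1024 * 1024
print(math.ceil(input_size / (10 * 1024 * 1024)))   # 7 chunks with the new split size
print(math.ceil(input_size / 2**21))                # 32 chunks with the old 2 MB splits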

# Madoop logger
LOGGER = logging.getLogger("madoop")


def mapreduce(
*,
input_path,
output_dir,
map_exe,
reduce_exe,
num_reducers,
partitioner=None,
):
"""Madoop API."""
"""Madoop API.

The number of reducers is determined dynamically by the number of unique
keys, but will never exceed num_reducers.

"""
# pylint: disable=too-many-arguments
# Do not clobber existing output directory
output_dir = pathlib.Path(output_dir)
@@ -51,18 +55,15 @@ def mapreduce(
LOGGER.debug("tmpdir=%s", tmpdir)

# Create stage input and output directory
map_input_dir = tmpdir/'input'
map_output_dir = tmpdir/'mapper-output'
reduce_input_dir = tmpdir/'reducer-input'
reduce_output_dir = tmpdir/'output'
map_input_dir.mkdir()
map_output_dir.mkdir()
reduce_input_dir.mkdir()
reduce_output_dir.mkdir()

# Copy and rename input files: part-00000, part-00001, etc.
input_path = pathlib.Path(input_path)
prepare_input_files(input_path, map_input_dir)

# Executables must be absolute paths
map_exe = pathlib.Path(map_exe).resolve()
@@ -72,7 +73,7 @@
LOGGER.info("Starting map stage")
map_stage(
exe=map_exe,
input_dir=map_input_dir,
input_dir=input_path,
output_dir=map_output_dir,
)

@@ -98,7 +99,7 @@
for filename in sorted(reduce_output_dir.glob("*")):
st_size = filename.stat().st_size
total_size += st_size
shutil.copy(filename, output_dir)
shutil.move(filename, output_dir)
output_path = output_dir.parent/last_two(filename)
LOGGER.debug("%s size=%sB", output_path, st_size)

@@ -107,52 +108,36 @@
LOGGER.info("Output directory: %s", output_dir)


def prepare_input_files(input_path, output_dir):
"""Copy and split input files. Rename to part-00000, part-00001, etc.
def split_file(input_filename, max_chunksize):
"""Iterate over the data in a file one chunk at a time."""
with open(input_filename, "rb") as input_file:
buffer = b""

The input_path can be a file or a directory of files. If a file is smaller
than MAX_INPUT_SPLIT_SIZE, then copy it to output_dir. For larger files,
split into blocks of MAX_INPUT_SPLIT_SIZE bytes and write block to
output_dir. Input files will never be combined.
while True:
chunk = input_file.read(max_chunksize)
# Break if no more data remains.
if not chunk:
break

The number of files created will be the number of mappers since we will
assume that the number of tasks per mapper is 1. Apache Hadoop has a
configurable number of tasks per mapper, however for both simplicity and
because our use case has smaller inputs we use 1.
# Add the chunk to the buffer.
buffer += chunk

"""
part_num = 0
total_size = 0
for inpath in normalize_input_paths(input_path):
assert inpath.is_file()

# Compute output filenames
st_size = inpath.stat().st_size
total_size += st_size
n_splits = math.ceil(st_size / MAX_INPUT_SPLIT_SIZE)
n_splits = 1 if not n_splits else n_splits # Handle empty input file
LOGGER.debug(
"input %s size=%sB partitions=%s", inpath, st_size, n_splits
)
outpaths = [
output_dir/part_filename(part_num + i) for i in range(n_splits)
]
part_num += n_splits

# Copy to new output files
with contextlib.ExitStack() as stack:
outfiles = [stack.enter_context(i.open('w')) for i in outpaths]
infile = stack.enter_context(inpath.open(encoding="utf-8"))
outparent = outpaths[0].parent
assert all(i.parent == outparent for i in outpaths)
outnames = [i.name for i in outpaths]
logging.debug(
"partition %s >> %s/{%s}",
last_two(inpath), outparent.name, ",".join(outnames),
)
for i, line in enumerate(infile):
outfiles[i % n_splits].write(line)
LOGGER.debug("total input size=%sB", total_size)
# Find the last newline character in the buffer. We don't want to
# yield a chunk that ends in the middle of a line; we have to
# respect line boundaries or we'll corrupt the input.
last_newline = buffer.rfind(b"\n")
if last_newline != -1:
# Yield the content up to the last newline, saving the rest
# for the next chunk.
yield buffer[:last_newline + 1]

# Remove processed data from the buffer. The next chunk will
# start with whatever data came after the last newline.
buffer = buffer[last_newline + 1:]

# Yield any remaining data.
if buffer:
yield buffer
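
A quick usage sketch (not part of the diff; the import path is an assumption based on this module's location): split a small file into roughly 64-byte chunks, then check that every chunk ends on a line boundary and that the chunks reassemble the original bytes.

# Hypothetical usage of split_file.
import pathlib
import tempfile

from madoop.mapreduce import split_file   # assumed import path

tmpdir = pathlib.Path(tempfile.mkdtemp())
path = tmpdir/"input.txt"
path.write_bytes(b"".join(f"line {i}\n".encode() for i in range(100)))

chunks = list(split_file(path, max_chunksize=64))
assert all(chunk.endswith(b"\n") for chunk in chunks)   # chunks respect line boundaries
assert b"".join(chunks) == path.read_bytes()            # no bytes lost or duplicated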


def normalize_input_paths(input_path):
@@ -208,30 +193,51 @@ def part_filename(num):
return f"part-{num:05d}"


def map_single_chunk(exe, input_path, output_path, chunk):
"""Execute mapper on a single chunk."""
with output_path.open("w") as outfile:
try:
subprocess.run(
str(exe),
shell=True,
check=True,
input=chunk,
stdout=outfile,
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err
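
A hypothetical call (not in the diff) illustrating the contract: the chunk bytes are fed to the mapper's stdin and its stdout is captured in output_path; input_path is only used to build the error message. Here /bin/cat stands in for a real mapper, so the sketch is POSIX-only, and the import path is assumed.

import pathlib

from madoop.mapreduce import map_single_chunk   # assumed import path

out = pathlib.Path("part-00000")                # created in the current directory
map_single_chunk(
    exe=pathlib.Path("/bin/cat"),               # stand-in mapper: echoes stdin
    input_path=pathlib.Path("input.txt"),       # only used in the error message
    output_path=out,
    chunk=b"hello world\n",
)
assert out.read_text() == "hello world\n"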


def map_stage(exe, input_dir, output_dir):
"""Execute mappers."""
i = 0
for i, input_path in enumerate(sorted(input_dir.iterdir()), 1):
output_path = output_dir/part_filename(i)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
with input_path.open() as infile, output_path.open('w') as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
stdin=infile,
stdout=outfile,
part_num = 0
futures = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=multiprocessing.cpu_count()
) as pool:
for input_path in normalize_input_paths(input_dir):
for chunk in split_file(input_path, MAX_INPUT_SPLIT_SIZE):
output_path = output_dir/part_filename(part_num)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err
LOGGER.info("Finished map executions: %s", i)
futures.append(pool.submit(
map_single_chunk,
exe,
input_path,
output_path,
chunk,
))
part_num += 1
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
raise exception
LOGGER.info("Finished map executions: %s", part_num)


def sort_file(path):
@@ -287,7 +293,9 @@ def partition_keys_custom(
Update the data structures provided by the caller input_keys_stats and
output_keys_stats. Both map a filename to a set of keys.
"""
# pylint: disable=too-many-arguments,too-many-locals
# pylint: disable=too-many-arguments
# pylint: disable=too-many-positional-arguments
# pylint: disable=too-many-locals
assert len(outpaths) == num_reducers
outparent = outpaths[0].parent
assert all(i.parent == outparent for i in outpaths)
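
For orientation, a sketch of the kind of default hash partitioning that a custom partitioner replaces; this is an assumption about the built-in behavior, which may differ in detail. The idea is to hash each key and take it modulo num_reducers, so equal keys always land in the same reducer.

# Hypothetical default-style partitioner; Madoop's built-in may differ in detail.
import hashlib


def keyhash_partition(key, num_reducers):
    """Map a key to a reducer index in [0, num_reducers)."""
    hexdigest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(hexdigest, base=16) % num_reducers


assert keyhash_partition("apple", 4) == keyhash_partition("apple", 4)   # deterministic
assert 0 <= keyhash_partition("banana", 4) < 4                          # always in range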
@@ -392,35 +400,61 @@ def group_stage(input_dir, output_dir, num_reducers, partitioner):
path.unlink()

# Sort output files
for path in sorted(output_dir.iterdir()):
sort_file(path)
try:
# Don't use a with statement here, because Coverage won't be able to
# detect code running in a subprocess if we do.
# https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html
# pylint: disable=consider-using-with
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
pool.map(sort_file, sorted(output_dir.iterdir()))
finally:
pool.close()
pool.join()

log_output_key_stats(output_keys_stats, output_dir)


def reduce_single_file(exe, input_path, output_path):
"""Execute reducer on a single file."""
with input_path.open() as infile, output_path.open("w") as outfile:
try:
subprocess.run(
str(exe),
shell=True,
check=True,
stdin=infile,
stdout=outfile,
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err


def reduce_stage(exe, input_dir, output_dir):
"""Execute reducers."""
i = 0
for i, input_path in enumerate(sorted(input_dir.iterdir())):
output_path = output_dir/part_filename(i)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
with input_path.open() as infile, output_path.open('w') as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
stdin=infile,
stdout=outfile,
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err
futures = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=multiprocessing.cpu_count()
) as pool:
for i, input_path in enumerate(sorted(input_dir.iterdir())):
output_path = output_dir/part_filename(i)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
futures.append(pool.submit(
reduce_single_file,
exe,
input_path,
output_path,
))
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
raise exception
LOGGER.info("Finished reduce executions: %s", i+1)

