Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support single input file #67

Merged
merged 8 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion madoop/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def main():
# Run MapReduce API
try:
mapreduce(
input_dir=args.input,
input_path=args.input,
output_dir=args.output,
map_exe=args.mapper,
reduce_exe=args.reducer,
Expand Down
40 changes: 29 additions & 11 deletions madoop/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
LOGGER = logging.getLogger("madoop")


def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
def mapreduce(input_path, output_dir, map_exe, reduce_exe):
"""Madoop API."""
# Do not clobber existing output directory
output_dir = pathlib.Path(output_dir)
Expand Down Expand Up @@ -54,8 +54,8 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
reduce_output_dir.mkdir()

# Copy and rename input files: part-00000, part-00001, etc.
input_dir = pathlib.Path(input_dir)
prepare_input_files(input_dir, map_input_dir)
input_path = pathlib.Path(input_path)
prepare_input_files(input_path, map_input_dir)

# Executables must be absolute paths
map_exe = pathlib.Path(map_exe).resolve()
Expand Down Expand Up @@ -98,25 +98,23 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
LOGGER.info("Output directory: %s", output_dir)


def prepare_input_files(input_dir, output_dir):
def prepare_input_files(input_path, output_dir):
"""Copy and split input files. Rename to part-00000, part-00001, etc.

If a file in input_dir is smaller than MAX_INPUT_SPLIT_SIZE, then copy it
to output_dir. For larger files, split into blocks of MAX_INPUT_SPLIT_SIZE
bytes and write block to output_dir. Input files will never be combined.
The input_path can be a file or a directory of files. If a file is smaller
than MAX_INPUT_SPLIT_SIZE, then copy it to output_dir. For larger files,
split into blocks of MAX_INPUT_SPLIT_SIZE bytes and write block to
output_dir. Input files will never be combined.

The number of files created will be the number of mappers since we will
assume that the number of tasks per mapper is 1. Apache Hadoop has a
configurable number of tasks per mapper, however for both simplicity and
because our use case has smaller inputs we use 1.

"""
assert input_dir.is_dir(), f"Can't find input_dir '{input_dir}'"

# Split and copy input files
part_num = 0
total_size = 0
for inpath in sorted(input_dir.glob('*')):
for inpath in normalize_input_paths(input_path):
assert inpath.is_file()

# Compute output filenames
Expand Down Expand Up @@ -148,6 +146,26 @@ def prepare_input_files(input_dir, output_dir):
LOGGER.debug("total input size=%sB", total_size)


def normalize_input_paths(input_path):
"""Return a list of filtered input files.

If input_path is a file, then use it. If input_path is a directory, then
grab all the *files* inside. Ignore subdirectories.

"""
input_paths = []
if input_path.is_dir():
for path in sorted(input_path.glob('*')):
if path.is_file():
input_paths.append(path)
else:
LOGGER.warning("Ignoring non-file: %s", path)
elif input_path.is_file():
input_paths.append(input_path)
assert input_paths, f"No input: {input_path}"
return input_paths


def is_executable(exe):
"""Verify exe is executable and raise exception if it is not.

Expand Down
42 changes: 37 additions & 5 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def test_simple(tmpdir):
"""Run a simple MapReduce job and verify the output."""
with tmpdir.as_cwd():
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input",
input_path=TESTDATA_DIR/"word_count/input",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
Expand All @@ -24,7 +24,7 @@ def test_bash_executable(tmpdir):
"""Run a MapReduce job written in Bash."""
with tmpdir.as_cwd():
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input",
input_path=TESTDATA_DIR/"word_count/input",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.sh",
reduce_exe=TESTDATA_DIR/"word_count/reduce.sh",
Expand All @@ -39,7 +39,7 @@ def test_bad_map_exe(tmpdir):
"""Map exe returns non-zero should produce an error message."""
with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input",
input_path=TESTDATA_DIR/"word_count/input",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map_invalid.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
Expand All @@ -50,7 +50,7 @@ def test_missing_shebang(tmpdir):
"""Reduce exe with a bad shebag should produce an error message."""
with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input",
input_path=TESTDATA_DIR/"word_count/input",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce_invalid.py",
Expand All @@ -61,7 +61,39 @@ def test_empty_inputs(tmpdir):
"""Empty input files should not raise an error."""
with tmpdir.as_cwd():
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input_empty",
input_path=TESTDATA_DIR/"word_count/input_empty",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
tmpdir/"output",
)


def test_single_input_file(tmpdir):
"""Run a simple MapReduce job with an input file instead of dir."""
with tmpdir.as_cwd():
madoop.mapreduce(
input_path=TESTDATA_DIR/"word_count/input-single-file.txt",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
tmpdir/"output",
)


def test_ignores_subdirs(tmpdir):
"""Run a simple MapReduce job with an input directory containing a
subdirectory. The subdirectory should be gracefully ignored.
"""
with tmpdir.as_cwd():
madoop.mapreduce(
input_path=TESTDATA_DIR/"word_count/input_with_subdir",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
Expand Down
4 changes: 4 additions & 0 deletions tests/testdata/word_count/input-single-file.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Hello World
Bye World
Hello Hadoop
Goodbye Hadoop
2 changes: 2 additions & 0 deletions tests/testdata/word_count/input_with_subdir/input01.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Hello World
Bye World
2 changes: 2 additions & 0 deletions tests/testdata/word_count/input_with_subdir/input02.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Hello Hadoop
Goodbye Hadoop
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL .gitkeep

Empty file.
Loading