From 24ee2ce4fbc364be2ff3ca3f4642ed0b531075cc Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 22 Nov 2023 02:44:41 -0500 Subject: [PATCH 01/15] Don't copy input --- madoop/mapreduce.py | 122 +++++++++++++++++++------------------------- 1 file changed, 52 insertions(+), 70 deletions(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 6afb58b..a7af497 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -7,7 +7,6 @@ import collections import hashlib import logging -import math import pathlib import shutil import subprocess @@ -43,18 +42,15 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers): LOGGER.debug("tmpdir=%s", tmpdir) # Create stage input and output directory - map_input_dir = tmpdir/'input' map_output_dir = tmpdir/'mapper-output' reduce_input_dir = tmpdir/'reducer-input' reduce_output_dir = tmpdir/'output' - map_input_dir.mkdir() map_output_dir.mkdir() reduce_input_dir.mkdir() reduce_output_dir.mkdir() # Copy and rename input files: part-00000, part-00001, etc. input_path = pathlib.Path(input_path) - prepare_input_files(input_path, map_input_dir) # Executables must be absolute paths map_exe = pathlib.Path(map_exe).resolve() @@ -64,7 +60,7 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers): LOGGER.info("Starting map stage") map_stage( exe=map_exe, - input_dir=map_input_dir, + input_dir=input_path, output_dir=map_output_dir, ) @@ -98,52 +94,36 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers): LOGGER.info("Output directory: %s", output_dir) -def prepare_input_files(input_path, output_dir): - """Copy and split input files. Rename to part-00000, part-00001, etc. +def split_file(input_filename, max_chunksize): + """Iterate over the data in a file one chunk at a time.""" + with open(input_filename, "rb") as input_file: + buffer = b"" - The input_path can be a file or a directory of files. If a file is smaller - than MAX_INPUT_SPLIT_SIZE, then copy it to output_dir. For larger files, - split into blocks of MAX_INPUT_SPLIT_SIZE bytes and write block to - output_dir. Input files will never be combined. + while True: + chunk = input_file.read(max_chunksize) + # Break if no more data remains. + if not chunk: + break - The number of files created will be the number of mappers since we will - assume that the number of tasks per mapper is 1. Apache Hadoop has a - configurable number of tasks per mapper, however for both simplicity and - because our use case has smaller inputs we use 1. + # Add the chunk to the buffer. + buffer += chunk - """ - part_num = 0 - total_size = 0 - for inpath in normalize_input_paths(input_path): - assert inpath.is_file() - - # Compute output filenames - st_size = inpath.stat().st_size - total_size += st_size - n_splits = math.ceil(st_size / MAX_INPUT_SPLIT_SIZE) - n_splits = 1 if not n_splits else n_splits # Handle empty input file - LOGGER.debug( - "input %s size=%sB partitions=%s", inpath, st_size, n_splits - ) - outpaths = [ - output_dir/part_filename(part_num + i) for i in range(n_splits) - ] - part_num += n_splits - - # Copy to new output files - with contextlib.ExitStack() as stack: - outfiles = [stack.enter_context(i.open('w')) for i in outpaths] - infile = stack.enter_context(inpath.open(encoding="utf-8")) - outparent = outpaths[0].parent - assert all(i.parent == outparent for i in outpaths) - outnames = [i.name for i in outpaths] - logging.debug( - "partition %s >> %s/{%s}", - last_two(inpath), outparent.name, ",".join(outnames), - ) - for i, line in enumerate(infile): - outfiles[i % n_splits].write(line) - LOGGER.debug("total input size=%sB", total_size) + # Find the last newline character in the buffer. We don't want to + # yield a chunk that ends in the middle of a line; we have to + # respect line boundaries or we'll corrupt the input. + last_newline = buffer.rfind(b"\n") + if last_newline != -1: + # Yield the content up to the last newline, saving the rest + # for the next chunk. + yield buffer[:last_newline + 1] + + # Remove unprocessed data from the buffer. The next chunk will + # start with whatever data came after the last newline. + buffer = buffer[last_newline + 1:] + + # Yield any remaining data. + if buffer: + yield buffer def normalize_input_paths(input_path): @@ -201,28 +181,30 @@ def part_filename(num): def map_stage(exe, input_dir, output_dir): """Execute mappers.""" - i = 0 - for i, input_path in enumerate(sorted(input_dir.iterdir()), 1): - output_path = output_dir/part_filename(i) - LOGGER.debug( - "%s < %s > %s", - exe.name, last_two(input_path), last_two(output_path), - ) - with input_path.open() as infile, output_path.open('w') as outfile: - try: - subprocess.run( - str(exe), - shell=True, - check=True, - stdin=infile, - stdout=outfile, - ) - except subprocess.CalledProcessError as err: - raise MadoopError( - f"Command returned non-zero: " - f"{exe} < {input_path} > {output_path}" - ) from err - LOGGER.info("Finished map executions: %s", i) + part_num = 1 + for input_path in normalize_input_paths(input_dir): + for chunk in split_file(input_path, MAX_INPUT_SPLIT_SIZE): + output_path = output_dir/part_filename(part_num) + LOGGER.debug( + "%s < %s > %s", + exe.name, last_two(input_path), last_two(output_path), + ) + with output_path.open("w") as outfile: + try: + subprocess.run( + str(exe), + shell=True, + check=True, + input=chunk, + stdout=outfile, + ) + except subprocess.CalledProcessError as err: + raise MadoopError( + f"Command returned non-zero: " + f"{exe} < {input_path} > {output_path}" + ) from err + part_num += 1 + LOGGER.info("Finished map executions: %s", part_num) def sort_file(path): From e55d7e05b12d0e06f7c3747072b123d724c22895 Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 22 Nov 2023 04:19:47 -0500 Subject: [PATCH 02/15] Move output files instead of copying --- madoop/mapreduce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index a7af497..670d1da 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -85,7 +85,7 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers): for filename in sorted(reduce_output_dir.glob("*")): st_size = filename.stat().st_size total_size += st_size - shutil.copy(filename, output_dir) + shutil.move(filename, output_dir) output_path = output_dir.parent/last_two(filename) LOGGER.debug("%s size=%sB", output_path, st_size) From 997cc2d727fec0950ab708897b33a289002865c8 Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 22 Nov 2023 05:24:39 -0500 Subject: [PATCH 03/15] Parallelize with thread pools and a process pool --- madoop/mapreduce.py | 123 ++++++++++++++++++++++++++++---------------- 1 file changed, 78 insertions(+), 45 deletions(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 670d1da..a9c267e 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -11,6 +11,8 @@ import shutil import subprocess import tempfile +import multiprocessing +import concurrent.futures from .exceptions import MadoopError @@ -179,31 +181,47 @@ def part_filename(num): return f"part-{num:05d}" +def map_single_chunk(exe, input_path, output_path, chunk): + """Execute mapper on a single chunk.""" + with output_path.open("w") as outfile: + try: + subprocess.run( + str(exe), + shell=True, + check=True, + input=chunk, + stdout=outfile, + ) + except subprocess.CalledProcessError as err: + raise MadoopError( + f"Command returned non-zero: " + f"{exe} < {input_path} > {output_path}" + ) from err + + def map_stage(exe, input_dir, output_dir): """Execute mappers.""" - part_num = 1 - for input_path in normalize_input_paths(input_dir): - for chunk in split_file(input_path, MAX_INPUT_SPLIT_SIZE): - output_path = output_dir/part_filename(part_num) - LOGGER.debug( - "%s < %s > %s", - exe.name, last_two(input_path), last_two(output_path), - ) - with output_path.open("w") as outfile: - try: - subprocess.run( - str(exe), - shell=True, - check=True, - input=chunk, - stdout=outfile, - ) - except subprocess.CalledProcessError as err: - raise MadoopError( - f"Command returned non-zero: " - f"{exe} < {input_path} > {output_path}" - ) from err - part_num += 1 + part_num = 0 + futures = [] + with concurrent.futures.ThreadPoolExecutor( + max_workers=multiprocessing.cpu_count() + ) as pool: + for input_path in normalize_input_paths(input_dir): + for chunk in split_file(input_path, MAX_INPUT_SPLIT_SIZE): + output_path = output_dir/part_filename(part_num) + LOGGER.debug( + "%s < %s > %s", + exe.name, last_two(input_path), last_two(output_path), + ) + futures.append(pool.submit( + map_single_chunk, + exe, + input_path, + output_path, + chunk, + )) + part_num += 1 + concurrent.futures.wait(futures) LOGGER.info("Finished map executions: %s", part_num) @@ -294,8 +312,8 @@ def group_stage(input_dir, output_dir, num_reducers): path.unlink() # Sort output files - for path in sorted(output_dir.iterdir()): - sort_file(path) + with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool: + pool.map(sort_file, sorted(output_dir.iterdir())) # Log output keyspace stats all_output_keys = set() @@ -306,29 +324,44 @@ def group_stage(input_dir, output_dir, num_reducers): len(all_output_keys)) +def reduce_single_file(exe, input_path, output_path): + """Execute reducer on a single file.""" + with input_path.open() as infile, output_path.open("w") as outfile: + try: + subprocess.run( + str(exe), + shell=True, + check=True, + stdin=infile, + stdout=outfile, + ) + except subprocess.CalledProcessError as err: + raise MadoopError( + f"Command returned non-zero: " + f"{exe} < {input_path} > {output_path}" + ) from err + + def reduce_stage(exe, input_dir, output_dir): """Execute reducers.""" i = 0 - for i, input_path in enumerate(sorted(input_dir.iterdir())): - output_path = output_dir/part_filename(i) - LOGGER.debug( - "%s < %s > %s", - exe.name, last_two(input_path), last_two(output_path), - ) - with input_path.open() as infile, output_path.open('w') as outfile: - try: - subprocess.run( - str(exe), - shell=True, - check=True, - stdin=infile, - stdout=outfile, - ) - except subprocess.CalledProcessError as err: - raise MadoopError( - f"Command returned non-zero: " - f"{exe} < {input_path} > {output_path}" - ) from err + futures = [] + with concurrent.futures.ThreadPoolExecutor( + max_workers=multiprocessing.cpu_count() + ) as pool: + for i, input_path in enumerate(sorted(input_dir.iterdir())): + output_path = output_dir/part_filename(i) + LOGGER.debug( + "%s < %s > %s", + exe.name, last_two(input_path), last_two(output_path), + ) + futures.append(pool.submit( + reduce_single_file, + exe, + input_path, + output_path, + )) + concurrent.futures.wait(futures) LOGGER.info("Finished reduce executions: %s", i+1) From 5ca5407d40c221fdcec8c764c8e288ed07701ac5 Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 22 Nov 2023 05:31:41 -0500 Subject: [PATCH 04/15] Increase chunk size --- madoop/mapreduce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index a9c267e..0854fa4 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -17,7 +17,7 @@ # Large input files are automatically split -MAX_INPUT_SPLIT_SIZE = 2**20 # 1 MB +MAX_INPUT_SPLIT_SIZE = 10 * 1024 * 1024 # 10 MB # The number of reducers is dynamically determined by the number of unique keys # but will not be more than num_reducers From 2787ffb5a6934b728b68f8ab1b0c95bc664bdef0 Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 22 Nov 2023 05:59:18 -0500 Subject: [PATCH 05/15] Helper functions for Group Stage --- madoop/mapreduce.py | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 0854fa4..07dfa4d 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -266,6 +266,25 @@ def partition_keys( output_keys_stats[outpath].add(key) +def log_input_key_stats(input_keys_stats, input_dir): + """Log input key stats.""" + all_input_keys = set() + for inpath, keys in sorted(input_keys_stats.items()): + all_input_keys.update(keys) + LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys)) + LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_input_keys)) + + +def log_output_key_stats(output_keys_stats, output_dir): + """Log output keyspace stats.""" + all_output_keys = set() + for outpath, keys in sorted(output_keys_stats.items()): + all_output_keys.update(keys) + LOGGER.debug("%s unique_keys=%s", last_two(outpath), len(keys)) + LOGGER.debug("%s all_unique_keys=%s", output_dir.name, + len(all_output_keys)) + + def group_stage(input_dir, output_dir, num_reducers): """Run group stage. @@ -288,12 +307,7 @@ def group_stage(input_dir, output_dir, num_reducers): partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats, num_reducers) - # Log input keyspace stats - all_input_keys = set() - for inpath, keys in sorted(input_keys_stats.items()): - all_input_keys.update(keys) - LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys)) - LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_input_keys)) + log_input_key_stats(input_keys_stats, input_dir) # Log partition input and output filenames outnames = [i.name for i in outpaths] @@ -315,13 +329,7 @@ def group_stage(input_dir, output_dir, num_reducers): with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool: pool.map(sort_file, sorted(output_dir.iterdir())) - # Log output keyspace stats - all_output_keys = set() - for outpath, keys in sorted(output_keys_stats.items()): - all_output_keys.update(keys) - LOGGER.debug("%s unique_keys=%s", last_two(outpath), len(keys)) - LOGGER.debug("%s all_unique_keys=%s", output_dir.name, - len(all_output_keys)) + log_output_key_stats(output_keys_stats, output_dir) def reduce_single_file(exe, input_path, output_path): From a5d697650c4da8271e9d8885a8ee21866c78581b Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 22 Nov 2023 17:59:41 -0500 Subject: [PATCH 06/15] Add tests for input splitting --- madoop/mapreduce.py | 2 +- tests/test_stages.py | 50 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 07dfa4d..5afb3d2 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -119,7 +119,7 @@ def split_file(input_filename, max_chunksize): # for the next chunk. yield buffer[:last_newline + 1] - # Remove unprocessed data from the buffer. The next chunk will + # Remove processed data from the buffer. The next chunk will # start with whatever data came after the last newline. buffer = buffer[last_newline + 1:] diff --git a/tests/test_stages.py b/tests/test_stages.py index 7ee82d0..fbc05e0 100644 --- a/tests/test_stages.py +++ b/tests/test_stages.py @@ -1,6 +1,13 @@ """System tests for the map stage of Michigan Hadoop.""" +import shutil from pathlib import Path -from madoop.mapreduce import map_stage, group_stage, reduce_stage +from madoop.mapreduce import ( + map_stage, + group_stage, + reduce_stage, + split_file, + MAX_INPUT_SPLIT_SIZE, +) from . import utils from .utils import TESTDATA_DIR @@ -68,3 +75,44 @@ def test_reduce_stage_2_reducers(tmpdir): TESTDATA_DIR/"word_count/correct/reducer-output-2-reducers", tmpdir, ) + + +def test_input_splitting(tmp_path): + """Test that the Map Stage correctly splits input.""" + input_data = "o" * (MAX_INPUT_SPLIT_SIZE - 10) + "\n" + \ + "a" * int(MAX_INPUT_SPLIT_SIZE / 2) + input_dir = tmp_path/"input" + output_dir = tmp_path/"output" + input_dir.mkdir() + output_dir.mkdir() + + with open(input_dir/"input.txt", "w", encoding="utf-8") as input_file: + input_file.write(input_data) + + map_stage( + exe=Path(shutil.which("cat")), + input_dir=input_dir, + output_dir=output_dir, + ) + + output_files = sorted(output_dir.glob("*")) + assert len(output_files) == 2 + assert output_files == [output_dir/"part-00000", output_dir/"part-00001"] + + with open(output_dir/"part-00000", "r", encoding="utf-8") as outfile1: + data = outfile1.read() + assert data == "o" * (MAX_INPUT_SPLIT_SIZE - 10) + "\n" + with open(output_dir/"part-00001", "r", encoding="utf-8") as outfile2: + data = outfile2.read() + assert data == "a" * int(MAX_INPUT_SPLIT_SIZE / 2) + + +def test_split_file_mid_chunk(tmp_path): + """Test that file splitting still works when data remains in the buffer.""" + input_data = "noah says\nhello world" + input_file = tmp_path/"input.txt" + with open(input_file, "w", encoding="utf-8") as infile: + infile.write(input_data) + + splits = list(split_file(input_file, 50)) + assert splits == [b"noah says\n", b"hello world"] From 6fffb63ecb1a1348808d59385661ee5124554105 Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 22 Nov 2023 18:29:04 -0500 Subject: [PATCH 07/15] Re-raise exceptions from thread pool --- madoop/mapreduce.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 5afb3d2..29b674b 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -221,7 +221,10 @@ def map_stage(exe, input_dir, output_dir): chunk, )) part_num += 1 - concurrent.futures.wait(futures) + for future in concurrent.futures.as_completed(futures): + exception = future.exception() + if exception: + raise exception LOGGER.info("Finished map executions: %s", part_num) @@ -369,7 +372,10 @@ def reduce_stage(exe, input_dir, output_dir): input_path, output_path, )) - concurrent.futures.wait(futures) + for future in concurrent.futures.as_completed(futures): + exception = future.exception() + if exception: + raise exception LOGGER.info("Finished reduce executions: %s", i+1) From 6f84f80ae708e1a83686ae761c8295cce62c5188 Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 22 Nov 2023 18:30:02 -0500 Subject: [PATCH 08/15] Increase API coverage --- tests/test_api.py | 32 ++++++++++++++++++++++ tests/testdata/word_count/reduce_exit_1.py | 6 ++++ 2 files changed, 38 insertions(+) create mode 100755 tests/testdata/word_count/reduce_exit_1.py diff --git a/tests/test_api.py b/tests/test_api.py index 8754970..88d0ed6 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,6 +1,9 @@ """System tests for the API interface.""" +from pathlib import Path import pytest import madoop +from madoop.exceptions import MadoopError +from madoop.mapreduce import map_stage, reduce_stage from . import utils from .utils import TESTDATA_DIR @@ -53,6 +56,18 @@ def test_bash_executable(tmpdir): ) +def test_output_already_exists(tmpdir): + """Output already existing should raise an error.""" + with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError): + madoop.mapreduce( + input_path=TESTDATA_DIR/"word_count/input", + output_dir=tmpdir, + map_exe=TESTDATA_DIR/"word_count/map.py", + reduce_exe=TESTDATA_DIR/"word_count/reduce.py", + num_reducers=2, + ) + + def test_bad_map_exe(tmpdir): """Map exe returns non-zero should produce an error message.""" with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError): @@ -64,6 +79,23 @@ def test_bad_map_exe(tmpdir): num_reducers=4 ) + with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError): + map_stage( + exe=TESTDATA_DIR/"word_count/map_invalid.py", + input_dir=TESTDATA_DIR/"word_count/input", + output_dir=Path(tmpdir), + ) + + +def test_bad_reduce_exe(tmpdir): + """Reduce exe returns non-zero should produce an error message.""" + with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError): + reduce_stage( + exe=TESTDATA_DIR/"word_count/reduce_exit_1.py", + input_dir=TESTDATA_DIR/"word_count/input", + output_dir=Path(tmpdir), + ) + def test_missing_shebang(tmpdir): """Reduce exe with a bad shebag should produce an error message.""" diff --git a/tests/testdata/word_count/reduce_exit_1.py b/tests/testdata/word_count/reduce_exit_1.py new file mode 100755 index 0000000..4561d53 --- /dev/null +++ b/tests/testdata/word_count/reduce_exit_1.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +"""Invalid reduce executable exits 1.""" + +import sys + +sys.exit(1) From c147a8726ed94878ad2109e03590d0b7d0091182 Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 22 Nov 2023 18:32:52 -0500 Subject: [PATCH 09/15] Unused import --- tests/test_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_api.py b/tests/test_api.py index 88d0ed6..4aebd8b 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -2,7 +2,6 @@ from pathlib import Path import pytest import madoop -from madoop.exceptions import MadoopError from madoop.mapreduce import map_stage, reduce_stage from . import utils from .utils import TESTDATA_DIR From b85f9c7e89a4f03f588939ad4052add79068af11 Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Sun, 26 Nov 2023 05:37:26 -0500 Subject: [PATCH 10/15] Allow Coverage to detect code running in subprocesses --- .coveragerc | 3 +++ .gitignore | 2 ++ madoop/mapreduce.py | 10 +++++++++- 3 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 .coveragerc diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..2ba6cc1 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,3 @@ +[run] +concurrency = thread,multiprocessing +parallel = true diff --git a/.gitignore b/.gitignore index 8c61743..f75b093 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,8 @@ build/ /.tox/ /.coverage* *,cover +# Make an exception for .coveragerc, which should be checked into version control. +!.coveragerc # Text editors and IDEs *~ diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 29b674b..7059d8e 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -329,8 +329,16 @@ def group_stage(input_dir, output_dir, num_reducers): path.unlink() # Sort output files - with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool: + try: + # Don't use a with statement here, because Coverage won't be able to + # detect code running in a subprocess if we do. + # https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html + # pylint: disable=consider-using-with + pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()) pool.map(sort_file, sorted(output_dir.iterdir())) + finally: + pool.close() + pool.join() log_output_key_stats(output_keys_stats, output_dir) From f0771acbe946e04c69f6dca2153303f45a84d03a Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Sun, 26 Nov 2023 05:42:54 -0500 Subject: [PATCH 11/15] Update MANIFEST.in --- MANIFEST.in | 1 + 1 file changed, 1 insertion(+) diff --git a/MANIFEST.in b/MANIFEST.in index 9200ff1..af35254 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -9,6 +9,7 @@ graft madoop/example # Avoid dev and and binary files exclude tox.ini +exclude .coveragerc exclude .editorconfig global-exclude *.pyc global-exclude __pycache__ From a9e97c15cd4ed71d6417d3a8f57f5ba3aea64c0e Mon Sep 17 00:00:00 2001 From: Andrew DeOrio Date: Wed, 6 Nov 2024 14:26:56 -0500 Subject: [PATCH 12/15] lint --- madoop/mapreduce.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index bd2b720..4b74b98 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -24,6 +24,7 @@ def mapreduce( + *, input_path, output_dir, map_exe, @@ -292,7 +293,9 @@ def partition_keys_custom( Update the data structures provided by the caller input_keys_stats and output_keys_stats. Both map a filename to a set of of keys. """ - # pylint: disable=too-many-arguments,too-many-locals + # pylint: disable=too-many-arguments + # pylint: disable=too-many-positional-arguments + # pylint: disable=too-many-locals assert len(outpaths) == num_reducers outparent = outpaths[0].parent assert all(i.parent == outparent for i in outpaths) From d397c90d0198a70600f32f5e5dd18a8c1aae7001 Mon Sep 17 00:00:00 2001 From: noah-weingarden <33741795+noah-weingarden@users.noreply.github.com> Date: Wed, 6 Nov 2024 14:47:13 -0500 Subject: [PATCH 13/15] Revert docstring --- madoop/mapreduce.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 4b74b98..2d83edf 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -32,12 +32,7 @@ def mapreduce( num_reducers, partitioner=None, ): - """Madoop API. - - The number of reducers is dynamically determined by the number of unique - keys but will not be more than num_reducers - - """ + """Madoop API.""" # pylint: disable=too-many-arguments # Do not clobber existing output directory output_dir = pathlib.Path(output_dir) From 424a193c78c8e0259d1c506ad2a4608c35794b1b Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 6 Nov 2024 15:14:24 -0500 Subject: [PATCH 14/15] Revert change to `shell` parameter from merge conflict --- madoop/mapreduce.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index 2d83edf..3d2c660 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -194,7 +194,7 @@ def map_single_chunk(exe, input_path, output_path, chunk): try: subprocess.run( str(exe), - shell=True, + shell=False, check=True, input=chunk, stdout=outfile, @@ -415,7 +415,7 @@ def reduce_single_file(exe, input_path, output_path): try: subprocess.run( str(exe), - shell=True, + shell=False, check=True, stdin=infile, stdout=outfile, From 71b01aae45f69b51c04318e283379d4ee96d54b5 Mon Sep 17 00:00:00 2001 From: noah-weingarden Date: Wed, 6 Nov 2024 15:29:51 -0500 Subject: [PATCH 15/15] Eliminate pkg_resources deprecration warning --- tests/test_cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index dab719e..9156c37 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,6 +1,6 @@ """System tests for the command line interface.""" import subprocess -import pkg_resources +import importlib.metadata import pytest from . import utils from .utils import TESTDATA_DIR @@ -15,7 +15,7 @@ def test_version(): ) output = result.stdout.decode("utf-8") assert "Madoop" in output - assert pkg_resources.get_distribution("madoop").version in output + assert importlib.metadata.version("madoop") in output def test_help():