From ee7e493c0978375e1cece737b299899c3a042b3a Mon Sep 17 00:00:00 2001
From: Devin Gibson
Date: Tue, 19 Nov 2024 19:47:48 -0500
Subject: [PATCH] feat(clp-s): Chunk output by size (in bytes) during ordered
 decompression. (#600)

Co-authored-by: haiqi96 <14502009+haiqi96@users.noreply.github.com>
Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
---
 .../clp_package_utils/scripts/decompress.py    |  4 +++-
 .../scripts/native/decompress.py               |  2 +-
 .../core/src/clp_s/CommandLineArguments.cpp    | 16 +++++++++-------
 .../core/src/clp_s/CommandLineArguments.hpp    |  4 ++--
 components/core/src/clp_s/JsonConstructor.cpp  | 18 +++++++++---------
 components/core/src/clp_s/JsonConstructor.hpp  |  2 +-
 components/core/src/clp_s/clp-s.cpp            |  2 +-
 .../executor/query/extract_stream_task.py      |  2 +-
 8 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/components/clp-package-utils/clp_package_utils/scripts/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/decompress.py
index 903107c32..9085fb162 100755
--- a/components/clp-package-utils/clp_package_utils/scripts/decompress.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/decompress.py
@@ -258,7 +258,9 @@ def main(argv):
     json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
     json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
     json_extraction_parser.add_argument(
-        "--target-chunk-size", type=int, help="Target chunk size", default=100000
+        "--target-chunk-size",
+        type=int,
+        help="Target chunk size (B).",
     )
 
     parsed_args = args_parser.parse_args(argv[1:])
diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py
index d4217d66d..7cce5d92a 100755
--- a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py
@@ -299,7 +299,7 @@ def main(argv):
     json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
     json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
     json_extraction_parser.add_argument(
-        "--target-chunk-size", type=int, help="Target chunk size.", required=True
+        "--target-chunk-size", type=int, help="Target chunk size (B)."
     )
 
     parsed_args = args_parser.parse_args(argv[1:])
diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp
index ace505788..d174b4a23 100644
--- a/components/core/src/clp_s/CommandLineArguments.cpp
+++ b/components/core/src/clp_s/CommandLineArguments.cpp
@@ -302,11 +302,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
                     po::bool_switch(&m_ordered_decompression),
                     "Enable decompression in log order for this archive"
             )(
-                    "ordered-chunk-size",
-                    po::value<size_t>(&m_ordered_chunk_size)
-                            ->default_value(m_ordered_chunk_size),
-                    "Number of records to include in each output file when decompressing records "
-                    "in log order"
+                    "target-ordered-chunk-size",
+                    po::value<size_t>(&m_target_ordered_chunk_size)
+                            ->default_value(m_target_ordered_chunk_size)
+                            ->value_name("SIZE"),
+                    "Chunk size (B) for each output file when decompressing records in log order."
+                    " When set to 0, no chunking is performed."
             );
             // clang-format on
             extraction_options.add(decompression_options);
@@ -369,8 +370,9 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
                 throw std::invalid_argument("No output directory specified");
             }
 
-            if (0 != m_ordered_chunk_size && false == m_ordered_decompression) {
-                throw std::invalid_argument("ordered-chunk-size must be used with ordered argument"
+            if (0 != m_target_ordered_chunk_size && false == m_ordered_decompression) {
+                throw std::invalid_argument(
+                        "target-ordered-chunk-size must be used with ordered argument"
                 );
             }
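
The Boost.Program_options pattern above (a size_t option bound to a member, with a
default, a SIZE value name, and a post-parse consistency check) can be exercised in
isolation. The following is a minimal, self-contained sketch: the standalone main()
and the reduced option set are hypothetical stand-ins for
CommandLineArguments::parse_arguments, while the option names, semantics, and error
text mirror the patch.

#include <cstddef>
#include <iostream>
#include <stdexcept>

#include <boost/program_options.hpp>

namespace po = boost::program_options;

int main(int argc, char const* argv[]) {
    size_t target_ordered_chunk_size{};
    bool ordered{false};

    po::options_description options("Decompression options");
    options.add_options()(
            "ordered",
            po::bool_switch(&ordered),
            "Enable decompression in log order"
    )(
            "target-ordered-chunk-size",
            po::value<size_t>(&target_ordered_chunk_size)
                    ->default_value(target_ordered_chunk_size)
                    ->value_name("SIZE"),
            "Chunk size (B) for each output file when decompressing records in log order."
            " When set to 0, no chunking is performed."
    );

    po::variables_map parsed;
    po::store(po::parse_command_line(argc, argv, options), parsed);
    po::notify(parsed);  // writes parsed values into the bound variables

    // Same guard as the patch: a chunk size is meaningless without --ordered
    if (0 != target_ordered_chunk_size && false == ordered) {
        throw std::invalid_argument("target-ordered-chunk-size must be used with ordered argument");
    }
    std::cout << "Target chunk size: " << target_ordered_chunk_size << " B\n";
    return 0;
}

Passing the bound member to default_value() keeps the advertised default in lockstep
with the member initializer, which is presumably why the initializer becomes {} in
the same patch.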
diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp
index 8f2d79d8f..913e27fbc 100644
--- a/components/core/src/clp_s/CommandLineArguments.hpp
+++ b/components/core/src/clp_s/CommandLineArguments.hpp
@@ -106,7 +106,7 @@ class CommandLineArguments {
 
     bool get_ordered_decompression() const { return m_ordered_decompression; }
 
-    size_t get_ordered_chunk_size() const { return m_ordered_chunk_size; }
+    size_t get_target_ordered_chunk_size() const { return m_target_ordered_chunk_size; }
 
     size_t get_minimum_table_size() const { return m_minimum_table_size; }
 
@@ -178,7 +178,7 @@
     size_t m_max_document_size{512ULL * 1024 * 1024};  // 512 MB
     bool m_structurize_arrays{false};
    bool m_ordered_decompression{false};
-    size_t m_ordered_chunk_size{0};
+    size_t m_target_ordered_chunk_size{};
     size_t m_minimum_table_size{1ULL * 1024 * 1024};  // 1 MB
     bool m_disable_log_order{false};
 
diff --git a/components/core/src/clp_s/JsonConstructor.cpp b/components/core/src/clp_s/JsonConstructor.cpp
index 0c816c5e3..95e3fa2c5 100644
--- a/components/core/src/clp_s/JsonConstructor.cpp
+++ b/components/core/src/clp_s/JsonConstructor.cpp
@@ -81,9 +81,9 @@ void JsonConstructor::construct_in_order() {
     // a given table
     tables.clear();
 
-    int64_t first_idx{0};
-    int64_t last_idx{0};
-    size_t num_records_marshalled{0};
+    int64_t first_idx{};
+    int64_t last_idx{};
+    size_t chunk_size{};
     auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id;
     FileWriter writer;
     writer.open(src_path, FileWriter::OpenMode::CreateForWriting);
@@ -149,7 +149,7 @@
         ReaderPointer next = record_queue.top();
         record_queue.pop();
         last_idx = next->get_next_log_event_idx();
-        if (0 == num_records_marshalled) {
+        if (0 == chunk_size) {
             first_idx = last_idx;
         }
         next->get_next_message(buffer);
@@ -157,17 +157,17 @@
             record_queue.emplace(std::move(next));
         }
         writer.write(buffer.c_str(), buffer.length());
-        num_records_marshalled += 1;
+        chunk_size += buffer.length();
 
-        if (0 != m_option.ordered_chunk_size
-            && num_records_marshalled >= m_option.ordered_chunk_size)
+        if (0 != m_option.target_ordered_chunk_size
+            && chunk_size >= m_option.target_ordered_chunk_size)
         {
             finalize_chunk(true);
-            num_records_marshalled = 0;
+            chunk_size = 0;
         }
     }
 
-    if (num_records_marshalled > 0) {
+    if (chunk_size > 0) {
         finalize_chunk(false);
     } else {
         writer.close();
diff --git a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp
index c38e6d00b..3d9228a02 100644
--- a/components/core/src/clp_s/JsonConstructor.hpp
+++ b/components/core/src/clp_s/JsonConstructor.hpp
@@ -30,7 +30,7 @@ struct JsonConstructorOption {
     std::string archive_id;
     std::string output_dir;
     bool ordered{false};
-    size_t ordered_chunk_size{0};
+    size_t target_ordered_chunk_size{};
     std::optional<MetadataDbOption> metadata_db;
 };
 
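
The behavioral core of the patch is the loop above: the per-chunk counter now
accumulates bytes written (chunk_size += buffer.length()) rather than counting
records, and the chunk is finalized once the accumulated size reaches the target.
Because each record is written whole, a chunk can overshoot the target by at most
one record. Below is a self-contained sketch of that policy, with plain strings and
std::ofstream as stand-ins for clp-s's schema readers and FileWriter.

#include <cstddef>
#include <fstream>
#include <string>
#include <vector>

int main() {
    // Stand-ins for records marshalled in log order (newline-delimited JSON)
    std::vector<std::string> records{
            R"({"ts":1,"msg":"a"})" "\n",
            R"({"ts":2,"msg":"bb"})" "\n",
            R"({"ts":3,"msg":"ccc"})" "\n"
    };
    constexpr size_t cTargetChunkSize{32};  // bytes; 0 would disable chunking

    size_t chunk_size{};
    size_t chunk_idx{};
    std::ofstream writer("chunk-" + std::to_string(chunk_idx));
    for (auto const& record : records) {
        // Records are never split, so a finalized chunk may exceed the target
        // by up to one record's length
        writer.write(record.data(), static_cast<std::streamsize>(record.size()));
        chunk_size += record.size();
        if (0 != cTargetChunkSize && chunk_size >= cTargetChunkSize) {
            writer.close();  // finalize the current chunk
            writer.open("chunk-" + std::to_string(++chunk_idx));
            chunk_size = 0;
        }
    }
    writer.close();  // the patch finalizes a trailing chunk only if it's non-empty
    return 0;
}

With a 32 B target, the 19 B, 20 B, and 21 B records above land as chunk-0 (two
records, 39 B) and chunk-1 (one record), illustrating the overshoot.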
diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp
index 941fd4366..a74693e33 100644
--- a/components/core/src/clp_s/clp-s.cpp
+++ b/components/core/src/clp_s/clp-s.cpp
@@ -300,7 +300,7 @@ int main(int argc, char const* argv[]) {
         option.output_dir = command_line_arguments.get_output_dir();
         option.ordered = command_line_arguments.get_ordered_decompression();
         option.archives_dir = archives_dir;
-        option.ordered_chunk_size = command_line_arguments.get_ordered_chunk_size();
+        option.target_ordered_chunk_size = command_line_arguments.get_target_ordered_chunk_size();
 
         if (false == command_line_arguments.get_mongodb_uri().empty()) {
             option.metadata_db = {command_line_arguments.get_mongodb_uri(),
diff --git a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
index 9e99842ab..423ebb757 100644
--- a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
+++ b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
@@ -65,7 +65,7 @@ def make_command(
             stream_collection_name,
         ]
         if extract_json_config.target_chunk_size is not None:
-            command.append("--ordered-chunk-size")
+            command.append("--target-ordered-chunk-size")
             command.append(str(extract_json_config.target_chunk_size))
     else:
         logger.error(f"Unsupported storage engine {storage_engine}")
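
For reference, the loop the patch modifies implements a k-way merge: every table's
records are already in log order, and a min-heap keyed on each reader's next
timestamp repeatedly yields the globally next record, re-inserting the reader until
it is exhausted. The sketch below shows that shape; Reader is a hypothetical
stand-in for clp-s's schema readers, with timestamps standing in for log-event
indices.

#include <cstdint>
#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reader: yields (timestamp, message) pairs in ascending order
struct Reader {
    std::vector<std::pair<int64_t, std::string>> records;
    size_t pos{};

    int64_t get_next_timestamp() const { return records[pos].first; }
    void get_next_message(std::string& buffer) { buffer = records[pos++].second; }
    bool done() const { return pos >= records.size(); }
};

int main() {
    using ReaderPointer = std::shared_ptr<Reader>;
    // Min-heap: the reader whose next record has the smallest timestamp wins
    auto cmp = [](ReaderPointer const& l, ReaderPointer const& r) {
        return l->get_next_timestamp() > r->get_next_timestamp();
    };
    std::priority_queue<ReaderPointer, std::vector<ReaderPointer>, decltype(cmp)>
            record_queue(cmp);
    record_queue.emplace(std::make_shared<Reader>(Reader{{{1, "a"}, {4, "d"}}}));
    record_queue.emplace(std::make_shared<Reader>(Reader{{{2, "b"}, {3, "c"}}}));

    std::string buffer;
    while (false == record_queue.empty()) {
        ReaderPointer next = record_queue.top();
        record_queue.pop();
        next->get_next_message(buffer);
        if (false == next->done()) {
            record_queue.emplace(std::move(next));  // reader has more records
        }
        std::cout << buffer << '\n';  // prints a, b, c, d (global log order)
    }
    return 0;
}

The patch's first_idx/last_idx bookkeeping hangs off this loop: first_idx latches
the log-event index whenever a new chunk starts (0 == chunk_size) and last_idx
tracks the record just written, so each finalized chunk knows the index range it
covers.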