From 477130d2c9468a4c686902a7622b9a06730b4e82 Mon Sep 17 00:00:00 2001 From: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com> Date: Thu, 14 Nov 2024 22:54:52 -0500 Subject: [PATCH 1/3] build(core-clp): Remove `boost::iostream` dependency. (#450) --- components/core/src/clp/clg/CMakeLists.txt | 2 +- components/core/src/clp/clo/CMakeLists.txt | 2 +- components/core/src/clp/clp/CMakeLists.txt | 2 +- .../make_dictionaries_readable/CMakeLists.txt | 2 +- .../clp/streaming_archive/reader/Segment.cpp | 54 +++++++++---------- .../clp/streaming_archive/reader/Segment.hpp | 6 +-- .../core/tests/test-StreamingCompression.cpp | 21 ++------ 7 files changed, 36 insertions(+), 53 deletions(-) diff --git a/components/core/src/clp/clg/CMakeLists.txt b/components/core/src/clp/clg/CMakeLists.txt index a0ca5e9d0..c3c8e3aea 100644 --- a/components/core/src/clp/clg/CMakeLists.txt +++ b/components/core/src/clp/clg/CMakeLists.txt @@ -130,7 +130,7 @@ target_compile_features(clg PRIVATE cxx_std_20) target_include_directories(clg PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(clg PRIVATE - Boost::filesystem Boost::iostreams Boost::program_options + Boost::filesystem Boost::program_options fmt::fmt log_surgeon::log_surgeon MariaDBClient::MariaDBClient diff --git a/components/core/src/clp/clo/CMakeLists.txt b/components/core/src/clp/clo/CMakeLists.txt index 931bffeaf..39cc72b60 100644 --- a/components/core/src/clp/clo/CMakeLists.txt +++ b/components/core/src/clp/clo/CMakeLists.txt @@ -158,7 +158,7 @@ target_compile_features(clo PRIVATE cxx_std_20) target_include_directories(clo PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(clo PRIVATE - Boost::filesystem Boost::iostreams Boost::program_options + Boost::filesystem Boost::program_options fmt::fmt log_surgeon::log_surgeon ${MONGOCXX_TARGET} diff --git a/components/core/src/clp/clp/CMakeLists.txt b/components/core/src/clp/clp/CMakeLists.txt index 53342f3a9..eff32ce46 100644 --- a/components/core/src/clp/clp/CMakeLists.txt +++ b/components/core/src/clp/clp/CMakeLists.txt @@ -171,7 +171,7 @@ target_compile_features(clp PRIVATE cxx_std_20) target_include_directories(clp PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(clp PRIVATE - Boost::filesystem Boost::iostreams Boost::program_options + Boost::filesystem Boost::program_options fmt::fmt log_surgeon::log_surgeon spdlog::spdlog diff --git a/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt b/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt index fd62a39fb..9779d137f 100644 --- a/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt +++ b/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt @@ -45,7 +45,7 @@ target_compile_features(make-dictionaries-readable PRIVATE cxx_std_20) target_include_directories(make-dictionaries-readable PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(make-dictionaries-readable PRIVATE - Boost::filesystem Boost::iostreams Boost::program_options + Boost::filesystem Boost::program_options log_surgeon::log_surgeon spdlog::spdlog clp::string_utils diff --git a/components/core/src/clp/streaming_archive/reader/Segment.cpp b/components/core/src/clp/streaming_archive/reader/Segment.cpp index aa43e1d1f..7732dc5f8 100644 --- a/components/core/src/clp/streaming_archive/reader/Segment.cpp +++ b/components/core/src/clp/streaming_archive/reader/Segment.cpp @@ -3,12 +3,16 @@ #include #include +#include #include #include +#include +#include "../../ErrorCode.hpp" #include 
"../../FileReader.hpp" #include "../../spdlog_with_specializations.hpp" +#include "../../TraceableException.hpp" using std::make_unique; using std::string; @@ -33,47 +37,37 @@ ErrorCode Segment::try_open(string const& segment_dir_path, segment_id_t segment return ErrorCode_Success; } - // Get the size of the compressed segment file - boost::system::error_code boost_error_code; - size_t segment_file_size = boost::filesystem::file_size(segment_path, boost_error_code); - if (boost_error_code) { - SPDLOG_ERROR( - "streaming_archive::reader::Segment: Unable to obtain file size for segment: " - "{}", - segment_path.c_str() - ); - SPDLOG_ERROR("streaming_archive::reader::Segment: {}", boost_error_code.message().c_str()); - return ErrorCode_Failure; - } - - // Sanity check: previously used memory mapped file should be closed before opening a new - // one - if (m_memory_mapped_segment_file.is_open()) { + // Sanity check: previously used memory mapped file should be closed before opening a new one + if (m_memory_mapped_segment_file.has_value()) { SPDLOG_WARN( "streaming_archive::reader::Segment: Previous segment should be closed before " "opening new one: {}", segment_path.c_str() ); - m_memory_mapped_segment_file.close(); + m_memory_mapped_segment_file.reset(); } - // Create read only memory mapped file - boost::iostreams::mapped_file_params memory_map_params; - memory_map_params.path = segment_path; - memory_map_params.flags = boost::iostreams::mapped_file::readonly; - memory_map_params.length = segment_file_size; - // Try to map it to the same memory location as the previous memory mapped file - memory_map_params.hint = m_memory_mapped_segment_file.data(); - m_memory_mapped_segment_file.open(memory_map_params); - if (!m_memory_mapped_segment_file.is_open()) { + + // Create read-only memory mapped file + try { + m_memory_mapped_segment_file.emplace(segment_path); + } catch (TraceableException const& ex) { + auto const error_code{ex.get_error_code()}; + auto const formatted_error{ + ErrorCode_errno == error_code + ? fmt::format("errno={}", errno) + : fmt::format("error_code={}, message={}", error_code, ex.what()) + }; SPDLOG_ERROR( "streaming_archive::reader:Segment: Unable to memory map the compressed " - "segment with path: {}", - segment_path.c_str() + "segment with path: {}. 
Error: {}", + segment_path.c_str(), + formatted_error ); return ErrorCode_Failure; } - m_decompressor.open(m_memory_mapped_segment_file.data(), segment_file_size); + auto const view{m_memory_mapped_segment_file.value().get_view()}; + m_decompressor.open(view.data(), view.size()); m_segment_path = segment_path; return ErrorCode_Success; @@ -82,7 +76,7 @@ ErrorCode Segment::try_open(string const& segment_dir_path, segment_id_t segment void Segment::close() { if (!m_segment_path.empty()) { m_decompressor.close(); - m_memory_mapped_segment_file.close(); + m_memory_mapped_segment_file.reset(); m_segment_path.clear(); } } diff --git a/components/core/src/clp/streaming_archive/reader/Segment.hpp b/components/core/src/clp/streaming_archive/reader/Segment.hpp index 9ed40ea60..cdcd51b4d 100644 --- a/components/core/src/clp/streaming_archive/reader/Segment.hpp +++ b/components/core/src/clp/streaming_archive/reader/Segment.hpp @@ -2,12 +2,12 @@ #define CLP_STREAMING_ARCHIVE_READER_SEGMENT_HPP #include +#include #include -#include - #include "../../Defs.h" #include "../../ErrorCode.hpp" +#include "../../ReadOnlyMemoryMappedFile.hpp" #include "../../streaming_compression/passthrough/Decompressor.hpp" #include "../../streaming_compression/zstd/Decompressor.hpp" #include "../Constants.hpp" @@ -53,7 +53,7 @@ class Segment { private: std::string m_segment_path; - boost::iostreams::mapped_file_source m_memory_mapped_segment_file; + std::optional m_memory_mapped_segment_file; #if USE_PASSTHROUGH_COMPRESSION streaming_compression::passthrough::Decompressor m_decompressor; diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index b43316b3f..747a38a05 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -1,10 +1,11 @@ +#include #include #include -#include #include #include +#include "../src/clp/ReadOnlyMemoryMappedFile.hpp" #include "../src/clp/streaming_compression/passthrough/Compressor.hpp" #include "../src/clp/streaming_compression/passthrough/Decompressor.hpp" #include "../src/clp/streaming_compression/zstd/Compressor.hpp" @@ -149,22 +150,10 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { // Decompress // Memory map compressed file - // Create memory mapping for compressed_file_path, use boost read only memory mapped file - boost::system::error_code boost_error_code; - size_t compressed_file_size - = boost::filesystem::file_size(compressed_file_path, boost_error_code); - REQUIRE(!boost_error_code); - - boost::iostreams::mapped_file_params memory_map_params; - memory_map_params.path = compressed_file_path; - memory_map_params.flags = boost::iostreams::mapped_file::readonly; - memory_map_params.length = compressed_file_size; - boost::iostreams::mapped_file_source memory_mapped_compressed_file; - memory_mapped_compressed_file.open(memory_map_params); - REQUIRE(memory_mapped_compressed_file.is_open()); - + clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{compressed_file_path}; clp::streaming_compression::passthrough::Decompressor decompressor; - decompressor.open(memory_mapped_compressed_file.data(), compressed_file_size); + auto const compressed_file_view{memory_mapped_compressed_file.get_view()}; + decompressor.open(compressed_file_view.data(), compressed_file_view.size()); size_t uncompressed_bytes = 0; REQUIRE(ErrorCode_Success From ac4f1c1887fff87ae576e19feedd516379ff2368 Mon Sep 17 00:00:00 2001 From: Lin Zhihao 
<59785146+LinZhihao-723@users.noreply.github.com> Date: Thu, 14 Nov 2024 23:32:59 -0500 Subject: [PATCH 2/3] test(core): Update `network_reader_with_valid_http_header_kv_pairs` to verify the return value before parsing the read content into a JSON object. (#593) --- components/core/tests/test-NetworkReader.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/components/core/tests/test-NetworkReader.cpp b/components/core/tests/test-NetworkReader.cpp index f32daef14..f2995f141 100644 --- a/components/core/tests/test-NetworkReader.cpp +++ b/components/core/tests/test-NetworkReader.cpp @@ -213,9 +213,10 @@ TEST_CASE("network_reader_with_valid_http_header_kv_pairs", "[NetworkReader]") { clp::NetworkReader::cDefaultBufferSize, valid_http_header_kv_pairs }; - auto const content = nlohmann::json::parse(get_content(reader)); - auto const& headers{content.at("headers")}; + auto const content{get_content(reader)}; REQUIRE(assert_curl_error_code(CURLE_OK, reader)); + auto const parsed_content = nlohmann::json::parse(content); + auto const& headers{parsed_content.at("headers")}; for (auto const& [key, value] : valid_http_header_kv_pairs) { REQUIRE((value == headers.at(key).get())); } From d969aaf456cc2cc5caa58586d4264718fda94aa0 Mon Sep 17 00:00:00 2001 From: haiqi96 <14502009+haiqi96@users.noreply.github.com> Date: Mon, 18 Nov 2024 09:51:20 -0500 Subject: [PATCH 3/3] feat(clp-package): Add support for extracting JSON streams from archives. (#569) Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- .../clp_package_utils/general.py | 17 +- .../clp_package_utils/scripts/decompress.py | 50 +++- .../scripts/native/decompress.py | 96 +++--- .../clp_package_utils/scripts/start_clp.py | 16 +- .../clp-py-utils/clp_py_utils/clp_config.py | 26 +- .../create-results-cache-indices.py | 10 +- .../executor/query/celeryconfig.py | 4 +- ...ract_ir_task.py => extract_stream_task.py} | 50 +++- .../executor/query/fs_search_task.py | 4 +- .../job_orchestration/scheduler/constants.py | 1 + .../job_orchestration/scheduler/job_config.py | 5 + .../scheduler/query/query_scheduler.py | 283 +++++++++++++----- .../scheduler/scheduler_data.py | 13 +- .../client/src/ui/QueryStatus.jsx | 2 +- components/log-viewer-webui/server/.env | 2 +- .../log-viewer-webui/server/settings.json | 4 +- .../log-viewer-webui/server/src/DbManager.js | 10 +- components/log-viewer-webui/server/src/app.js | 2 +- .../server/src/routes/static.js | 10 +- .../package-template/src/etc/clp-config.yml | 8 +- 20 files changed, 415 insertions(+), 198 deletions(-) rename components/job-orchestration/job_orchestration/executor/query/{extract_ir_task.py => extract_stream_task.py} (69%) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 4dca481b0..f42542ebc 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -33,6 +33,7 @@ # CONSTANTS EXTRACT_FILE_CMD = "x" EXTRACT_IR_CMD = "i" +EXTRACT_JSON_CMD = "j" # Paths CONTAINER_CLP_HOME = pathlib.Path("/") / "opt" / "clp" @@ -84,7 +85,7 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path): self.data_dir: typing.Optional[DockerMount] = None self.logs_dir: typing.Optional[DockerMount] = None self.archives_output_dir: typing.Optional[DockerMount] = None - self.ir_output_dir: typing.Optional[DockerMount] = None + self.stream_output_dir: typing.Optional[DockerMount] = None def 
get_clp_home(): @@ -251,17 +252,17 @@ def generate_container_config( container_clp_config.archive_output.directory, ) - container_clp_config.ir_output.directory = pathlib.Path("/") / "mnt" / "ir-output" + container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output" if not is_path_already_mounted( clp_home, CONTAINER_CLP_HOME, - clp_config.ir_output.directory, - container_clp_config.ir_output.directory, + clp_config.stream_output.directory, + container_clp_config.stream_output.directory, ): - docker_mounts.ir_output_dir = DockerMount( + docker_mounts.stream_output_dir = DockerMount( DockerMountType.BIND, - clp_config.ir_output.directory, - container_clp_config.ir_output.directory, + clp_config.stream_output.directory, + container_clp_config.stream_output.directory, ) return container_clp_config, docker_mounts @@ -482,7 +483,7 @@ def validate_results_cache_config( def validate_worker_config(clp_config: CLPConfig): clp_config.validate_input_logs_dir() clp_config.validate_archive_output_dir() - clp_config.validate_ir_output_dir() + clp_config.validate_stream_output_dir() def validate_webui_config( diff --git a/components/clp-package-utils/clp_package_utils/scripts/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/decompress.py index 1a2973fec..903107c32 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/decompress.py @@ -14,6 +14,7 @@ dump_container_config, EXTRACT_FILE_CMD, EXTRACT_IR_CMD, + EXTRACT_JSON_CMD, generate_container_config, generate_container_name, generate_container_start_cmd, @@ -146,11 +147,11 @@ def handle_extract_file_cmd( return 0 -def handle_extract_ir_cmd( +def handle_extract_stream_cmd( parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path ) -> int: """ - Handles the IR extraction command. + Handles the stream extraction command. 
:param parsed_args: :param clp_home: :param default_config_file_path: @@ -174,29 +175,41 @@ def handle_extract_ir_cmd( ) # fmt: off + job_command = parsed_args.command extract_cmd = [ "python3", "-m", "clp_package_utils.scripts.native.decompress", "--config", str(generated_config_path_on_container), - EXTRACT_IR_CMD, - str(parsed_args.msg_ix), + job_command ] # fmt: on - if parsed_args.orig_file_id: - extract_cmd.append("--orig-file-id") - extract_cmd.append(str(parsed_args.orig_file_id)) + + if EXTRACT_IR_CMD == job_command: + extract_cmd.append(str(parsed_args.msg_ix)) + if parsed_args.orig_file_id: + extract_cmd.append("--orig-file-id") + extract_cmd.append(str(parsed_args.orig_file_id)) + else: + extract_cmd.append("--orig-file-path") + extract_cmd.append(str(parsed_args.orig_file_path)) + if parsed_args.target_uncompressed_size: + extract_cmd.append("--target-uncompressed-size") + extract_cmd.append(str(parsed_args.target_uncompressed_size)) + elif EXTRACT_JSON_CMD == job_command: + extract_cmd.append(str(parsed_args.archive_id)) + if parsed_args.target_chunk_size: + extract_cmd.append("--target-chunk-size") + extract_cmd.append(str(parsed_args.target_chunk_size)) else: - extract_cmd.append("--orig-file-path") - extract_cmd.append(str(parsed_args.orig_file_path)) - if parsed_args.target_uncompressed_size: - extract_cmd.append("--target-uncompressed-size") - extract_cmd.append(str(parsed_args.target_uncompressed_size)) + logger.error(f"Unexpected command: {job_command}") + return -1 + cmd = container_start_cmd + extract_cmd try: subprocess.run(cmd, check=True) except subprocess.CalledProcessError: - logger.exception("Docker or IR extraction command failed.") + logger.exception("Docker or stream extraction command failed.") return -1 # Remove generated files @@ -241,13 +254,20 @@ def main(argv): group.add_argument("--orig-file-id", type=str, help="Original file's ID.") group.add_argument("--orig-file-path", type=str, help="Original file's path.") + # JSON extraction command parser + json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD) + json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID") + json_extraction_parser.add_argument( + "--target-chunk-size", type=int, help="Target chunk size", default=100000 + ) + parsed_args = args_parser.parse_args(argv[1:]) command = parsed_args.command if EXTRACT_FILE_CMD == command: return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path) - elif EXTRACT_IR_CMD == command: - return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path) + elif command in (EXTRACT_IR_CMD, EXTRACT_JSON_CMD): + return handle_extract_stream_cmd(parsed_args, clp_home, default_config_file_path) else: logger.exception(f"Unexpected command: {command}") return -1 diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py index b6585b192..d4217d66d 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py @@ -12,12 +12,17 @@ from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, CLPConfig, Database from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType -from job_orchestration.scheduler.job_config import ExtractIrJobConfig +from job_orchestration.scheduler.job_config import ( + ExtractIrJobConfig, + 
ExtractJsonJobConfig, + QueryJobConfig, +) from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, EXTRACT_FILE_CMD, EXTRACT_IR_CMD, + EXTRACT_JSON_CMD, get_clp_home, load_config_file, ) @@ -70,45 +75,37 @@ def get_orig_file_id(db_config: Database, path: str) -> Optional[str]: return results[0]["orig_file_id"] -def submit_and_monitor_ir_extraction_job_in_db( +def submit_and_monitor_extraction_job_in_db( db_config: Database, - orig_file_id: str, - msg_ix: int, - target_uncompressed_size: Optional[int], + job_type: QueryJobType, + job_config: QueryJobConfig, ) -> int: """ - Submits an IR extraction job to the scheduler and waits until the job finishes. + Submits a stream extraction job to the scheduler and waits until it finishes. :param db_config: - :param orig_file_id: - :param msg_ix: - :param target_uncompressed_size: + :param job_type: + :param job_config: :return: 0 on success, -1 otherwise. """ - extract_ir_config = ExtractIrJobConfig( - orig_file_id=orig_file_id, - msg_ix=msg_ix, - target_uncompressed_size=target_uncompressed_size, - ) - sql_adapter = SQL_Adapter(db_config) - job_id = submit_query_job(sql_adapter, extract_ir_config, QueryJobType.EXTRACT_IR) + job_id = submit_query_job(sql_adapter, job_config, job_type) job_status = wait_for_query_job(sql_adapter, job_id) if QueryJobStatus.SUCCEEDED == job_status: - logger.info(f"Finished IR extraction job {job_id}.") + logger.info(f"Finished extraction job {job_id}.") return 0 - logger.error( - f"IR extraction job {job_id} finished with unexpected status: {job_status.to_str()}." - ) + logger.error(f"Extraction job {job_id} finished with unexpected status: {job_status.to_str()}.") return -1 -def handle_extract_ir_cmd( - parsed_args: argparse.Namespace, clp_home: pathlib.Path, default_config_file_path: pathlib.Path +def handle_extract_stream_cmd( + parsed_args: argparse.Namespace, + clp_home: pathlib.Path, + default_config_file_path: pathlib.Path, ) -> int: """ - Handles the IR extraction command. + Handles the stream extraction command. 
:param parsed_args: :param clp_home: :param default_config_file_path: @@ -121,26 +118,46 @@ def handle_extract_ir_cmd( if clp_config is None: return -1 - orig_file_id: str - if parsed_args.orig_file_id: - orig_file_id = parsed_args.orig_file_id + command = parsed_args.command + + job_config: QueryJobConfig + job_type: QueryJobType + if EXTRACT_IR_CMD == command: + job_type = QueryJobType.EXTRACT_IR + orig_file_id: str + if parsed_args.orig_file_id: + orig_file_id = parsed_args.orig_file_id + else: + orig_file_path = parsed_args.orig_file_path + orig_file_id = get_orig_file_id(clp_config.database, orig_file_path) + if orig_file_id is None: + logger.error(f"Cannot find orig_file_id corresponding to '{orig_file_path}'.") + return -1 + job_config = ExtractIrJobConfig( + orig_file_id=orig_file_id, + msg_ix=parsed_args.msg_ix, + target_uncompressed_size=parsed_args.target_uncompressed_size, + ) + elif EXTRACT_JSON_CMD == command: + job_type = QueryJobType.EXTRACT_JSON + job_config = ExtractJsonJobConfig( + archive_id=parsed_args.archive_id, target_chunk_size=parsed_args.target_chunk_size + ) else: - orig_file_id = get_orig_file_id(clp_config.database, parsed_args.orig_file_path) - if orig_file_id is None: - return -1 + logger.error(f"Unsupported stream extraction command: {command}") + return -1 try: return asyncio.run( run_function_in_process( - submit_and_monitor_ir_extraction_job_in_db, + submit_and_monitor_extraction_job_in_db, clp_config.database, - orig_file_id, - parsed_args.msg_ix, - parsed_args.target_uncompressed_size, + job_type, + job_config, ) ) except asyncio.CancelledError: - logger.error("IR extraction cancelled.") + logger.error("Stream extraction cancelled.") return -1 @@ -278,13 +295,20 @@ def main(argv): group.add_argument("--orig-file-id", type=str, help="Original file's ID.") group.add_argument("--orig-file-path", type=str, help="Original file's path.") + # JSON extraction command parser + json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD) + json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID") + json_extraction_parser.add_argument( + "--target-chunk-size", type=int, help="Target chunk size.", required=True + ) + parsed_args = args_parser.parse_args(argv[1:]) command = parsed_args.command if EXTRACT_FILE_CMD == command: return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path) - elif EXTRACT_IR_CMD == command: - return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path) + elif command in (EXTRACT_IR_CMD, EXTRACT_JSON_CMD): + return handle_extract_stream_cmd(parsed_args, clp_home, default_config_file_path) else: logger.exception(f"Unexpected command: {command}") return -1 diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index 7c6de0200..a25756fd9 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -286,7 +286,7 @@ def create_results_cache_indices( "python3", str(clp_py_utils_dir / "create-results-cache-indices.py"), "--uri", container_clp_config.results_cache.get_uri(), - "--ir-collection", container_clp_config.results_cache.ir_collection_name, + "--stream-collection", container_clp_config.results_cache.stream_collection_name, ] # fmt: on @@ -660,10 +660,10 @@ def start_query_worker( celery_method = "job_orchestration.executor.query" celery_route = f"{QueueName.QUERY}" - 
query_worker_mount = [mounts.ir_output_dir] + query_worker_mount = [mounts.stream_output_dir] query_worker_env = { - "CLP_IR_OUTPUT_DIR": container_clp_config.ir_output.directory, - "CLP_IR_COLLECTION": clp_config.results_cache.ir_collection_name, + "CLP_STREAM_OUTPUT_DIR": container_clp_config.stream_output.directory, + "CLP_STREAM_COLLECTION_NAME": clp_config.results_cache.stream_collection_name, } generic_start_worker( @@ -710,7 +710,7 @@ def generic_start_worker( # Create necessary directories clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True) - clp_config.ir_output.directory.mkdir(parents=True, exist_ok=True) + clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True) clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" # fmt: off @@ -933,9 +933,9 @@ def start_log_viewer_webui( "MongoDbHost": clp_config.results_cache.host, "MongoDbPort": clp_config.results_cache.port, "MongoDbName": clp_config.results_cache.db_name, - "MongoDbIrFilesCollectionName": clp_config.results_cache.ir_collection_name, + "MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name, "ClientDir": str(container_log_viewer_webui_dir / "client"), - "IrFilesDir": str(container_clp_config.ir_output.directory), + "StreamFilesDir": str(container_clp_config.stream_output.directory), "LogViewerDir": str(container_log_viewer_webui_dir / "yscope-log-viewer"), } settings_json = read_and_update_settings_json(settings_json_path, settings_json_updates) @@ -961,7 +961,7 @@ def start_log_viewer_webui( # fmt: on necessary_mounts = [ mounts.clp_home, - mounts.ir_output_dir, + mounts.stream_output_dir, ] for mount in necessary_mounts: if mount: diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index f5a813057..4bb00bd9e 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -270,7 +270,7 @@ class ResultsCache(BaseModel): host: str = "localhost" port: int = 27017 db_name: str = "clp-query-results" - ir_collection_name: str = "ir-files" + stream_collection_name: str = "stream-files" @validator("host") def validate_host(cls, field): @@ -284,10 +284,12 @@ def validate_db_name(cls, field): raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.db_name cannot be empty.") return field - @validator("ir_collection_name") - def validate_ir_collection_name(cls, field): + @validator("stream_collection_name") + def validate_stream_collection_name(cls, field): if "" == field: - raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.ir_collection_name cannot be empty.") + raise ValueError( + f"{RESULTS_CACHE_COMPONENT_NAME}.stream_collection_name cannot be empty." 
+ ) return field def get_uri(self): @@ -343,8 +345,8 @@ def dump_to_primitive_dict(self): return d -class IrOutput(BaseModel): - directory: pathlib.Path = pathlib.Path("var") / "data" / "ir" +class StreamOutput(BaseModel): + directory: pathlib.Path = pathlib.Path("var") / "data" / "stream" target_uncompressed_size: int = 128 * 1024 * 1024 @validator("directory") @@ -425,7 +427,7 @@ class CLPConfig(BaseModel): credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH archive_output: ArchiveOutput = ArchiveOutput() - ir_output: IrOutput = IrOutput() + stream_output: StreamOutput = StreamOutput() data_directory: pathlib.Path = pathlib.Path("var") / "data" logs_directory: pathlib.Path = pathlib.Path("var") / "log" @@ -435,7 +437,7 @@ def make_config_paths_absolute(self, clp_home: pathlib.Path): self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory) self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path) self.archive_output.make_config_paths_absolute(clp_home) - self.ir_output.make_config_paths_absolute(clp_home) + self.stream_output.make_config_paths_absolute(clp_home) self.data_directory = make_config_path_absolute(clp_home, self.data_directory) self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory) self._os_release_file_path = make_config_path_absolute(clp_home, self._os_release_file_path) @@ -455,11 +457,11 @@ def validate_archive_output_dir(self): except ValueError as ex: raise ValueError(f"archive_output.directory is invalid: {ex}") - def validate_ir_output_dir(self): + def validate_stream_output_dir(self): try: - validate_path_could_be_dir(self.ir_output.directory) + validate_path_could_be_dir(self.stream_output.directory) except ValueError as ex: - raise ValueError(f"ir_output.directory is invalid: {ex}") + raise ValueError(f"stream_output.directory is invalid: {ex}") def validate_data_dir(self): try: @@ -528,7 +530,7 @@ def load_redis_credentials_from_file(self): def dump_to_primitive_dict(self): d = self.dict() d["archive_output"] = self.archive_output.dump_to_primitive_dict() - d["ir_output"] = self.ir_output.dump_to_primitive_dict() + d["stream_output"] = self.stream_output.dump_to_primitive_dict() # Turn paths into primitive strings d["input_logs_directory"] = str(self.input_logs_directory) d["credentials_file_path"] = str(self.credentials_file_path) diff --git a/components/clp-py-utils/clp_py_utils/create-results-cache-indices.py b/components/clp-py-utils/clp_py_utils/create-results-cache-indices.py index dafbd3bde..db03e9632 100644 --- a/components/clp-py-utils/clp_py_utils/create-results-cache-indices.py +++ b/components/clp-py-utils/clp_py_utils/create-results-cache-indices.py @@ -18,19 +18,21 @@ def main(argv): args_parser = argparse.ArgumentParser(description="Creates results cache indices for CLP.") args_parser.add_argument("--uri", required=True, help="URI of the results cache.") - args_parser.add_argument("--ir-collection", required=True, help="Collection for IR metadata.") + args_parser.add_argument( + "--stream-collection", required=True, help="Collection for stream metadata." 
+ ) parsed_args = args_parser.parse_args(argv[1:]) results_cache_uri = parsed_args.uri - ir_collection_name = parsed_args.ir_collection + stream_collection_name = parsed_args.stream_collection try: with MongoClient(results_cache_uri) as results_cache_client: - ir_collection = results_cache_client.get_default_database()[ir_collection_name] + stream_collection = results_cache_client.get_default_database()[stream_collection_name] file_split_id_index = IndexModel(["file_split_id"]) orig_file_id_index = IndexModel(["orig_file_id", "begin_msg_ix", "end_msg_ix"]) - ir_collection.create_indexes([file_split_id_index, orig_file_id_index]) + stream_collection.create_indexes([file_split_id_index, orig_file_id_index]) except Exception: logger.exception("Failed to create clp results cache indices.") return -1 diff --git a/components/job-orchestration/job_orchestration/executor/query/celeryconfig.py b/components/job-orchestration/job_orchestration/executor/query/celeryconfig.py index 994c0bbcf..6cf97dbd0 100644 --- a/components/job-orchestration/job_orchestration/executor/query/celeryconfig.py +++ b/components/job-orchestration/job_orchestration/executor/query/celeryconfig.py @@ -4,12 +4,12 @@ imports = ( "job_orchestration.executor.query.fs_search_task", - "job_orchestration.executor.query.extract_ir_task", + "job_orchestration.executor.query.extract_stream_task", ) task_routes = { "job_orchestration.executor.query.fs_search_task.search": QueueName.QUERY, - "job_orchestration.executor.query.extract_ir_task.extract_ir": QueueName.QUERY, + "job_orchestration.executor.query.extract_stream_task.extract_stream": QueueName.QUERY, } task_create_missing_queues = True diff --git a/components/job-orchestration/job_orchestration/executor/query/extract_ir_task.py b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py similarity index 69% rename from components/job-orchestration/job_orchestration/executor/query/extract_ir_task.py rename to components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py index 61fcbf549..9e99842ab 100644 --- a/components/job-orchestration/job_orchestration/executor/query/extract_ir_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py @@ -13,7 +13,7 @@ report_command_creation_failure, run_query_task, ) -from job_orchestration.scheduler.job_config import ExtractIrJobConfig +from job_orchestration.scheduler.job_config import ExtractIrJobConfig, ExtractJsonJobConfig from job_orchestration.scheduler.scheduler_data import QueryTaskStatus # Setup logging @@ -25,12 +25,14 @@ def make_command( clp_home: Path, archives_dir: Path, archive_id: str, - ir_output_dir: Path, - extract_ir_config: ExtractIrJobConfig, + stream_output_dir: Path, + job_config: dict, results_cache_uri: str, - ir_collection: str, + stream_collection_name: str, ) -> Optional[List[str]]: if StorageEngine.CLP == storage_engine: + logger.info("Starting IR extraction") + extract_ir_config = ExtractIrJobConfig.parse_obj(job_config) if not extract_ir_config.file_split_id: logger.error("file_split_id not supplied") return None @@ -39,13 +41,32 @@ def make_command( "i", str(archives_dir / archive_id), extract_ir_config.file_split_id, - str(ir_output_dir), + str(stream_output_dir), results_cache_uri, - ir_collection, + stream_collection_name, ] if extract_ir_config.target_uncompressed_size is not None: command.append("--target-size") command.append(str(extract_ir_config.target_uncompressed_size)) + elif StorageEngine.CLP_S == 
storage_engine: + logger.info("Starting JSON extraction") + extract_json_config = ExtractJsonJobConfig.parse_obj(job_config) + command = [ + str(clp_home / "bin" / "clp-s"), + "x", + str(archives_dir), + str(stream_output_dir), + "--ordered", + "--archive-id", + archive_id, + "--mongodb-uri", + results_cache_uri, + "--mongodb-collection", + stream_collection_name, + ] + if extract_json_config.target_chunk_size is not None: + command.append("--ordered-chunk-size") + command.append(str(extract_json_config.target_chunk_size)) else: logger.error(f"Unsupported storage engine {storage_engine}") return None @@ -54,16 +75,16 @@ def make_command( @app.task(bind=True) -def extract_ir( +def extract_stream( self: Task, job_id: str, task_id: int, - job_config_obj: dict, + job_config: dict, archive_id: str, clp_metadata_db_conn_params: dict, results_cache_uri: str, ) -> Dict[str, Any]: - task_name = "IR extraction" + task_name = "Stream Extraction" # Setup logging to file clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR")) @@ -80,19 +101,18 @@ def extract_ir( clp_home = Path(os.getenv("CLP_HOME")) archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR")) clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE") - ir_output_dir = Path(os.getenv("CLP_IR_OUTPUT_DIR")) - ir_collection = os.getenv("CLP_IR_COLLECTION") - extract_ir_config = ExtractIrJobConfig.parse_obj(job_config_obj) + stream_output_dir = Path(os.getenv("CLP_STREAM_OUTPUT_DIR")) + stream_collection_name = os.getenv("CLP_STREAM_COLLECTION_NAME") task_command = make_command( storage_engine=clp_storage_engine, clp_home=clp_home, archives_dir=archive_directory, archive_id=archive_id, - ir_output_dir=ir_output_dir, - extract_ir_config=extract_ir_config, + stream_output_dir=stream_output_dir, + job_config=job_config, results_cache_uri=results_cache_uri, - ir_collection=ir_collection, + stream_collection_name=stream_collection_name, ) if not task_command: return report_command_creation_failure( diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py index 162056220..598bfdcfc 100644 --- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py @@ -98,7 +98,7 @@ def search( self: Task, job_id: str, task_id: int, - job_config_obj: dict, + job_config: dict, archive_id: str, clp_metadata_db_conn_params: dict, results_cache_uri: str, @@ -120,7 +120,7 @@ def search( clp_home = Path(os.getenv("CLP_HOME")) archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR")) clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE") - search_config = SearchJobConfig.parse_obj(job_config_obj) + search_config = SearchJobConfig.parse_obj(job_config) task_command = make_command( storage_engine=clp_storage_engine, diff --git a/components/job-orchestration/job_orchestration/scheduler/constants.py b/components/job-orchestration/job_orchestration/scheduler/constants.py index 131719148..cd85016a3 100644 --- a/components/job-orchestration/job_orchestration/scheduler/constants.py +++ b/components/job-orchestration/job_orchestration/scheduler/constants.py @@ -72,6 +72,7 @@ def to_str(self) -> str: class QueryJobType(IntEnum): SEARCH_OR_AGGREGATION = 0 EXTRACT_IR = auto() + EXTRACT_JSON = auto() def __str__(self) -> str: return str(self.value) diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py 
b/components/job-orchestration/job_orchestration/scheduler/job_config.py index e90e2ee7f..7cf8b2324 100644 --- a/components/job-orchestration/job_orchestration/scheduler/job_config.py +++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py @@ -49,6 +49,11 @@ class ExtractIrJobConfig(QueryJobConfig): target_uncompressed_size: typing.Optional[int] = None +class ExtractJsonJobConfig(QueryJobConfig): + archive_id: str + target_chunk_size: typing.Optional[int] = None + + class SearchJobConfig(QueryJobConfig): query_string: str max_num_results: int diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 2a0f855a3..a9c60f380 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -23,6 +23,7 @@ import os import pathlib import sys +from abc import ABC, abstractmethod from pathlib import Path from typing import Any, Dict, List, Optional, Tuple @@ -39,10 +40,14 @@ from clp_py_utils.core import read_yaml_config_file from clp_py_utils.decorators import exception_default_value from clp_py_utils.sql_adapter import SQL_Adapter -from job_orchestration.executor.query.extract_ir_task import extract_ir +from job_orchestration.executor.query.extract_stream_task import extract_stream from job_orchestration.executor.query.fs_search_task import search from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType, QueryTaskStatus -from job_orchestration.scheduler.job_config import ExtractIrJobConfig, SearchJobConfig +from job_orchestration.scheduler.job_config import ( + ExtractIrJobConfig, + ExtractJsonJobConfig, + SearchJobConfig, +) from job_orchestration.scheduler.query.reducer_handler import ( handle_reducer_connection, ReducerHandlerMessage, @@ -51,6 +56,7 @@ ) from job_orchestration.scheduler.scheduler_data import ( ExtractIrJob, + ExtractJsonJob, InternalJobState, QueryJob, QueryTaskResult, @@ -67,9 +73,118 @@ # Dictionary that maps IDs of file splits being extracted to IDs of jobs waiting for them active_file_split_ir_extractions: Dict[str, List[str]] = {} +# Dictionary that maps IDs of clp-s archives being extracted to IDs of jobs waiting for them +active_archive_json_extractions: Dict[str, List[str]] = {} + reducer_connection_queue: Optional[asyncio.Queue] = None +class StreamExtractionHandle(ABC): + def __init__(self, job_id: str): + self._job_id = job_id + self._archive_id: Optional[str] = None + + def get_archive_id(self) -> Optional[str]: + return self._archive_id + + @abstractmethod + def get_stream_id(self) -> str: ... + + @abstractmethod + def is_stream_extraction_active(self) -> bool: ... + + @abstractmethod + def is_stream_extracted(self, results_cache_uri: str, stream_collection_name: str) -> bool: ... + + @abstractmethod + def mark_job_as_waiting(self) -> None: ... + + @abstractmethod + def create_stream_extraction_job(self) -> QueryJob: ... 
+ + +class IrExtractionHandle(StreamExtractionHandle): + def __init__(self, job_id: str, job_config: Dict[str, Any], db_conn): + super().__init__(job_id) + self.__job_config = ExtractIrJobConfig.parse_obj(job_config) + self._archive_id, self.__file_split_id = get_archive_and_file_split_ids_for_ir_extraction( + db_conn, self.__job_config + ) + if self._archive_id is None: + raise ValueError("Job parameters don't resolve to an existing archive") + + self.__job_config.file_split_id = self.__file_split_id + + def get_stream_id(self) -> str: + return self.__file_split_id + + def is_stream_extraction_active(self) -> bool: + return self.__file_split_id in active_file_split_ir_extractions + + def is_stream_extracted(self, results_cache_uri: str, stream_collection_name: str) -> bool: + return document_exists( + results_cache_uri, stream_collection_name, "file_split_id", self.__file_split_id + ) + + def mark_job_as_waiting(self) -> None: + global active_file_split_ir_extractions + file_split_id = self.__file_split_id + if file_split_id not in active_file_split_ir_extractions: + active_file_split_ir_extractions[file_split_id] = [] + active_file_split_ir_extractions[file_split_id].append(self._job_id) + + def create_stream_extraction_job(self) -> QueryJob: + logger.info( + f"Creating IR extraction job {self._job_id} for file_split: {self.__file_split_id}" + ) + return ExtractIrJob( + id=self._job_id, + extract_ir_config=self.__job_config, + state=InternalJobState.WAITING_FOR_DISPATCH, + ) + + +class JsonExtractionHandle(StreamExtractionHandle): + def __init__(self, job_id: str, job_config: Dict[str, Any], db_conn): + super().__init__(job_id) + self.__job_config = ExtractJsonJobConfig.parse_obj(job_config) + self._archive_id = self.__job_config.archive_id + if not archive_exists(db_conn, self._archive_id): + raise ValueError(f"Archive {self._archive_id} doesn't exist") + + def get_stream_id(self) -> str: + return self._archive_id + + def is_stream_extraction_active(self) -> bool: + return self._archive_id in active_archive_json_extractions + + def is_stream_extracted(self, results_cache_uri: str, stream_collection_name: str) -> bool: + return document_exists( + results_cache_uri, stream_collection_name, "orig_file_id", self._archive_id + ) + + def mark_job_as_waiting(self) -> None: + global active_archive_json_extractions + archive_id = self._archive_id + if archive_id not in active_archive_json_extractions: + active_archive_json_extractions[archive_id] = [] + active_archive_json_extractions[archive_id].append(self._job_id) + + def create_stream_extraction_job(self) -> QueryJob: + logger.info(f"Creating json extraction job {self._job_id} on archive: {self._archive_id}") + return ExtractJsonJob( + id=self._job_id, + extract_json_config=self.__job_config, + state=InternalJobState.WAITING_FOR_DISPATCH, + ) + + +def document_exists(mongodb_uri, collection_name, field, value): + with pymongo.MongoClient(mongodb_uri) as mongo_client: + collection = mongo_client.get_default_database()[collection_name] + return 0 != collection.count_documents({field: value}) + + def cancel_job_except_reducer(job: SearchJob): """ Cancels the job apart from releasing the reducer since that requires an async call. 
@@ -282,7 +397,7 @@ def get_archives_for_search( return archives_for_search -def get_archive_and_file_split_ids_for_extraction( +def get_archive_and_file_split_ids_for_ir_extraction( db_conn, extract_ir_config: ExtractIrJobConfig, ) -> Tuple[Optional[str], Optional[str]]: @@ -333,6 +448,23 @@ def get_archive_and_file_split_ids( return results +@exception_default_value(default=False) +def archive_exists( + db_conn, + archive_id: str, +) -> bool: + query = f"""SELECT 1 + FROM {CLP_METADATA_TABLE_PREFIX}archives WHERE + id = %s + """ + with contextlib.closing(db_conn.cursor(dictionary=True)) as cursor: + cursor.execute(query, (archive_id,)) + if cursor.fetchone(): + return True + + return False + + def get_task_group_for_job( archive_ids: List[str], task_ids: List[int], @@ -340,7 +472,7 @@ def get_task_group_for_job( clp_metadata_db_conn_params: Dict[str, any], results_cache_uri: str, ): - job_config_obj = job.get_config().dict() + job_config = job.get_config().dict() job_type = job.get_type() if QueryJobType.SEARCH_OR_AGGREGATION == job_type: return celery.group( @@ -348,19 +480,19 @@ def get_task_group_for_job( job_id=job.id, archive_id=archive_ids[i], task_id=task_ids[i], - job_config_obj=job_config_obj, + job_config=job_config, clp_metadata_db_conn_params=clp_metadata_db_conn_params, results_cache_uri=results_cache_uri, ) for i in range(len(archive_ids)) ) - elif QueryJobType.EXTRACT_IR == job_type: + elif job_type in (QueryJobType.EXTRACT_JSON, QueryJobType.EXTRACT_IR): return celery.group( - extract_ir.s( + extract_stream.s( job_id=job.id, archive_id=archive_ids[i], task_id=task_ids[i], - job_config_obj=job_config_obj, + job_config=job_config, clp_metadata_db_conn_params=clp_metadata_db_conn_params, results_cache_uri=results_cache_uri, ) @@ -466,11 +598,10 @@ def handle_pending_query_jobs( db_conn_pool, clp_metadata_db_conn_params: Dict[str, any], results_cache_uri: str, - ir_collection_name: str, + stream_collection_name: str, num_archives_to_search_per_sub_job: int, ) -> List[asyncio.Task]: global active_jobs - global active_file_split_ir_extractions reducer_acquisition_tasks = [] pending_search_jobs = [ @@ -484,14 +615,14 @@ def handle_pending_query_jobs( for job in fetch_new_query_jobs(db_conn): job_id = str(job["job_id"]) job_type = job["type"] - job_config = job["job_config"] + job_config = msgpack.unpackb(job["job_config"]) if QueryJobType.SEARCH_OR_AGGREGATION == job_type: # Avoid double-dispatch when a job is WAITING_FOR_REDUCER if job_id in active_jobs: continue - search_config = SearchJobConfig.parse_obj(msgpack.unpackb(job_config)) + search_config = SearchJobConfig.parse_obj(job_config) archives_for_search = get_archives_for_search(db_conn, search_config) if len(archives_for_search) == 0: if set_job_or_task_status( @@ -527,12 +658,15 @@ def handle_pending_query_jobs( pending_search_jobs.append(new_search_job) active_jobs[job_id] = new_search_job - elif QueryJobType.EXTRACT_IR == job_type: - extract_ir_config = ExtractIrJobConfig.parse_obj(msgpack.unpackb(job_config)) - archive_id, file_split_id = get_archive_and_file_split_ids_for_extraction( - db_conn, extract_ir_config - ) - if not archive_id or not file_split_id: + elif job_type in (QueryJobType.EXTRACT_IR, QueryJobType.EXTRACT_JSON): + job_handle: StreamExtractionHandle + try: + if QueryJobType.EXTRACT_IR == job_type: + job_handle = IrExtractionHandle(job_id, job_config, db_conn) + else: + job_handle = JsonExtractionHandle(job_id, job_config, db_conn) + except ValueError: + logger.exception("Failed to initialize 
extraction job handle") if not set_job_or_task_status( db_conn, QUERY_JOBS_TABLE_NAME, @@ -546,19 +680,29 @@ def handle_pending_query_jobs( logger.error(f"Failed to set job {job_id} as failed") continue - # NOTE: The following two if blocks should not be reordered since if we first check - # whether *an* IR file has been extracted for the requested file split, it doesn't - # mean that *all* IR files have has been extracted for the file split (since the - # extraction job may still be in progress). Thus, we must first check whether the - # file split is in the process of being extracted, and then check whether it's - # already been extracted. - - # Check if the file split is currently being extracted; if so, add the job ID to the - # list of jobs waiting for it. - if file_split_id in active_file_split_ir_extractions: - active_file_split_ir_extractions[file_split_id].append(job_id) + # NOTE: The following two if blocks for `is_stream_extraction_active` and + # `is_stream_extracted` should not be reordered. + # + # The logic below works as follows: + # 1. It checks if a stream is already being extracted + # (`is_stream_extraction_active`) and if so, it marks the new job as waiting for + # the old job to finish. + # 2. Otherwise, it checks if a stream has already been extracted + # (`is_stream_extracted`) and if so, it marks the new job as complete. + # 3. Otherwise, it creates a new stream extraction job. + # + # `is_stream_extracted` only checks if a single stream has been extracted rather + # than whether all required streams have been extracted. This means that we can't + # use it to check if the old job is complete; instead, we need to employ the + # aforementioned logic. + + # Check if the required streams are currently being extracted; if so, add the job ID + # to the list of jobs waiting for it. + if job_handle.is_stream_extraction_active(): + job_handle.mark_job_as_waiting() logger.info( - f"Split {file_split_id} is being extracted, so mark job {job_id} as running" + f"Stream {job_handle.get_stream_id()} is already being extracted," + f" so mark job {job_id} as running." ) if not set_job_or_task_status( db_conn, @@ -572,12 +716,11 @@ def handle_pending_query_jobs( logger.error(f"Failed to set job {job_id} as running") continue - # Check if the file split has already been extracted - if ir_file_exists_for_file_split( - results_cache_uri, ir_collection_name, file_split_id - ): + # Check if a required stream file has already been extracted + if job_handle.is_stream_extracted(results_cache_uri, stream_collection_name): logger.info( - f"Split {file_split_id} already extracted, so mark job {job_id} as done" + f"Stream {job_handle.get_stream_id()} already extracted," + f" so mark job {job_id} as succeeded." 
) if not set_job_or_task_status( db_conn, @@ -592,27 +735,20 @@ def handle_pending_query_jobs( logger.error(f"Failed to set job {job_id} as succeeded") continue - active_file_split_ir_extractions[file_split_id] = [job_id] - extract_ir_config.file_split_id = file_split_id - new_extract_ir_job = ExtractIrJob( - id=job_id, - archive_id=archive_id, - file_split_id=file_split_id, - extract_ir_config=extract_ir_config, - state=InternalJobState.WAITING_FOR_DISPATCH, - ) - target_archive = [new_extract_ir_job.archive_id] - + new_stream_extraction_job = job_handle.create_stream_extraction_job() + archive_id = job_handle.get_archive_id() dispatch_job_and_update_db( db_conn, - new_extract_ir_job, - target_archive, + new_stream_extraction_job, + [archive_id], clp_metadata_db_conn_params, results_cache_uri, 1, ) - active_jobs[new_extract_ir_job.id] = new_extract_ir_job - logger.info(f"Dispatched IR extraction job {job_id} on archive: {archive_id}") + + job_handle.mark_job_as_waiting() + active_jobs[job_id] = new_stream_extraction_job + logger.info(f"Dispatched stream extraction job {job_id} for archive: {archive_id}") else: # NOTE: We're skipping the job for this iteration, but its status will remain @@ -685,15 +821,6 @@ def found_max_num_latest_results( return max_timestamp_in_remaining_archives <= min_timestamp_in_top_results -def ir_file_exists_for_file_split( - results_cache_uri: str, ir_collection_name: str, file_split_id: str -): - with pymongo.MongoClient(results_cache_uri) as results_cache_client: - ir_collection = results_cache_client.get_default_database()[ir_collection_name] - results_count = ir_collection.count_documents({"file_split_id": file_split_id}) - return 0 != results_count - - async def handle_finished_search_job( db_conn, job: SearchJob, task_results: Optional[Any], results_cache_uri: str ) -> None: @@ -780,19 +907,20 @@ async def handle_finished_search_job( del active_jobs[job_id] -async def handle_finished_extract_ir_job( - db_conn, job: ExtractIrJob, task_results: Optional[Any] +async def handle_finished_stream_extraction_job( + db_conn, job: QueryJob, task_results: List[Any] ) -> None: global active_jobs + global active_archive_json_extractions global active_file_split_ir_extractions job_id = job.id - file_split_id = job.file_split_id new_job_status = QueryJobStatus.SUCCEEDED + num_tasks = len(task_results) if 1 != num_tasks: logger.error( - f"Unexpected number of tasks for IR extraction job {job_id}. " + f"Unexpected number of tasks for extraction job {job_id}. " f"Expected 1, got {num_tasks}." ) new_job_status = QueryJobStatus.FAILED @@ -801,13 +929,13 @@ async def handle_finished_extract_ir_job( task_id = task_result.task_id if not QueryJobStatus.SUCCEEDED == task_result.status: logger.error( - f"IR extraction task job-{job_id}-task-{task_id} failed. " + f"Extraction task job-{job_id}-task-{task_id} failed. " f"Check {task_result.error_log_path} for details." ) new_job_status = QueryJobStatus.FAILED else: logger.info( - f"IR extraction task job-{job_id}-task-{task_id} succeeded in " + f"Extraction task job-{job_id}-task-{task_id} succeeded in " f"{task_result.duration} second(s)." 
) @@ -821,11 +949,18 @@ async def handle_finished_extract_ir_job( duration=(datetime.datetime.now() - job.start_time).total_seconds(), ): if new_job_status == QueryJobStatus.SUCCEEDED: - logger.info(f"Completed IR extraction job {job_id}.") + logger.info(f"Completed stream extraction job {job_id}.") else: - logger.info(f"Completed IR extraction job {job_id} with failing tasks.") + logger.info(f"Completed stream extraction job {job_id} with failing tasks.") + + waiting_jobs: List[str] + if QueryJobType.EXTRACT_IR == job.get_type(): + extract_ir_config: ExtractIrJobConfig = job.get_config() + waiting_jobs = active_file_split_ir_extractions.pop(extract_ir_config.file_split_id) + else: + extract_json_config: ExtractJsonJobConfig = job.get_config() + waiting_jobs = active_archive_json_extractions.pop(extract_json_config.archive_id) - waiting_jobs = active_file_split_ir_extractions[file_split_id] waiting_jobs.remove(job_id) for waiting_job in waiting_jobs: logger.info(f"Setting status to {new_job_status.to_str()} for waiting jobs: {waiting_job}.") @@ -839,7 +974,6 @@ async def handle_finished_extract_ir_job( duration=(datetime.datetime.now() - job.start_time).total_seconds(), ) - del active_file_split_ir_extractions[file_split_id] del active_jobs[job_id] @@ -880,9 +1014,8 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): await handle_finished_search_job( db_conn, search_job, returned_results, results_cache_uri ) - elif QueryJobType.EXTRACT_IR == job_type: - extract_ir_job: ExtractIrJob = job - await handle_finished_extract_ir_job(db_conn, extract_ir_job, returned_results) + elif job_type in (QueryJobType.EXTRACT_JSON, QueryJobType.EXTRACT_IR): + await handle_finished_stream_extraction_job(db_conn, job, returned_results) else: logger.error(f"Unexpected job type: {job_type}, skipping job {job_id}") @@ -898,7 +1031,7 @@ async def handle_jobs( db_conn_pool, clp_metadata_db_conn_params: Dict[str, any], results_cache_uri: str, - ir_collection_name: str, + stream_collection_name: str, jobs_poll_delay: float, num_archives_to_search_per_sub_job: int, ) -> None: @@ -912,7 +1045,7 @@ async def handle_jobs( db_conn_pool, clp_metadata_db_conn_params, results_cache_uri, - ir_collection_name, + stream_collection_name, num_archives_to_search_per_sub_job, ) if 0 == len(reducer_acquisition_tasks): @@ -996,7 +1129,7 @@ async def main(argv: List[str]) -> int: True ), results_cache_uri=clp_config.results_cache.get_uri(), - ir_collection_name=clp_config.results_cache.ir_collection_name, + stream_collection_name=clp_config.results_cache.stream_collection_name, jobs_poll_delay=clp_config.query_scheduler.jobs_poll_delay, num_archives_to_search_per_sub_job=batch_size, ) diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index 5ef92a5d6..4f49a7c1a 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -11,6 +11,7 @@ ) from job_orchestration.scheduler.job_config import ( ExtractIrJobConfig, + ExtractJsonJobConfig, QueryJobConfig, SearchJobConfig, ) @@ -59,8 +60,6 @@ def get_config(self) -> QueryJobConfig: ... 
class ExtractIrJob(QueryJob): extract_ir_config: ExtractIrJobConfig - file_split_id: str - archive_id: str def get_type(self) -> QueryJobType: return QueryJobType.EXTRACT_IR @@ -69,6 +68,16 @@ def get_config(self) -> QueryJobConfig: return self.extract_ir_config +class ExtractJsonJob(QueryJob): + extract_json_config: ExtractJsonJobConfig + + def get_type(self) -> QueryJobType: + return QueryJobType.EXTRACT_JSON + + def get_config(self) -> QueryJobConfig: + return self.extract_json_config + + class SearchJob(QueryJob): search_config: SearchJobConfig num_archives_to_search: int diff --git a/components/log-viewer-webui/client/src/ui/QueryStatus.jsx b/components/log-viewer-webui/client/src/ui/QueryStatus.jsx index c1ad9eb9e..c1bb639a6 100644 --- a/components/log-viewer-webui/client/src/ui/QueryStatus.jsx +++ b/components/log-viewer-webui/client/src/ui/QueryStatus.jsx @@ -51,7 +51,7 @@ const QueryStatus = () => { setQueryState(QUERY_LOADING_STATES.LOADING); const innerLogEventNum = logEventIdx - data.begin_msg_ix + 1; - window.location = `/log-viewer/index.html?filePath=/ir/${data.path}` + + window.location = `/log-viewer/index.html?filePath=/streams/${data.path}` + `#logEventNum=${innerLogEventNum}`; }) .catch((e) => { diff --git a/components/log-viewer-webui/server/.env b/components/log-viewer-webui/server/.env index b66dc997b..95e3054a8 100644 --- a/components/log-viewer-webui/server/.env +++ b/components/log-viewer-webui/server/.env @@ -1,5 +1,5 @@ CLIENT_DIR=../client/dist -IR_DATA_DIR=../../../build/clp-package/var/data/ir +STREAMS_DATA_DIR=../../../build/clp-package/var/data/streams LOG_VIEWER_DIR=../yscope-log-viewer/dist HOST=localhost diff --git a/components/log-viewer-webui/server/settings.json b/components/log-viewer-webui/server/settings.json index bb1aac48a..163f9a9e2 100644 --- a/components/log-viewer-webui/server/settings.json +++ b/components/log-viewer-webui/server/settings.json @@ -6,9 +6,9 @@ "MongoDbHost": "localhost", "MongoDbPort": 27017, "MongoDbName": "clp-query-results", - "MongoDbIrFilesCollectionName": "ir-files", + "MongoDbStreamFilesCollectionName": "stream-files", "ClientDir": "../client/dist", - "IrFilesDir": "../../../build/clp-package/var/data/ir", + "StreamFilesDir": "../../../build/clp-package/var/data/streams", "LogViewerDir": "../yscope-log-viewer/dist" } diff --git a/components/log-viewer-webui/server/src/DbManager.js b/components/log-viewer-webui/server/src/DbManager.js index 5cb442795..79e097280 100644 --- a/components/log-viewer-webui/server/src/DbManager.js +++ b/components/log-viewer-webui/server/src/DbManager.js @@ -84,7 +84,7 @@ class DbManager { /** * @type {import("mongodb").Collection} */ - #irFilesCollection; + #streamFilesCollection; #queryJobsTableName; @@ -138,7 +138,7 @@ class DbManager { * @return {Promise} A promise that resolves to the extracted IR file's metadata. 
*/ async getExtractedIrFileMetadata (origFileId, logEventIdx) { - return await this.#irFilesCollection.findOne({ + return await this.#streamFilesCollection.findOne({ orig_file_id: origFileId, begin_msg_ix: {$lte: logEventIdx}, end_msg_ix: {$gt: logEventIdx}, @@ -177,7 +177,7 @@ class DbManager { * @param {string} config.host * @param {number} config.port * @param {string} config.database - * @param {string} config.irFilesCollectionName + * @param {string} config.StreamFilesCollectionName */ #initMongo (config) { this.#fastify.register(fastifyMongo, { @@ -187,8 +187,8 @@ class DbManager { if (err) { throw err; } - this.#irFilesCollection = - this.#fastify.mongo.db.collection(config.irFilesCollectionName); + this.#streamFilesCollection = + this.#fastify.mongo.db.collection(config.streamFilesCollectionName); }); } diff --git a/components/log-viewer-webui/server/src/app.js b/components/log-viewer-webui/server/src/app.js index 69918ea58..5d351fd39 100644 --- a/components/log-viewer-webui/server/src/app.js +++ b/components/log-viewer-webui/server/src/app.js @@ -37,7 +37,7 @@ const app = async ({ mongoConfig: { database: settings.MongoDbName, host: settings.MongoDbHost, - irFilesCollectionName: settings.MongoDbIrFilesCollectionName, + streamFilesCollectionName: settings.MongoDbStreamFilesCollectionName, port: settings.MongoDbPort, }, }); diff --git a/components/log-viewer-webui/server/src/routes/static.js b/components/log-viewer-webui/server/src/routes/static.js index 42d9048f0..6118c2855 100644 --- a/components/log-viewer-webui/server/src/routes/static.js +++ b/components/log-viewer-webui/server/src/routes/static.js @@ -18,13 +18,13 @@ const routes = async (fastify, options) => { const dirname = path.dirname(filename); const rootDirname = path.resolve(dirname, "../.."); - let irFilesDir = settings.IrFilesDir; - if (false === path.isAbsolute(irFilesDir)) { - irFilesDir = path.resolve(rootDirname, irFilesDir); + let streamFilesDir = settings.StreamFilesDir; + if (false === path.isAbsolute(streamFilesDir)) { + streamFilesDir = path.resolve(rootDirname, streamFilesDir); } await fastify.register(fastifyStatic, { - prefix: "/ir", - root: irFilesDir, + prefix: "/streams", + root: streamFilesDir, }); let logViewerDir = settings.LogViewerDir; diff --git a/components/package-template/src/etc/clp-config.yml b/components/package-template/src/etc/clp-config.yml index cb66f40cd..15747fe42 100644 --- a/components/package-template/src/etc/clp-config.yml +++ b/components/package-template/src/etc/clp-config.yml @@ -47,7 +47,7 @@ # host: "localhost" # port: 27017 # db_name: "clp-query-results" -# ir_collection_name: "ir-files" +# stream_collection_name: "stream-files" # #compression_worker: # logging_level: "INFO" @@ -82,9 +82,9 @@ # # How much data CLP should try to fit into each segment within an archive # target_segment_size: 268435456 # 256 MB # -## Where CLP IR files should be output -#ir_output: -# directory: "var/data/ir" +## Where CLP stream files (e.g., IR streams) should be output +#stream_output: +# directory: "var/data/streams" # # # How large each IR file should be before being split into a new IR file # target_uncompressed_size: 134217728 # 128 MB
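
For reference, a minimal sketch (not taken from the patches above) of driving the new JSON stream extraction path from PATCH 3/3 programmatically, mirroring what `clp_package_utils.scripts.native.decompress` does after argument parsing. It assumes `clp_config` is an already-loaded `CLPConfig` and uses a placeholder archive ID; the other names come from the modules changed in this series.

    from job_orchestration.scheduler.constants import QueryJobType
    from job_orchestration.scheduler.job_config import ExtractJsonJobConfig

    from clp_package_utils.scripts.native.decompress import (
        submit_and_monitor_extraction_job_in_db,
    )

    # Build the job config for extracting JSON streams from a single clp-s archive.
    job_config = ExtractJsonJobConfig(
        archive_id="<archive-id>",  # placeholder archive ID
        target_chunk_size=100000,
    )

    # Submit the extraction job to the query scheduler and wait for it to finish.
    # Returns 0 on success, -1 otherwise.
    return_code = submit_and_monitor_extraction_job_in_db(
        clp_config.database,
        QueryJobType.EXTRACT_JSON,
        job_config,
    )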