diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/search.py b/components/clp-package-utils/clp_package_utils/scripts/native/search.py index ccbb7af76..b7bb77f8d 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/search.py @@ -9,10 +9,10 @@ import socket import sys import time -from asyncio import StreamReader, StreamWriter from contextlib import closing import msgpack +import pymongo import zstandard from clp_package_utils.general import ( @@ -20,7 +20,11 @@ validate_and_load_config_file, get_clp_home ) -from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, Database +from clp_py_utils.clp_config import ( + CLP_METADATA_TABLE_PREFIX, + Database, + ResultsCache +) from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.job_config import SearchConfig from job_orchestration.scheduler.constants import JobStatus @@ -69,10 +73,10 @@ def process_error_callback(err): pool.close() -def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str, - begin_timestamp: int | None, end_timestamp: int | None, - path_filter: str, search_controller_host: str, - search_controller_port: int): +def create_and_monitor_job_in_db(db_config: Database, results_cache: ResultsCache, + wildcard_query: str, begin_timestamp: int | None, + end_timestamp: int | None, path_filter: str, + search_controller_host: str, search_controller_port: int): search_config = SearchConfig( search_controller_host=search_controller_host, search_controller_port=search_controller_port, @@ -84,14 +88,14 @@ def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str, sql_adapter = SQL_Adapter(db_config) zstd_cctx = zstandard.ZstdCompressor(level=3) - with closing(sql_adapter.create_connection(True)) as db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor: + with closing(sql_adapter.create_connection(True)) as \ + db_conn, 
closing(db_conn.cursor(dictionary=True)) as db_cursor: # Create job db_cursor.execute(f"INSERT INTO `search_jobs` (`search_config`) VALUES (%s)", (zstd_cctx.compress(msgpack.packb(search_config.dict())),)) db_conn.commit() job_id = db_cursor.lastrowid - # Create a task for each archive, in batches next_pagination_id = 0 pagination_limit = 64 num_tasks_added = 0 @@ -147,32 +151,29 @@ def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str, job_complete = True db_conn.commit() - time.sleep(1) + time.sleep(0.5) + + with pymongo.MongoClient(results_cache.get_uri()) as client: + search_results_collection = client[results_cache.db_name][str(job_id)] + for document in search_results_collection.find(): + print(f"{document['original_path']}: {document['message']}", end='') -async def worker_connection_handler(reader: StreamReader, writer: StreamWriter): +async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): try: - unpacker = msgpack.Unpacker() - while True: - # Read some data from the worker and feed it to msgpack - buf = await reader.read(1024) - if b'' == buf: - # Worker closed - return - unpacker.feed(buf) - - # Print out any messages we can decode - for unpacked in unpacker: - print(f"{unpacked[0]}: {unpacked[2]}", end='') + buf = await reader.read(1024) + if b'' == buf: + # Worker closed + return except asyncio.CancelledError: return finally: writer.close() -async def do_search(db_config: Database, wildcard_query: str, begin_timestamp: int | None, - end_timestamp: int | None, path_filter: str, host: str): - # Start server to receive and print results +async def do_search(db_config: Database, results_cache: ResultsCache, wildcard_query: str, + begin_timestamp: int | None, end_timestamp: int | None, path_filter: str, host: str): + # Start a server try: server = await asyncio.start_server(client_connected_cb=worker_connection_handler, host=host, port=0, family=socket.AF_INET) @@ -184,7 +185,7 @@ async def 
do_search(db_config: Database, wildcard_query: str, begin_timestamp: i server_task = asyncio.ensure_future(server.serve_forever()) db_monitor_task = asyncio.ensure_future( - run_function_in_process(create_and_monitor_job_in_db, db_config, wildcard_query, + run_function_in_process(create_and_monitor_job_in_db, db_config, results_cache, wildcard_query, begin_timestamp, end_timestamp, path_filter, host, port)) # Wait for the job to complete or an error to occur @@ -245,8 +246,9 @@ def main(argv): logger.error("Could not determine IP of local machine.") return -1 - asyncio.run(do_search(clp_config.database, parsed_args.wildcard_query, parsed_args.begin_time, - parsed_args.end_time, parsed_args.file_path, host_ip)) + asyncio.run(do_search(clp_config.database, clp_config.results_cache, parsed_args.wildcard_query, + parsed_args.begin_time, parsed_args.end_time, parsed_args.file_path, + host_ip)) return 0 diff --git a/components/clp-package-utils/pyproject.toml b/components/clp-package-utils/pyproject.toml index 7a010ad22..319c55f14 100644 --- a/components/clp-package-utils/pyproject.toml +++ b/components/clp-package-utils/pyproject.toml @@ -8,6 +8,7 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.8 || ^3.10" msgpack = "^1.0.7" +pymongo = "^4.6.1" PyYAML = "^6.0.1" zstandard = "~0.22" diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index daa8f887e..e25f15819 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -104,6 +104,7 @@ class Scheduler(BaseModel): class ResultsCache(BaseModel): host: str = 'localhost' port: int = 27017 + db_name: str = 'clp-search' @validator('host') def validate_host(cls, field): @@ -111,6 +112,9 @@ def validate_host(cls, field): raise ValueError(f'{RESULTS_CACHE_COMPONENT_NAME}.host cannot be empty.') return field + def get_uri(self): + return f"mongodb://{self.host}:{self.port}/{self.db_name}" 
+ class Queue(BaseModel): host: str = 'localhost' diff --git a/components/core/.clang-format b/components/core/.clang-format index fbaf8f62e..5ec180847 100644 --- a/components/core/.clang-format +++ b/components/core/.clang-format @@ -72,8 +72,8 @@ IncludeCategories: # NOTE: A header is grouped by first matching regex # Library headers. Update when adding new libraries. # NOTE: clang-format retains leading white-space on a line in violation of the YAML spec. - - Regex: "<(absl|antlr4|archive|boost|catch2|date|fmt|json|log_surgeon|mariadb|simdjson|spdlog\ -|sqlite3|string_utils|yaml-cpp|zstd)" + - Regex: "<(absl|antlr4|archive|boost|catch2|date|fmt|json|log_surgeon|mariadb|mongocxx|simdjson\ +|spdlog|sqlite3|string_utils|yaml-cpp|zstd)" Priority: 3 # C system headers - Regex: "^<.+\\.h>" diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 9007f9328..a892003e5 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -143,6 +143,20 @@ else() message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for MariaDBClient") endif() +# Find and setup mongocxx +if(CLP_USE_STATIC_LIBS) + set(MONGOCXX_TARGET mongo::mongocxx_static) +else() + set(MONGOCXX_TARGET mongo::mongocxx_shared) +endif() + +find_package(mongocxx REQUIRED) +if(mongocxx_FOUND) + message(STATUS "Found mongocxx ${mongocxx_VERSION}") +else() + message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for mongocxx") +endif() + # Find and setup msgpack find_package(msgpack-cxx 6.0.0 REQUIRED) if(msgpack-cxx_FOUND) diff --git a/components/core/src/clp/clo/CMakeLists.txt b/components/core/src/clp/clo/CMakeLists.txt index dfd717286..7ac8a36ed 100644 --- a/components/core/src/clp/clo/CMakeLists.txt +++ b/components/core/src/clp/clo/CMakeLists.txt @@ -110,6 +110,8 @@ set( CommandLineArguments.hpp ControllerMonitoringThread.cpp ControllerMonitoringThread.hpp + ResultsCacheClient.cpp + ResultsCacheClient.hpp ) add_executable(clo ${CLO_SOURCES}) @@ 
-120,7 +122,7 @@ target_link_libraries(clo Boost::filesystem Boost::iostreams Boost::program_options fmt::fmt log_surgeon::log_surgeon - msgpack-cxx + ${MONGOCXX_TARGET} spdlog::spdlog ${sqlite_LIBRARY_DEPENDENCIES} ${STD_FS_LIBS} diff --git a/components/core/src/clp/clo/CommandLineArguments.cpp b/components/core/src/clp/clo/CommandLineArguments.cpp index 36f9556c1..7a5d98a29 100644 --- a/components/core/src/clp/clo/CommandLineArguments.cpp +++ b/components/core/src/clp/clo/CommandLineArguments.cpp @@ -80,10 +80,18 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { "Ignore case distinctions in both WILDCARD STRING and the input files" ); + po::options_description options_batch_control("Batch Controls"); + options_batch_control.add_options()( + "batch-size,b", + po::value(&m_batch_size)->value_name("SIZE")->default_value(m_batch_size), + "Batch size" + ); + // Define visible options po::options_description visible_options; visible_options.add(options_general); visible_options.add(options_match_control); + visible_options.add(options_batch_control); // Define hidden positional options (not shown in Boost's program options help message) po::options_description hidden_positional_options; @@ -94,6 +102,12 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { )( "search-controller-port", po::value(&m_search_controller_port) + )( + "mongodb-uri", + po::value(&m_mongodb_uri) + )( + "mongodb-collection", + po::value(&m_mongodb_collection) )( "archive-path", po::value(&m_archive_path) @@ -108,6 +122,8 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { po::positional_options_description positional_options_description; positional_options_description.add("search-controller-host", 1); positional_options_description.add("search-controller-port", 1); + positional_options_description.add("mongodb-uri", 1); + positional_options_description.add("mongodb-collection", 1); positional_options_description.add("archive-path", 
1); positional_options_description.add("wildcard-string", 1); positional_options_description.add("file-path", 1); @@ -116,6 +132,7 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { po::options_description all_options; all_options.add(options_general); all_options.add(options_match_control); + all_options.add(options_batch_control); all_options.add(hidden_positional_options); // Parse options @@ -159,10 +176,13 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { cerr << endl; cerr << "Examples:" << endl; - cerr << R"( # Search ARCHIVE_PATH for " ERROR " and send results to the controller)" - R"( at localhost:5555)" + cerr << R"( # Search ARCHIVE_PATH for " ERROR " and send results to )" + R"(mongodb://127.0.0.1:27017/test "result" collection )" + R"(and use localhost:5555 as the search controller)" << endl; - cerr << " " << get_program_name() << R"( localhost 5555 ARCHIVE_PATH " ERROR ")" + cerr << " " << get_program_name() + << R"( localhost 5555 mongodb://127.0.0.1:27017/test result )" + R"(ARCHIVE_PATH " ERROR ")" << endl; cerr << endl; @@ -188,6 +208,16 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { throw invalid_argument("SEARCH_CONTROLLER_PORT not specified or empty."); } + // Validate mongodb uri was specified + if (m_mongodb_uri.empty()) { + throw invalid_argument("MONGODB_URI not specified or empty."); + } + + // Validate mongodb collection was specified + if (m_mongodb_collection.empty()) { + throw invalid_argument("MONGODB_COLLECTION not specified or empty."); + } + // Validate archive path was specified if (m_archive_path.empty()) { throw invalid_argument("ARCHIVE_PATH not specified or empty."); @@ -245,6 +275,11 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { ); } } + + // Validate batch size + if (m_batch_size == 0) { + throw invalid_argument("Batch size cannot be 0."); + } } catch (exception& e) { SPDLOG_ERROR("{}", e.what()); print_basic_usage(); @@ -258,6 
+293,7 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { void CommandLineArguments::print_basic_usage() const { cerr << "Usage: " << get_program_name() << " [OPTIONS] SEARCH_CONTROLLER_HOST SEARCH_CONTROLLER_PORT " + "MONGODB_URI MONGODB_COLLECTION " << R"(ARCHIVE_PATH "WILDCARD STRING" [FILE])" << endl; } } // namespace clp::clo diff --git a/components/core/src/clp/clo/CommandLineArguments.hpp b/components/core/src/clp/clo/CommandLineArguments.hpp index cfa8180a6..82890f3e2 100644 --- a/components/core/src/clp/clo/CommandLineArguments.hpp +++ b/components/core/src/clp/clo/CommandLineArguments.hpp @@ -15,6 +15,7 @@ class CommandLineArguments : public CommandLineArgumentsBase { // Constructors explicit CommandLineArguments(std::string const& program_name) : CommandLineArgumentsBase(program_name), + m_batch_size(1000), m_ignore_case(false), m_search_begin_ts(cEpochTimeMin), m_search_end_ts(cEpochTimeMax) {} @@ -26,6 +27,12 @@ class CommandLineArguments : public CommandLineArgumentsBase { std::string const& get_search_controller_port() const { return m_search_controller_port; } + std::string const& get_mongodb_uri() const { return m_mongodb_uri; } + + std::string const& get_mongodb_collection() const { return m_mongodb_collection; } + + uint64_t get_batch_size() const { return m_batch_size; } + std::string const& get_archive_path() const { return m_archive_path; } bool ignore_case() const { return m_ignore_case; } @@ -45,6 +52,9 @@ class CommandLineArguments : public CommandLineArgumentsBase { // Variables std::string m_search_controller_host; std::string m_search_controller_port; + std::string m_mongodb_uri; + std::string m_mongodb_collection; + uint64_t m_batch_size; std::string m_archive_path; bool m_ignore_case; std::string m_search_string; diff --git a/components/core/src/clp/clo/ResultsCacheClient.cpp b/components/core/src/clp/clo/ResultsCacheClient.cpp new file mode 100644 index 000000000..e56399e1d --- /dev/null +++ 
b/components/core/src/clp/clo/ResultsCacheClient.cpp @@ -0,0 +1,52 @@ +#include "ResultsCacheClient.hpp" + +namespace clp::clo { +ResultsCacheClient::ResultsCacheClient( + std::string const& uri, + std::string const& collection, + uint64_t batch_size +) + : m_batch_size(batch_size) { + try { + auto mongo_uri = mongocxx::uri(uri); + m_client = mongocxx::client(mongo_uri); + m_collection = m_client[mongo_uri.database()][collection]; + } catch (mongocxx::exception const& e) { + throw OperationFailed(ErrorCode::ErrorCode_BadParam_DB_URI, __FILE__, __LINE__); + } +} + +void ResultsCacheClient::flush() { + try { + if (false == m_results.empty()) { + m_collection.insert_many(m_results); + m_results.clear(); + } + } catch (mongocxx::exception const& e) { + throw OperationFailed(ErrorCode::ErrorCode_Failure_DB_Bulk_Write, __FILE__, __LINE__); + } +} + +void ResultsCacheClient::add_result( + std::string const& original_path, + std::string const& message, + epochtime_t timestamp +) { + try { + auto document = bsoncxx::builder::basic::make_document( + bsoncxx::builder::basic::kvp("original_path", original_path), + bsoncxx::builder::basic::kvp("message", message), + bsoncxx::builder::basic::kvp("timestamp", timestamp) + ); + + m_results.push_back(std::move(document)); + + if (m_results.size() >= m_batch_size) { + m_collection.insert_many(m_results); + m_results.clear(); + } + } catch (mongocxx::exception const& e) { + throw OperationFailed(ErrorCode::ErrorCode_Failure_DB_Bulk_Write, __FILE__, __LINE__); + } +} +} // namespace clp::clo diff --git a/components/core/src/clp/clo/ResultsCacheClient.hpp b/components/core/src/clp/clo/ResultsCacheClient.hpp new file mode 100644 index 000000000..7c0d03e25 --- /dev/null +++ b/components/core/src/clp/clo/ResultsCacheClient.hpp @@ -0,0 +1,57 @@ +#ifndef CLP_CLO_MONGODBCLIENT_HPP +#define CLP_CLO_MONGODBCLIENT_HPP + +#include + +#include +#include +#include +#include + +#include "../Defs.h" +#include "../TraceableException.hpp" + +namespace 
clp::clo { +/** + * Class encapsulating a MongoDB client used to send query results to the results cache. + */ +class ResultsCacheClient { +public: + // Types + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + + // Methods + char const* what() const noexcept override { return "ResultsCacheClient operation failed"; } + }; + + // Constructors + ResultsCacheClient(std::string const& uri, std::string const& collection, uint64_t batch_size); + + // Methods + /** + * Adds a result to the batch. + * @param original_path The original path of the log event. + * @param message The content of the log event. + * @param timestamp The timestamp of the log event. + */ + void + add_result(std::string const& original_path, std::string const& message, epochtime_t timestamp); + + /** + * Flushes the batch. + */ + void flush(); + +private: + mongocxx::client m_client; + mongocxx::collection m_collection; + std::vector m_results; + uint64_t m_batch_size; +}; +} // namespace clp::clo + +#endif // CLP_CLO_MONGODBCLIENT_HPP diff --git a/components/core/src/clp/clo/clo.cpp b/components/core/src/clp/clo/clo.cpp index f2e4074f9..fdcfd2fce 100644 --- a/components/core/src/clp/clo/clo.cpp +++ b/components/core/src/clp/clo/clo.cpp @@ -2,9 +2,9 @@ #include #include -#include #include +#include #include #include "../Defs.h" @@ -16,8 +16,10 @@ #include "../Utils.hpp" #include "CommandLineArguments.hpp" #include "ControllerMonitoringThread.hpp" +#include "ResultsCacheClient.hpp" using clp::clo::CommandLineArguments; +using clp::clo::ResultsCacheClient; using clp::CommandLineArgumentsBase; using clp::epochtime_t; using clp::ErrorCode; @@ -55,27 +57,13 @@ enum class SearchFilesResult { */ static int connect_to_search_controller(string const& controller_host, string const& controller_port); -/** - * Sends the search result to 
the search controller - * @param orig_file_path - * @param compressed_msg - * @param decompressed_msg - * @param controller_socket_fd - * @return Same as networking::try_send - */ -static ErrorCode send_result( - string const& orig_file_path, - Message const& compressed_msg, - string const& decompressed_msg, - int controller_socket_fd -); /** * Searches all files referenced by a given database cursor * @param query * @param archive * @param file_metadata_ix * @param query_cancelled - * @param controller_socket_fd + * @param results_cache_client * @return SearchFilesResult::OpenFailure on failure to open a compressed file * @return SearchFilesResult::ResultSendFailure on failure to send a result * @return SearchFilesResult::Success otherwise @@ -85,21 +73,21 @@ static SearchFilesResult search_files( Archive& archive, MetadataDB::FileIterator& file_metadata_ix, std::atomic_bool const& query_cancelled, - int controller_socket_fd + ResultsCacheClient& results_cache_client ); /** * Searches an archive with the given path * @param command_line_args * @param archive_path * @param query_cancelled - * @param controller_socket_fd + * @param results_cache_client * @return true on success, false otherwise */ static bool search_archive( CommandLineArguments const& command_line_args, boost::filesystem::path const& archive_path, std::atomic_bool const& query_cancelled, - int controller_socket_fd + ResultsCacheClient& results_cache_client ); static int @@ -151,28 +139,12 @@ connect_to_search_controller(string const& controller_host, string const& contro return controller_socket_fd; } -static ErrorCode send_result( - string const& orig_file_path, - Message const& compressed_msg, - string const& decompressed_msg, - int controller_socket_fd -) { - msgpack::type::tuple src( - orig_file_path, - compressed_msg.get_ts_in_milli(), - decompressed_msg - ); - msgpack::sbuffer m; - msgpack::pack(m, src); - return clp::networking::try_send(controller_socket_fd, m.data(), m.size()); -} - static 
SearchFilesResult search_files( Query& query, Archive& archive, MetadataDB::FileIterator& file_metadata_ix, std::atomic_bool const& query_cancelled, - int controller_socket_fd + ResultsCacheClient& results_cache_client ) { SearchFilesResult result = SearchFilesResult::Success; @@ -205,20 +177,11 @@ static SearchFilesResult search_files( decompressed_message )) { - error_code = send_result( + results_cache_client.add_result( compressed_file.get_orig_path(), - compressed_message, decompressed_message, - controller_socket_fd + compressed_message.get_ts_in_milli() ); - if (ErrorCode_Success != error_code) { - result = SearchFilesResult::ResultSendFailure; - break; - } - } - if (SearchFilesResult::ResultSendFailure == result) { - // Stop search now since results aren't reaching the controller - break; } archive.close_file(compressed_file); @@ -231,7 +194,7 @@ static bool search_archive( CommandLineArguments const& command_line_args, boost::filesystem::path const& archive_path, std::atomic_bool const& query_cancelled, - int controller_socket_fd + ResultsCacheClient& results_cache_client ) { if (false == boost::filesystem::exists(archive_path)) { SPDLOG_ERROR("Archive '{}' does not exist.", archive_path.c_str()); @@ -309,13 +272,14 @@ static bool search_archive( archive_reader, file_metadata_ix, query_cancelled, - controller_socket_fd + results_cache_client ); if (SearchFilesResult::ResultSendFailure == result) { // Stop search now since results aren't reaching the controller break; } } + results_cache_client.flush(); file_metadata_ix_ptr.reset(nullptr); archive_reader.close(); @@ -356,6 +320,13 @@ int main(int argc, char const* argv[]) { return -1; } + mongocxx::instance mongocxx_instance{}; + ResultsCacheClient results_cache_client( + command_line_args.get_mongodb_uri(), + command_line_args.get_mongodb_collection(), + command_line_args.get_batch_size() + ); + auto const archive_path = boost::filesystem::path(command_line_args.get_archive_path()); 
clp::clo::ControllerMonitoringThread controller_monitoring_thread(controller_socket_fd); @@ -368,7 +339,7 @@ int main(int argc, char const* argv[]) { command_line_args, archive_path, controller_monitoring_thread.get_query_cancelled(), - controller_socket_fd + results_cache_client )) { return_value = -1; diff --git a/components/job-orchestration/job_orchestration/executor/search_task.py b/components/job-orchestration/job_orchestration/executor/search_task.py index 024a05aa7..cf3d77237 100644 --- a/components/job-orchestration/job_orchestration/executor/search_task.py +++ b/components/job-orchestration/job_orchestration/executor/search_task.py @@ -15,7 +15,8 @@ def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_dir: pathlib.Path, - logs_dir: pathlib.Path, search_config: SearchConfig, archive_id: str): + logs_dir: pathlib.Path, search_config: SearchConfig, archive_id: str, + results_cache_uri: str): """ Searches the given archive for the given wildcard query @@ -26,6 +27,7 @@ def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_di :param logs_dir: :param search_config: :param archive_id: + :param results_cache_uri: :return: tuple -- (whether the search was successful, output messages) """ # Assemble search command @@ -33,6 +35,8 @@ def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_di str(clp_home / 'bin' / 'clo'), search_config.search_controller_host, str(search_config.search_controller_port), + results_cache_uri, + str(job_id), str(archive_output_dir / archive_id), search_config.wildcard_query ] @@ -73,7 +77,8 @@ def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_di @app.task() -def search(job_id: int, task_id: int, search_config_json: str, archive_id: str): +def search(job_id: int, task_id: int, search_config_json: str, archive_id: str, + results_cache_uri: str): clp_home = os.getenv('CLP_HOME') archive_output_dir = os.getenv('CLP_ARCHIVE_OUTPUT_DIR') logs_dir 
= os.getenv('CLP_LOGS_DIR') @@ -92,7 +97,8 @@ def search(job_id: int, task_id: int, search_config_json: str, archive_id: str): search_successful, worker_output = run_clo(job_id, task_id, pathlib.Path(clp_home), pathlib.Path(archive_output_dir), - pathlib.Path(logs_dir), search_config, archive_id) + pathlib.Path(logs_dir), search_config, archive_id, + results_cache_uri) if search_successful: task_update.status = TaskStatus.SUCCEEDED diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler.py b/components/job-orchestration/job_orchestration/scheduler/scheduler.py index fe1c7b61d..4d41610c2 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler.py @@ -12,7 +12,11 @@ import zstandard from pydantic import ValidationError -from clp_py_utils.clp_config import CLPConfig, Database +from clp_py_utils.clp_config import ( + CLPConfig, + Database, + ResultsCache, +) from clp_py_utils.core import read_yaml_config_file from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.compression_task import compress @@ -141,12 +145,15 @@ def schedule_compression_task(job: CompressionJob, task: CompressionTask, databa return compress.apply_async(args, task_id=str(task.id), queue=QueueName.COMPRESSION, priority=task.priority) -def schedule_search_task(job: SearchJob, task: SearchTask, dctx: zstandard.ZstdDecompressor): - args = (job.id, task.id, job.get_search_config_json_str(dctx), task.archive_id) +def schedule_search_task(job: SearchJob, task: SearchTask, results_cache: ResultsCache, + dctx: zstandard.ZstdDecompressor): + args = (job.id, task.id, job.get_search_config_json_str(dctx), task.archive_id, + results_cache.get_uri()) return search.apply_async(args, task_id=str(task.id), queue=QueueName.SEARCH, priority=task.priority) -def search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database): +def 
search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database, + results_cache: ResultsCache): """ For all task with SUBMITTED status, push them to task queue to be processed, if finished, update them """ @@ -183,10 +190,11 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database) num_tasks_completed=task_row['num_tasks_completed'], search_config=task_row['search_config'], ) - update_search_job_metadata(db_cursor, job_id, dict(start_time=now.strftime('%Y-%m-%d %H:%M:%S'))) + update_search_job_metadata(db_cursor, job_id, + dict(start_time=now.strftime('%Y-%m-%d %H:%M:%S'))) id_to_search_job[search_job.id] = search_job - celery_task = schedule_search_task(search_job, search_task, dctx) + celery_task = schedule_search_task(search_job, search_task, results_cache, dctx) update_search_task_metadata(db_cursor, search_task.id, dict( status=TaskStatus.SCHEDULED, @@ -419,7 +427,7 @@ def callback(ch, method, properties, body): task_update = TaskUpdate.parse_raw(body) if TaskStatus.FAILED == task_update.status: task_update = TaskFailureUpdate.parse_raw(body) - elif TaskUpdateType.COMPRESSION == task_update.type and\ + elif TaskUpdateType.COMPRESSION == task_update.type and \ TaskStatus.SUCCEEDED == task_update.status: task_update = CompressionTaskSuccessUpdate.parse_raw(body) except ValidationError as err: @@ -483,7 +491,8 @@ def main(argv): # Start Job Processing Loop with closing(sql_adapter.create_connection(True)) as db_conn, \ closing(db_conn.cursor(dictionary=True)) as db_cursor: - search_and_schedule_new_tasks(db_conn, db_cursor, sql_adapter.database_config) + search_and_schedule_new_tasks(db_conn, db_cursor, sql_adapter.database_config, + clp_config.results_cache) update_completed_jobs(db_cursor, 'compression') update_completed_jobs(db_cursor, 'search') db_conn.commit() diff --git a/components/package-template/src/etc/clp-config.yml b/components/package-template/src/etc/clp-config.yml index 3c0b94784..1882f8270 100644 --- 
a/components/package-template/src/etc/clp-config.yml +++ b/components/package-template/src/etc/clp-config.yml @@ -23,6 +23,7 @@ #results_cache: # host: "localhost" # port: 27017 +# db_name: "clp-search" # ## Where archives should be output to #archive_output: