Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send search results through the results cache in the CLP package #223

Merged
merged 31 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6a247c1
add MongoDB dependency in CMakeLists.txt
wraymo Jan 9, 2024
ebb3493
change command line arguments
wraymo Jan 10, 2024
4a4a800
delete ControllerMonitoringThread files
wraymo Jan 10, 2024
ad66f63
add mongocxx to .clang-format
wraymo Jan 10, 2024
202fae8
complete mongodb support for source code
wraymo Jan 10, 2024
5e89ff5
remove redundant target
wraymo Jan 10, 2024
1bd26cf
change search script
wraymo Jan 10, 2024
4aa1d12
complete support for sending results to mongodb
wraymo Jan 10, 2024
364d032
rename url and add db_name to clp_config.yml
wraymo Jan 10, 2024
5b2b7de
change usage message
wraymo Jan 11, 2024
2ef6f7e
change helper message
wraymo Jan 11, 2024
3423ca8
add flush
wraymo Jan 11, 2024
405cef2
change results_cache get_uri()
wraymo Jan 11, 2024
6a7ed11
fix lint error
wraymo Jan 11, 2024
006168e
fix a bug
wraymo Jan 11, 2024
d7f6ecf
add controller monitor thread back to handle cancellation
wraymo Jan 15, 2024
a352e9e
add new line deliemeter
wraymo Jan 15, 2024
b3f47c5
add search controller back
wraymo Jan 15, 2024
35af38a
fix a bug
wraymo Jan 16, 2024
8c82a53
fix a bug
wraymo Jan 16, 2024
b6ddeed
remove unused header file
wraymo Jan 16, 2024
0ab8cb0
fix some issues
wraymo Jan 17, 2024
b1882a1
fix a lint problem
wraymo Jan 17, 2024
72403ae
fix other issues
wraymo Jan 18, 2024
7e1378c
fix a package bug
wraymo Jan 18, 2024
ee753a1
fix some issues
wraymo Jan 18, 2024
033c7ff
fix a lint error
wraymo Jan 18, 2024
b8da5ca
fix a grammar error
wraymo Jan 18, 2024
85f25e2
add an exception handler for ResultsCacheClient constructor
wraymo Jan 18, 2024
dc8dcc4
print original_path and decompressed message instead of a mongodb record
wraymo Jan 18, 2024
e4f4a08
print without newline
wraymo Jan 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@
import socket
import sys
import time
from asyncio import StreamReader, StreamWriter
from contextlib import closing

import msgpack
import pymongo
import zstandard

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
validate_and_load_config_file,
get_clp_home
)
from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, Database
from clp_py_utils.clp_config import (
CLP_METADATA_TABLE_PREFIX,
Database,
ResultsCache
)
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.job_config import SearchConfig
from job_orchestration.scheduler.constants import JobStatus
Expand Down Expand Up @@ -69,10 +73,10 @@ def process_error_callback(err):
pool.close()


def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,
begin_timestamp: int | None, end_timestamp: int | None,
path_filter: str, search_controller_host: str,
search_controller_port: int):
def create_and_monitor_job_in_db(db_config: Database, results_cache: ResultsCache,
wildcard_query: str, begin_timestamp: int | None,
end_timestamp: int | None, path_filter: str,
search_controller_host: str, search_controller_port: int):
search_config = SearchConfig(
search_controller_host=search_controller_host,
search_controller_port=search_controller_port,
Expand All @@ -84,14 +88,14 @@ def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,

sql_adapter = SQL_Adapter(db_config)
zstd_cctx = zstandard.ZstdCompressor(level=3)
with closing(sql_adapter.create_connection(True)) as db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor:
with closing(sql_adapter.create_connection(True)) as \
db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor:
# Create job
db_cursor.execute(f"INSERT INTO `search_jobs` (`search_config`) VALUES (%s)",
(zstd_cctx.compress(msgpack.packb(search_config.dict())),))
db_conn.commit()
job_id = db_cursor.lastrowid

# Create a task for each archive, in batches
next_pagination_id = 0
pagination_limit = 64
num_tasks_added = 0
Expand Down Expand Up @@ -147,32 +151,29 @@ def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,
job_complete = True
db_conn.commit()

time.sleep(1)
time.sleep(0.5)

with pymongo.MongoClient(results_cache.get_uri()) as client:
search_results_collection = client[results_cache.db_name][str(job_id)]
for document in search_results_collection.find():
print(f"{document['original_path']}: {document['message']}", end='')


async def worker_connection_handler(reader: StreamReader, writer: StreamWriter):
async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
try:
unpacker = msgpack.Unpacker()
while True:
# Read some data from the worker and feed it to msgpack
buf = await reader.read(1024)
if b'' == buf:
# Worker closed
return
unpacker.feed(buf)

# Print out any messages we can decode
for unpacked in unpacker:
print(f"{unpacked[0]}: {unpacked[2]}", end='')
buf = await reader.read(1024)
if b'' == buf:
# Worker closed
return
except asyncio.CancelledError:
return
finally:
writer.close()


async def do_search(db_config: Database, wildcard_query: str, begin_timestamp: int | None,
end_timestamp: int | None, path_filter: str, host: str):
# Start server to receive and print results
async def do_search(db_config: Database, results_cache: ResultsCache, wildcard_query: str,
begin_timestamp: int | None, end_timestamp: int | None, path_filter: str, host: str):
# Start a server
try:
server = await asyncio.start_server(client_connected_cb=worker_connection_handler, host=host, port=0,
family=socket.AF_INET)
Expand All @@ -184,7 +185,7 @@ async def do_search(db_config: Database, wildcard_query: str, begin_timestamp: i
server_task = asyncio.ensure_future(server.serve_forever())

db_monitor_task = asyncio.ensure_future(
run_function_in_process(create_and_monitor_job_in_db, db_config, wildcard_query,
run_function_in_process(create_and_monitor_job_in_db, db_config, results_cache, wildcard_query,
begin_timestamp, end_timestamp, path_filter, host, port))

# Wait for the job to complete or an error to occur
Expand Down Expand Up @@ -245,8 +246,9 @@ def main(argv):
logger.error("Could not determine IP of local machine.")
return -1

asyncio.run(do_search(clp_config.database, parsed_args.wildcard_query, parsed_args.begin_time,
parsed_args.end_time, parsed_args.file_path, host_ip))
asyncio.run(do_search(clp_config.database, clp_config.results_cache, parsed_args.wildcard_query,
parsed_args.begin_time, parsed_args.end_time, parsed_args.file_path,
host_ip))

return 0

Expand Down
1 change: 1 addition & 0 deletions components/clp-package-utils/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.8 || ^3.10"
msgpack = "^1.0.7"
pymongo = "^4.6.1"
PyYAML = "^6.0.1"
zstandard = "~0.22"

Expand Down
4 changes: 4 additions & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,17 @@ class Scheduler(BaseModel):
class ResultsCache(BaseModel):
host: str = 'localhost'
port: int = 27017
db_name: str = 'clp-search'

@validator('host')
def validate_host(cls, field):
if '' == field:
raise ValueError(f'{RESULTS_CACHE_COMPONENT_NAME}.host cannot be empty.')
return field

def get_uri(self):
return f"mongodb://{self.host}:{self.port}/{self.db_name}"


class Queue(BaseModel):
host: str = 'localhost'
Expand Down
4 changes: 2 additions & 2 deletions components/core/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ IncludeCategories:
# NOTE: A header is grouped by first matching regex
# Library headers. Update when adding new libraries.
# NOTE: clang-format retains leading white-space on a line in violation of the YAML spec.
- Regex: "<(absl|antlr4|archive|boost|catch2|date|fmt|json|log_surgeon|mariadb|simdjson|spdlog\
|sqlite3|string_utils|yaml-cpp|zstd)"
- Regex: "<(absl|antlr4|archive|boost|catch2|date|fmt|json|log_surgeon|mariadb|mongocxx|simdjson\
|spdlog|sqlite3|string_utils|yaml-cpp|zstd)"
Priority: 3
# C system headers
- Regex: "^<.+\\.h>"
Expand Down
14 changes: 14 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for MariaDBClient")
endif()

# Find and setup mongocxx
if(CLP_USE_STATIC_LIBS)
set(MONGOCXX_TARGET mongo::mongocxx_static)
else()
set(MONGOCXX_TARGET mongo::mongocxx_shared)
endif()

find_package(mongocxx REQUIRED)
if(mongocxx_FOUND)
message(STATUS "Found mongocxx ${mongocxx_VERSION}")
else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for mongocxx")
endif()

# Find and setup msgpack
find_package(msgpack-cxx 6.0.0 REQUIRED)
if(msgpack-cxx_FOUND)
Expand Down
4 changes: 3 additions & 1 deletion components/core/src/clp/clo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ set(
CommandLineArguments.hpp
ControllerMonitoringThread.cpp
ControllerMonitoringThread.hpp
ResultsCacheClient.cpp
ResultsCacheClient.hpp
)

add_executable(clo ${CLO_SOURCES})
Expand All @@ -120,7 +122,7 @@ target_link_libraries(clo
Boost::filesystem Boost::iostreams Boost::program_options
fmt::fmt
log_surgeon::log_surgeon
msgpack-cxx
${MONGOCXX_TARGET}
spdlog::spdlog
${sqlite_LIBRARY_DEPENDENCIES}
${STD_FS_LIBS}
Expand Down
42 changes: 39 additions & 3 deletions components/core/src/clp/clo/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,18 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
"Ignore case distinctions in both WILDCARD STRING and the input files"
);

po::options_description options_batch_control("Batch Controls");
options_batch_control.add_options()(
"batch-size,b",
po::value<uint64_t>(&m_batch_size)->value_name("SIZE")->default_value(m_batch_size),
"Batch size"
);

// Define visible options
po::options_description visible_options;
visible_options.add(options_general);
visible_options.add(options_match_control);
visible_options.add(options_batch_control);

// Define hidden positional options (not shown in Boost's program options help message)
po::options_description hidden_positional_options;
Expand All @@ -94,6 +102,12 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
)(
"search-controller-port",
po::value<string>(&m_search_controller_port)
)(
"mongodb-uri",
po::value<string>(&m_mongodb_uri)
)(
"mongodb-collection",
po::value<string>(&m_mongodb_collection)
)(
"archive-path",
po::value<string>(&m_archive_path)
Expand All @@ -108,6 +122,8 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
po::positional_options_description positional_options_description;
positional_options_description.add("search-controller-host", 1);
positional_options_description.add("search-controller-port", 1);
positional_options_description.add("mongodb-uri", 1);
positional_options_description.add("mongodb-collection", 1);
positional_options_description.add("archive-path", 1);
positional_options_description.add("wildcard-string", 1);
positional_options_description.add("file-path", 1);
Expand All @@ -116,6 +132,7 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
po::options_description all_options;
all_options.add(options_general);
all_options.add(options_match_control);
all_options.add(options_batch_control);
all_options.add(hidden_positional_options);

// Parse options
Expand Down Expand Up @@ -159,10 +176,13 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
cerr << endl;

cerr << "Examples:" << endl;
cerr << R"( # Search ARCHIVE_PATH for " ERROR " and send results to the controller)"
R"( at localhost:5555)"
cerr << R"( # Search ARCHIVE_PATH for " ERROR " and send results to )"
R"(mongodb://127.0.0.1:27017/test "result" collection )"
R"(and use localhost:5555 as the search controller)"
<< endl;
cerr << " " << get_program_name() << R"( localhost 5555 ARCHIVE_PATH " ERROR ")"
cerr << " " << get_program_name()
<< R"(localhost 5555 mongodb://127.0.0.1:27017/test result )"
R"(ARCHIVE_PATH " ERROR ")"
<< endl;
cerr << endl;

Expand All @@ -188,6 +208,16 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
throw invalid_argument("SEARCH_CONTROLLER_PORT not specified or empty.");
}

// Validate mongodb uri was specified
if (m_mongodb_uri.empty()) {
throw invalid_argument("MONGODB_URI not specified or empty.");
}

// Validate mongodb collection was specified
if (m_mongodb_collection.empty()) {
throw invalid_argument("MONGODB_COLLECTION not specified or empty.");
}

// Validate archive path was specified
if (m_archive_path.empty()) {
throw invalid_argument("ARCHIVE_PATH not specified or empty.");
Expand Down Expand Up @@ -245,6 +275,11 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
);
}
}

// Validate batch size
if (m_batch_size == 0) {
throw invalid_argument("Batch size cannot be 0.");
}
} catch (exception& e) {
SPDLOG_ERROR("{}", e.what());
print_basic_usage();
Expand All @@ -258,6 +293,7 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
void CommandLineArguments::print_basic_usage() const {
cerr << "Usage: " << get_program_name()
<< " [OPTIONS] SEARCH_CONTROLLER_HOST SEARCH_CONTROLLER_PORT "
"MONGODB_URI MONGODB_COLLECTION "
<< R"(ARCHIVE_PATH "WILDCARD STRING" [FILE])" << endl;
}
} // namespace clp::clo
10 changes: 10 additions & 0 deletions components/core/src/clp/clo/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
// Constructors
explicit CommandLineArguments(std::string const& program_name)
: CommandLineArgumentsBase(program_name),
m_batch_size(1000),
m_ignore_case(false),
m_search_begin_ts(cEpochTimeMin),
m_search_end_ts(cEpochTimeMax) {}
Expand All @@ -26,6 +27,12 @@ class CommandLineArguments : public CommandLineArgumentsBase {

std::string const& get_search_controller_port() const { return m_search_controller_port; }

std::string const& get_mongodb_uri() const { return m_mongodb_uri; }

std::string const& get_mongodb_collection() const { return m_mongodb_collection; }

uint64_t get_batch_size() const { return m_batch_size; }

std::string const& get_archive_path() const { return m_archive_path; }

bool ignore_case() const { return m_ignore_case; }
Expand All @@ -45,6 +52,9 @@ class CommandLineArguments : public CommandLineArgumentsBase {
// Variables
std::string m_search_controller_host;
std::string m_search_controller_port;
std::string m_mongodb_uri;
std::string m_mongodb_collection;
uint64_t m_batch_size;
std::string m_archive_path;
bool m_ignore_case;
std::string m_search_string;
Expand Down
52 changes: 52 additions & 0 deletions components/core/src/clp/clo/ResultsCacheClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include "ResultsCacheClient.hpp"

namespace clp::clo {
ResultsCacheClient::ResultsCacheClient(
std::string const& uri,
std::string const& collection,
uint64_t batch_size
)
: m_batch_size(batch_size) {
try {
auto mongo_uri = mongocxx::uri(uri);
m_client = mongocxx::client(mongo_uri);
m_collection = m_client[mongo_uri.database()][collection];
} catch (mongocxx::exception const& e) {
throw OperationFailed(ErrorCode::ErrorCode_BadParam_DB_URI, __FILE__, __LINE__);
}
}

void ResultsCacheClient::flush() {
try {
if (false == m_results.empty()) {
m_collection.insert_many(m_results);
m_results.clear();
}
} catch (mongocxx::exception const& e) {
throw OperationFailed(ErrorCode::ErrorCode_Failure_DB_Bulk_Write, __FILE__, __LINE__);
}
}

void ResultsCacheClient::add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
) {
try {
auto document = bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp("original_path", original_path),
bsoncxx::builder::basic::kvp("message", message),
bsoncxx::builder::basic::kvp("timestamp", timestamp)
);

m_results.push_back(std::move(document));

if (m_results.size() >= m_batch_size) {
m_collection.insert_many(m_results);
m_results.clear();
}
} catch (mongocxx::exception const& e) {
throw OperationFailed(ErrorCode::ErrorCode_Failure_DB_Bulk_Write, __FILE__, __LINE__);
}
}
} // namespace clp::clo
Loading
Loading