Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send search results through the results cache in the CLP package #223

Merged
merged 31 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6a247c1
add MongoDB dependency in CMakeLists.txt
wraymo Jan 9, 2024
ebb3493
change command line arguments
wraymo Jan 10, 2024
4a4a800
delete ControllerMonitoringThread files
wraymo Jan 10, 2024
ad66f63
add mongocxx to .clang-format
wraymo Jan 10, 2024
202fae8
complete mongodb support for source code
wraymo Jan 10, 2024
5e89ff5
remove redundant target
wraymo Jan 10, 2024
1bd26cf
change search script
wraymo Jan 10, 2024
4aa1d12
complete support for sending results to mongodb
wraymo Jan 10, 2024
364d032
rename url and add db_name to clp_config.yml
wraymo Jan 10, 2024
5b2b7de
change usage message
wraymo Jan 11, 2024
2ef6f7e
change helper message
wraymo Jan 11, 2024
3423ca8
add flush
wraymo Jan 11, 2024
405cef2
change results_cache get_uri()
wraymo Jan 11, 2024
6a7ed11
fix lint error
wraymo Jan 11, 2024
006168e
fix a bug
wraymo Jan 11, 2024
d7f6ecf
add controller monitor thread back to handle cancellation
wraymo Jan 15, 2024
a352e9e
add new line delimiter
wraymo Jan 15, 2024
b3f47c5
add search controller back
wraymo Jan 15, 2024
35af38a
fix a bug
wraymo Jan 16, 2024
8c82a53
fix a bug
wraymo Jan 16, 2024
b6ddeed
remove unused header file
wraymo Jan 16, 2024
0ab8cb0
fix some issues
wraymo Jan 17, 2024
b1882a1
fix a lint problem
wraymo Jan 17, 2024
72403ae
fix other issues
wraymo Jan 18, 2024
7e1378c
fix a package bug
wraymo Jan 18, 2024
ee753a1
fix some issues
wraymo Jan 18, 2024
033c7ff
fix a lint error
wraymo Jan 18, 2024
b8da5ca
fix a grammar error
wraymo Jan 18, 2024
85f25e2
add an exception handler for ResultsCacheClient constructor
wraymo Jan 18, 2024
dc8dcc4
print original_path and decompressed message instead of a mongodb record
wraymo Jan 18, 2024
e4f4a08
print without newline
wraymo Jan 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,27 @@
import asyncio
import datetime
import logging
import msgpack
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
import multiprocessing
import pathlib
import pymongo
import socket
import sys
import time
from asyncio import StreamReader, StreamWriter
import zstandard
from contextlib import closing

import msgpack
import zstandard

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
validate_and_load_config_file,
get_clp_home
)
from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, Database
from clp_py_utils.clp_config import (
CLP_METADATA_TABLE_PREFIX,
Database,
ResultsCache
)
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.job_config import SearchConfig
from job_orchestration.scheduler.constants import JobStatus
Expand Down Expand Up @@ -69,7 +73,7 @@ def process_error_callback(err):
pool.close()


def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,
def create_and_monitor_job_in_db(db_config: Database, results_cache: ResultsCache, wildcard_query: str,
begin_timestamp: int | None, end_timestamp: int | None,
path_filter: str, search_controller_host: str,
search_controller_port: int):
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -84,14 +88,14 @@ def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,

sql_adapter = SQL_Adapter(db_config)
zstd_cctx = zstandard.ZstdCompressor(level=3)
with closing(sql_adapter.create_connection(True)) as db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor:
with closing(sql_adapter.create_connection(True)) as \
db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor:
# Create job
db_cursor.execute(f"INSERT INTO `search_jobs` (`search_config`) VALUES (%s)",
(zstd_cctx.compress(msgpack.packb(search_config.dict())),))
db_conn.commit()
job_id = db_cursor.lastrowid

# Create a task for each archive, in batches
next_pagination_id = 0
pagination_limit = 64
num_tasks_added = 0
Expand Down Expand Up @@ -147,32 +151,29 @@ def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,
job_complete = True
db_conn.commit()

time.sleep(1)
time.sleep(0.5)

client = pymongo.MongoClient(results_cache.get_uri())
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
search_results_collection = client[results_cache.db_name][str(job_id)]
for document in search_results_collection.find():
print(document)


async def worker_connection_handler(reader: StreamReader, writer: StreamWriter):
async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
try:
unpacker = msgpack.Unpacker()
while True:
# Read some data from the worker and feed it to msgpack
buf = await reader.read(1024)
if b'' == buf:
# Worker closed
return
unpacker.feed(buf)

# Print out any messages we can decode
for unpacked in unpacker:
print(f"{unpacked[0]}: {unpacked[2]}", end='')
buf = await reader.read(1024)
if b'' == buf:
# Worker closed
return
except asyncio.CancelledError:
return
finally:
writer.close()


async def do_search(db_config: Database, wildcard_query: str, begin_timestamp: int | None,
end_timestamp: int | None, path_filter: str, host: str):
# Start server to receive and print results
async def do_search(db_config: Database, results_cache: ResultsCache, wildcard_query: str,
begin_timestamp: int | None, end_timestamp: int | None, path_filter: str, host: str):
# Start a server
try:
server = await asyncio.start_server(client_connected_cb=worker_connection_handler, host=host, port=0,
family=socket.AF_INET)
Expand All @@ -184,7 +185,7 @@ async def do_search(db_config: Database, wildcard_query: str, begin_timestamp: i
server_task = asyncio.ensure_future(server.serve_forever())

db_monitor_task = asyncio.ensure_future(
run_function_in_process(create_and_monitor_job_in_db, db_config, wildcard_query,
run_function_in_process(create_and_monitor_job_in_db, db_config, results_cache, wildcard_query,
begin_timestamp, end_timestamp, path_filter, host, port))

# Wait for the job to complete or an error to occur
Expand Down Expand Up @@ -245,8 +246,8 @@ def main(argv):
logger.error("Could not determine IP of local machine.")
return -1

asyncio.run(do_search(clp_config.database, parsed_args.wildcard_query, parsed_args.begin_time,
parsed_args.end_time, parsed_args.file_path, host_ip))
asyncio.run(do_search(clp_config.database, clp_config.results_cache, parsed_args.wildcard_query,
parsed_args.begin_time, parsed_args.end_time, parsed_args.file_path, host_ip))
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved

return 0

Expand Down
1 change: 1 addition & 0 deletions components/clp-package-utils/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ readme = "README.md"

[tool.poetry.dependencies]
python = "^3.8 || ^3.10"
pymongo = "^4.6.1"
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
msgpack = "^1.0.7"
PyYAML = "^6.0.1"
zstandard = "~0.22"
Expand Down
4 changes: 4 additions & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,17 @@ class Scheduler(BaseModel):
class ResultsCache(BaseModel):
    # Settings used to reach the results cache; `get_uri` renders them as a
    # `mongodb://` URI.
    host: str = 'localhost'
    port: int = 27017
    # Name of the database holding the search results
    db_name: str = 'clp-search'

    @validator('host')
    def validate_host(cls, field):
        """Returns the host unchanged, or raises ValueError if it is empty."""
        if field != '':
            return field
        raise ValueError(f'{RESULTS_CACHE_COMPONENT_NAME}.host cannot be empty.')

    def get_uri(self):
        """Returns the URI built from `host` and `port`."""
        uri = f"mongodb://{self.host}:{self.port}"
        return uri
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved


class Queue(BaseModel):
host: str = 'localhost'
Expand Down
4 changes: 2 additions & 2 deletions components/core/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ IncludeCategories:
# NOTE: A header is grouped by first matching regex
# Library headers. Update when adding new libraries.
# NOTE: clang-format retains leading white-space on a line in violation of the YAML spec.
- Regex: "<(absl|antlr4|archive|boost|catch2|date|fmt|json|log_surgeon|mariadb|simdjson|spdlog\
|sqlite3|string_utils|yaml-cpp|zstd)"
- Regex: "<(absl|antlr4|archive|boost|catch2|date|fmt|json|log_surgeon|mariadb|mongocxx|simdjson\
|spdlog|sqlite3|string_utils|yaml-cpp|zstd)"
Priority: 3
# C system headers
- Regex: "^<.+\\.h>"
Expand Down
14 changes: 14 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for MariaDBClient")
endif()

# Find and setup mongocxx
find_package(mongocxx REQUIRED)
if(NOT mongocxx_FOUND)
    message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for mongocxx")
endif()
message(STATUS "Found mongocxx ${mongocxx_VERSION}")

# Pick the imported target that matches the requested linkage
if(CLP_USE_STATIC_LIBS)
    set(MONGOCXX_TARGET mongo::mongocxx_static)
else()
    set(MONGOCXX_TARGET mongo::mongocxx_shared)
endif()

# Find and setup msgpack
find_package(msgpack-cxx 6.0.0 REQUIRED)
if(msgpack-cxx_FOUND)
Expand Down
4 changes: 3 additions & 1 deletion components/core/src/clp/clo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ set(
CommandLineArguments.hpp
ControllerMonitoringThread.cpp
ControllerMonitoringThread.hpp
MongoDBClient.cpp
MongoDBClient.hpp
)

add_executable(clo ${CLO_SOURCES})
Expand All @@ -120,7 +122,7 @@ target_link_libraries(clo
Boost::filesystem Boost::iostreams Boost::program_options
fmt::fmt
log_surgeon::log_surgeon
msgpack-cxx
${MONGOCXX_TARGET}
spdlog::spdlog
${sqlite_LIBRARY_DEPENDENCIES}
${STD_FS_LIBS}
Expand Down
37 changes: 34 additions & 3 deletions components/core/src/clp/clo/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
)(
"search-controller-port",
po::value<string>(&m_search_controller_port)
)(
"mongodb-uri",
po::value<string>(&m_mongodb_uri)
)(
"mongodb-database",
po::value<string>(&m_mongodb_database)
)(
"mongodb-collection",
po::value<string>(&m_mongodb_collection)
)(
"archive-path",
po::value<string>(&m_archive_path)
Expand All @@ -108,6 +117,9 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
po::positional_options_description positional_options_description;
positional_options_description.add("search-controller-host", 1);
positional_options_description.add("search-controller-port", 1);
positional_options_description.add("mongodb-uri", 1);
positional_options_description.add("mongodb-database", 1);
positional_options_description.add("mongodb-collection", 1);
positional_options_description.add("archive-path", 1);
positional_options_description.add("wildcard-string", 1);
positional_options_description.add("file-path", 1);
Expand Down Expand Up @@ -159,10 +171,13 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
cerr << endl;

cerr << "Examples:" << endl;
cerr << R"( # Search ARCHIVE_PATH for " ERROR " and send results to the controller)"
R"( at localhost:5555)"
cerr << R"( # Search ARCHIVE_PATH for " ERROR " and send results to )"
R"(mongodb://127.0.0.1:27017 "test" database and "result" collection )"
R"(and use localhost:5555 as the search controller)"
<< endl;
cerr << " " << get_program_name() << R"( localhost 5555 ARCHIVE_PATH " ERROR ")"
cerr << " " << get_program_name()
<< R"(localhost 5555 mongodb://127.0.0.1:27017 )"
R"(test result ARCHIVE_PATH " ERROR ")"
<< endl;
cerr << endl;

Expand All @@ -188,6 +203,21 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
throw invalid_argument("SEARCH_CONTROLLER_PORT not specified or empty.");
}

// Validate mongodb uri was specified
if (m_mongodb_uri.empty()) {
throw invalid_argument("MONGODB_URI not specified or empty.");
}

// Validate mongodb database was specified
if (m_mongodb_database.empty()) {
throw invalid_argument("MONGODB_DATABASE not specified or empty.");
}

// Validate mongodb collection was specified
if (m_mongodb_collection.empty()) {
throw invalid_argument("MONGODB_COLLECTION not specified or empty.");
}

// Validate archive path was specified
if (m_archive_path.empty()) {
throw invalid_argument("ARCHIVE_PATH not specified or empty.");
Expand Down Expand Up @@ -258,6 +288,7 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
void CommandLineArguments::print_basic_usage() const {
cerr << "Usage: " << get_program_name()
<< " [OPTIONS] SEARCH_CONTROLLER_HOST SEARCH_CONTROLLER_PORT "
"MONGODB_URI MONGODB_DATABASE MONGODB_COLLECTION "
<< R"(ARCHIVE_PATH "WILDCARD STRING" [FILE])" << endl;
}
} // namespace clp::clo
9 changes: 9 additions & 0 deletions components/core/src/clp/clo/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class CommandLineArguments : public CommandLineArgumentsBase {

std::string const& get_search_controller_port() const { return m_search_controller_port; }

std::string const& get_mongodb_uri() const { return m_mongodb_uri; }

std::string const& get_mongodb_database() const { return m_mongodb_database; }

std::string const& get_mongodb_collection() const { return m_mongodb_collection; }

std::string const& get_archive_path() const { return m_archive_path; }

bool ignore_case() const { return m_ignore_case; }
Expand All @@ -45,6 +51,9 @@ class CommandLineArguments : public CommandLineArgumentsBase {
// Variables
std::string m_search_controller_host;
std::string m_search_controller_port;
std::string m_mongodb_uri;
std::string m_mongodb_database;
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
std::string m_mongodb_collection;
std::string m_archive_path;
bool m_ignore_case;
std::string m_search_string;
Expand Down
40 changes: 40 additions & 0 deletions components/core/src/clp/clo/MongoDBClient.cpp
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're using mongocxx methods that can throw, we should probably catch their exceptions in this file and rethrow them as a TraceableException, so that the exception is easier to catch at the topmost caller. Otherwise, we have to maintain a catch branch (at the topmost layer) for every mongocxx exception type.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "MongoDBClient.hpp"

namespace clp::clo {
/**
 * Creates a client for the MongoDB deployment identified by `uri` and selects the collection
 * that buffered results will be inserted into.
 * @param uri MongoDB URI used to construct the client.
 * @param db Name of the database that contains the collection.
 * @param collection Name of the collection that results are inserted into.
 * @param batch_size Number of buffered results at which add_result() inserts the batch.
 */
MongoDBClient::MongoDBClient(
        std::string const& uri,
        std::string const& db,
        std::string const& collection,
        uint64_t batch_size
)
        // Members are initialized in declaration order, so `client_` is ready before it is used
        // to look up `collection_`. Initializing here avoids default-constructing both members
        // and then assigning over them (and the redundant std::move of a temporary).
        : client_(mongocxx::uri(uri)),
          collection_(client_[db][collection]),
          batch_size_(batch_size) {}

void MongoDBClient::flush() {
if (!results_.empty()) {
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
collection_.insert_many(results_);
results_.clear();
}
}

void MongoDBClient::add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
) {
auto document = bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp("original_path", original_path),
bsoncxx::builder::basic::kvp("message", message),
bsoncxx::builder::basic::kvp("timestamp", timestamp)
);

results_.push_back(std::move(document));

if (results_.size() >= batch_size_) {
collection_.insert_many(results_);
results_.clear();
}
}
} // namespace clp::clo
49 changes: 49 additions & 0 deletions components/core/src/clp/clo/MongoDBClient.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef CLP_CLO_MONGODBCLIENT_HPP
#define CLP_CLO_MONGODBCLIENT_HPP

#include <string>

#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
#include <mongocxx/exception/exception.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/uri.hpp>

#include "../Defs.h"

namespace clp::clo {
/**
 * Buffers search results and inserts them into a MongoDB collection in batches.
 */
class MongoDBClient {
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
public:
// Constructors
/**
 * Creates a client from the given URI and selects the collection that results are inserted
 * into.
 * @param uri MongoDB URI used to construct the client.
 * @param db Name of the database that contains the collection.
 * @param collection Name of the collection that results are inserted into.
 * @param batch_size Number of buffered results at which add_result() inserts the batch.
 */
MongoDBClient(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring.

std::string const& uri,
std::string const& db,
std::string const& collection,
uint64_t batch_size = 1000
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
);

// Methods
/**
 * Adds a result to the batch. Once the batch holds at least `batch_size` results, the whole
 * batch is inserted into the collection and the batch is cleared.
 * @param original_path The original path of the log event.
 * @param message The content of the log event.
 * @param timestamp The timestamp of the log event.
 */
void
add_result(std::string const& original_path, std::string const& message, epochtime_t timestamp);

/**
 * Flushes the batch: inserts any buffered results into the collection and clears the batch.
 * Does nothing when the batch is empty.
 */
void flush();

private:
// Declared first so that it is constructed before `client_`.
// NOTE(review): the mongocxx documentation describes `mongocxx::instance` as something that
// must exist exactly once per process -- confirm only one MongoDBClient is ever constructed.
mongocxx::instance instance_{};
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
mongocxx::client client_;
// Collection that batches are inserted into
mongocxx::collection collection_;
// Results buffered since the last insert
std::vector<bsoncxx::document::value> results_;
// Buffer size at which add_result() inserts the batch
uint64_t batch_size_;
};
} // namespace clp::clo

#endif // CLP_CLO_MONGODBCLIENT_HPP
Loading
Loading