
Commit

Complete support for sending results to MongoDB
wraymo committed Jan 11, 2024
1 parent 36834bf commit 95b3684
Showing 4 changed files with 50 additions and 59 deletions.
@@ -1,14 +1,12 @@
from __future__ import annotations

import argparse
import asyncio
import datetime
import logging
import pathlib
import socket
import pymongo
import sys
import time
from asyncio import StreamReader, StreamWriter
from contextlib import closing

import msgpack
@@ -19,7 +17,11 @@
validate_and_load_config_file,
get_clp_home
)
from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, Database
from clp_py_utils.clp_config import (
CLP_METADATA_TABLE_PREFIX,
Database,
ResultsCache
)
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.job_config import SearchConfig
from job_orchestration.scheduler.constants import JobStatus
@@ -35,13 +37,9 @@
logger.addHandler(logging_console_handler)


def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,
begin_timestamp: int | None, end_timestamp: int | None,
path_filter: str, search_controller_host: str,
search_controller_port: int):
def do_search(db_config: Database, results_cache: ResultsCache, wildcard_query: str,
begin_timestamp: int | None, end_timestamp: int | None, path_filter: str):
search_config = SearchConfig(
search_controller_host=search_controller_host,
search_controller_port=search_controller_port,
wildcard_query=wildcard_query,
begin_timestamp=begin_timestamp,
end_timestamp=end_timestamp,
@@ -50,14 +48,14 @@ def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,

sql_adapter = SQL_Adapter(db_config)
zstd_cctx = zstandard.ZstdCompressor(level=3)
with closing(sql_adapter.create_connection(True)) as db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor:
with closing(sql_adapter.create_connection(True)) as \
db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor:
# Create job
db_cursor.execute(f"INSERT INTO `search_jobs` (`search_config`) VALUES (%s)",
(zstd_cctx.compress(msgpack.packb(search_config.dict())),))
db_conn.commit()
job_id = db_cursor.lastrowid

# Create a task for each archive, in batches
next_pagination_id = 0
pagination_limit = 64
num_tasks_added = 0
@@ -102,7 +100,7 @@ def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,

# Wait for the job to be marked complete
job_complete = False
while not job_complete:
while job_complete:
db_cursor.execute(f"SELECT `status`, `status_msg` FROM `search_jobs` WHERE `id` = {job_id}")
# There will only ever be one row since it's impossible to have more than one job with the same ID
row = db_cursor.fetchall()[0]
@@ -113,39 +111,13 @@ def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str,
job_complete = True
db_conn.commit()

time.sleep(1)


async def do_search(db_config: Database, wildcard_query: str, begin_timestamp: int | None,
end_timestamp: int | None, path_filter: str):
search_config = SearchConfig(
wildcard_query=wildcard_query,
begin_timestamp=begin_timestamp,
end_timestamp=end_timestamp,
path_filter=path_filter
)

sql_adapter = SQL_Adapter(db_config)
with closing(sql_adapter.create_connection(True)) as db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor:
# Create job
db_cursor.execute(f"INSERT INTO `search_jobs` (`search_config`) VALUES (%s)",
((msgpack.packb(search_config.dict())),))
db_conn.commit()
job_id = db_cursor.lastrowid

while True:
db_cursor.execute(f"SELECT `status`, `status_msg` FROM `search_jobs` WHERE `id` = {job_id}")
# There will only ever be one row since it's impossible to have more than one job with the same ID
row = db_cursor.fetchall()[0]
if JobStatus.SUCCEEDED == row['status']:
break
elif JobStatus.FAILED == row['status']:
logger.error(row['status_msg'])
break
db_conn.commit()

time.sleep(0.5)

client = pymongo.MongoClient(results_cache.get_uri())
search_results_collection = client[results_cache.db_name][str(job_id)]
for document in search_results_collection.find():
print(document)


def main(argv):
clp_home = get_clp_home()
@@ -179,8 +151,8 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

do_search(clp_config.database, parsed_args.wildcard_query, parsed_args.begin_time,
parsed_args.end_time, parsed_args.file_path)
do_search(clp_config.database, clp_config.results_cache, parsed_args.wildcard_query,
parsed_args.begin_time, parsed_args.end_time, parsed_args.file_path)

return 0

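The tail of the new do_search above is where the MongoDB hand-off happens: once the job is marked complete, the script connects to the results cache and prints every document in the collection named after the job ID. A minimal standalone sketch of that read path, assuming a results cache on localhost with the default 'clp-search' database and a hypothetical job ID of 1:

import pymongo

# Assumed values for illustration; the real script takes them from
# ResultsCache.get_uri(), ResultsCache.db_name, and the job row it inserted.
results_cache_uri = "mongodb://localhost:27017/clp-search"
results_cache_db_name = "clp-search"
job_id = 1

client = pymongo.MongoClient(results_cache_uri)
# Each search job writes its matches into a collection named after its job ID.
search_results_collection = client[results_cache_db_name][str(job_id)]
for document in search_results_collection.find():
    print(document)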
4 changes: 4 additions & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
@@ -104,13 +104,17 @@ class Scheduler(BaseModel):
class ResultsCache(BaseModel):
host: str = 'localhost'
port: int = 27017
db_name: str = 'clp-search'

@validator('host')
def validate_host(cls, field):
if '' == field:
raise ValueError(f'{RESULTS_CACHE_COMPONENT_NAME}.host cannot be empty.')
return field

def get_uri(self):
return f"mongodb://{self.host}:{self.port}/{self.db_name}"


class Queue(BaseModel):
host: str = 'localhost'
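With the defaults above, get_uri() produces a standard MongoDB connection string. A small usage sketch (the non-default host and port below are hypothetical):

from clp_py_utils.clp_config import ResultsCache

results_cache = ResultsCache()  # host='localhost', port=27017, db_name='clp-search'
print(results_cache.get_uri())  # mongodb://localhost:27017/clp-search

results_cache = ResultsCache(host="results-cache.example.com", port=27018)
print(results_cache.get_uri())  # mongodb://results-cache.example.com:27018/clp-search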
@@ -15,7 +15,8 @@


def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_dir: pathlib.Path,
logs_dir: pathlib.Path, search_config: SearchConfig, archive_id: str):
logs_dir: pathlib.Path, search_config: SearchConfig, archive_id: str,
results_cache_url: str, results_cache_db_name: str):
"""
Searches the given archive for the given wildcard query
@@ -26,13 +27,16 @@ def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_di
:param logs_dir:
:param search_config:
:param archive_id:
:param results_cache_url:
:param results_cache_db_name:
:return: tuple -- (whether the search was successful, output messages)
"""
# Assemble search command
cmd = [
str(clp_home / 'bin' / 'clo'),
search_config.search_controller_host,
str(search_config.search_controller_port),
results_cache_url,
results_cache_db_name,
str(job_id),
str(archive_output_dir / archive_id),
search_config.wildcard_query
]
@@ -73,7 +77,8 @@ def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_di


@app.task()
def search(job_id: int, task_id: int, search_config_json: str, archive_id: str):
def search(job_id: int, task_id: int, search_config_json: str, archive_id: str,
results_cache_uri: str, results_cache_db_name: str):
clp_home = os.getenv('CLP_HOME')
archive_output_dir = os.getenv('CLP_ARCHIVE_OUTPUT_DIR')
logs_dir = os.getenv('CLP_LOGS_DIR')
@@ -92,7 +97,8 @@ def search(job_id: int, task_id: int, search_config_json: str, archive_id: str):

search_successful, worker_output = run_clo(job_id, task_id, pathlib.Path(clp_home),
pathlib.Path(archive_output_dir),
pathlib.Path(logs_dir), search_config, archive_id)
pathlib.Path(logs_dir), search_config, archive_id,
results_cache_uri, results_cache_db_name)

if search_successful:
task_update.status = TaskStatus.SUCCEEDED
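For reference, the clo command assembled by run_clo now leads with the results cache connection details instead of the old search controller host and port. A sketch of the argument list with hypothetical paths, IDs, and query (the real values come from the Celery task's arguments and environment):

import pathlib

clp_home = pathlib.Path("/opt/clp")                      # hypothetical CLP_HOME
archive_output_dir = pathlib.Path("/var/data/archives")  # hypothetical archive output dir
archive_id = "archive-0001"                              # hypothetical archive ID
job_id = 1

cmd = [
    str(clp_home / 'bin' / 'clo'),
    "mongodb://localhost:27017/clp-search",  # results_cache_url
    "clp-search",                            # results_cache_db_name
    str(job_id),
    str(archive_output_dir / archive_id),
    "* error *",                             # search_config.wildcard_query
]
# run_clo launches this command as a subprocess; those details are elided in the diff above.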
@@ -12,7 +12,11 @@
import zstandard
from pydantic import ValidationError

from clp_py_utils.clp_config import CLPConfig, Database
from clp_py_utils.clp_config import (
CLPConfig,
Database,
ResultsCache,
)
from clp_py_utils.core import read_yaml_config_file
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.compression_task import compress
@@ -141,12 +145,15 @@ def schedule_compression_task(job: CompressionJob, task: CompressionTask, databa
return compress.apply_async(args, task_id=str(task.id), queue=QueueName.COMPRESSION, priority=task.priority)


def schedule_search_task(job: SearchJob, task: SearchTask, dctx: zstandard.ZstdDecompressor):
args = (job.id, task.id, job.get_search_config_json_str(dctx), task.archive_id)
def schedule_search_task(job: SearchJob, task: SearchTask, results_cache: ResultsCache,
dctx: zstandard.ZstdDecompressor):
args = (job.id, task.id, job.get_search_config_json_str(dctx), task.archive_id,
results_cache.get_uri(), results_cache.db_name)
return search.apply_async(args, task_id=str(task.id), queue=QueueName.SEARCH, priority=task.priority)


def search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database):
def search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database,
results_cache: ResultsCache):
"""
For all tasks with SUBMITTED status, push them to the task queue to be processed; if finished, update them
"""
@@ -183,10 +190,11 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database)
num_tasks_completed=task_row['num_tasks_completed'],
search_config=task_row['search_config'],
)
update_search_job_metadata(db_cursor, job_id, dict(start_time=now.strftime('%Y-%m-%d %H:%M:%S')))
update_search_job_metadata(db_cursor, job_id,
dict(start_time=now.strftime('%Y-%m-%d %H:%M:%S')))
id_to_search_job[search_job.id] = search_job

celery_task = schedule_search_task(search_job, search_task, dctx)
celery_task = schedule_search_task(search_job, search_task, results_cache, dctx)

update_search_task_metadata(db_cursor, search_task.id, dict(
status=TaskStatus.SCHEDULED,
@@ -419,7 +427,7 @@ def callback(ch, method, properties, body):
task_update = TaskUpdate.parse_raw(body)
if TaskStatus.FAILED == task_update.status:
task_update = TaskFailureUpdate.parse_raw(body)
elif TaskUpdateType.COMPRESSION == task_update.type and\
elif TaskUpdateType.COMPRESSION == task_update.type and \
TaskStatus.SUCCEEDED == task_update.status:
task_update = CompressionTaskSuccessUpdate.parse_raw(body)
except ValidationError as err:
@@ -483,7 +491,8 @@ def main(argv):
# Start Job Processing Loop
with closing(sql_adapter.create_connection(True)) as db_conn, \
closing(db_conn.cursor(dictionary=True)) as db_cursor:
search_and_schedule_new_tasks(db_conn, db_cursor, sql_adapter.database_config)
search_and_schedule_new_tasks(db_conn, db_cursor, sql_adapter.database_config,
clp_config.results_cache)
update_completed_jobs(db_cursor, 'compression')
update_completed_jobs(db_cursor, 'search')
db_conn.commit()
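The scheduler and the executor stay in sync through the positional argument tuple: the two new trailing elements produced by schedule_search_task line up with the two new trailing parameters of the Celery search task shown earlier in this diff. A sketch of that correspondence, with hypothetical values:

# Produced by the scheduler (schedule_search_task):
args = (
    1,                                       # job.id
    10,                                      # task.id
    '{"wildcard_query": "* error *"}',       # job.get_search_config_json_str(dctx), abridged
    "archive-0001",                          # task.archive_id
    "mongodb://localhost:27017/clp-search",  # results_cache.get_uri()
    "clp-search",                            # results_cache.db_name
)

# Consumed by the executor's search task:
# def search(job_id, task_id, search_config_json, archive_id,
#            results_cache_uri, results_cache_db_name): ...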
