diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/search.py b/components/clp-package-utils/clp_package_utils/scripts/native/search.py index d166cf35f..d292656a7 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/search.py @@ -83,24 +83,31 @@ def create_and_monitor_job_in_db( logger.error(f"job {job_id} finished with unexpected status: {job_status}") -async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - try: - unpacker = msgpack.Unpacker() - while True: - # Read some data from the worker and feed it to msgpack - buf = await reader.read(1024) - if b"" == buf: - # Worker closed - return - unpacker.feed(buf) +def get_worker_connection_handler(raw_output: bool): + async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + try: + unpacker = msgpack.Unpacker() + while True: + # Read some data from the worker and feed it to msgpack + buf = await reader.read(1024) + if b"" == buf: + # Worker closed + return + unpacker.feed(buf) - # Print out any messages we can decode in the form of ORIG_PATH: MSG - for unpacked in unpacker: - print(f"{unpacked[2]}: {unpacked[1]}", end="") - except asyncio.CancelledError: - return - finally: - writer.close() + # Print out any messages we can decode in the form of ORIG_PATH: MSG, or simply MSG + # if raw output is enabled. + for unpacked in unpacker: + if raw_output: + print(f"{unpacked[1]}", end="") + else: + print(f"{unpacked[2]}: {unpacked[1]}", end="") + except asyncio.CancelledError: + return + finally: + writer.close() + + return worker_connection_handler async def do_search_without_aggregation( @@ -112,6 +119,7 @@ async def do_search_without_aggregation( end_timestamp: int | None, ignore_case: bool, path_filter: str | None, + raw_output: bool, ): ip_list = socket.gethostbyname_ex(socket.gethostname())[2] if len(ip_list) == 0: @@ -125,7 +133,7 @@ async def do_search_without_aggregation( break server = await asyncio.start_server( - client_connected_cb=worker_connection_handler, + client_connected_cb=get_worker_connection_handler(raw_output), host=host, port=0, family=socket.AF_INET, @@ -184,6 +192,7 @@ async def do_search( path_filter: str | None, do_count_aggregation: bool | None, count_by_time_bucket_size: int | None, + raw_output: bool, ): if do_count_aggregation is None and count_by_time_bucket_size is None: await do_search_without_aggregation( @@ -195,6 +204,7 @@ async def do_search( end_timestamp, ignore_case, path_filter, + raw_output, ) else: await run_function_in_process( @@ -226,12 +236,12 @@ def main(argv): args_parser.add_argument( "--begin-time", type=int, - help="Time range filter lower-bound (inclusive) as milliseconds" " from the UNIX epoch.", + help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.", ) args_parser.add_argument( "--end-time", type=int, - help="Time range filter upper-bound (inclusive) as milliseconds" " from the UNIX epoch.", + help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.", ) args_parser.add_argument( "--ignore-case", @@ -250,6 +260,9 @@ def main(argv): type=int, help="Count the number of results in each time span of the given size (ms).", ) + args_parser.add_argument( + "--raw", action="store_true", help="Output the search results as raw logs." + ) parsed_args = args_parser.parse_args(argv[1:]) if ( @@ -281,6 +294,7 @@ def main(argv): parsed_args.file_path, parsed_args.count, parsed_args.count_by_time, + parsed_args.raw, ) ) except asyncio.CancelledError: diff --git a/components/clp-package-utils/clp_package_utils/scripts/search.py b/components/clp-package-utils/clp_package_utils/scripts/search.py index 38d528528..c01fb64b5 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/search.py @@ -42,12 +42,12 @@ def main(argv): args_parser.add_argument( "--begin-time", type=int, - help="Time range filter lower-bound (inclusive) as milliseconds" " from the UNIX epoch.", + help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.", ) args_parser.add_argument( "--end-time", type=int, - help="Time range filter upper-bound (inclusive) as milliseconds" " from the UNIX epoch.", + help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.", ) args_parser.add_argument( "--ignore-case", @@ -61,6 +61,9 @@ def main(argv): type=int, help="Count the number of results in each time span of the given size (ms).", ) + args_parser.add_argument( + "--raw", action="store_true", help="Output the search results as raw logs." + ) parsed_args = args_parser.parse_args(argv[1:]) # Validate and load config file @@ -119,6 +122,8 @@ def main(argv): if parsed_args.count_by_time is not None: search_cmd.append("--count-by-time") search_cmd.append(str(parsed_args.count_by_time)) + if parsed_args.raw: + search_cmd.append("--raw") cmd = container_start_cmd + search_cmd subprocess.run(cmd, check=True)