feat(package): Add option to output search results as raw logs. (#641)
gibber9809 authored Dec 19, 2024
1 parent 594968a commit 38b79b1
Showing 2 changed files with 41 additions and 22 deletions.
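For orientation, a hedged illustration (not part of the commit) of the difference the new flag makes: by default each result is prefixed with the path of the original log file it came from, while --raw prints only the message text. The three-element message layout is inferred from the indices used in the handler below, not from any documented protocol.

# Illustrative only: a decoded worker message, laid out as the handler below assumes.
unpacked = [0, "2024-12-19T12:00:00 ERROR something failed\n", "/var/log/app.log"]
print(f"{unpacked[2]}: {unpacked[1]}", end="")  # default: ORIG_PATH: MSG
print(f"{unpacked[1]}", end="")                 # with --raw: MSG only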
File 1 of 2:
@@ -83,24 +83,31 @@ def create_and_monitor_job_in_db(
logger.error(f"job {job_id} finished with unexpected status: {job_status}")


async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
try:
unpacker = msgpack.Unpacker()
while True:
# Read some data from the worker and feed it to msgpack
buf = await reader.read(1024)
if b"" == buf:
# Worker closed
return
unpacker.feed(buf)
def get_worker_connection_handler(raw_output: bool):
async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
try:
unpacker = msgpack.Unpacker()
while True:
# Read some data from the worker and feed it to msgpack
buf = await reader.read(1024)
if b"" == buf:
# Worker closed
return
unpacker.feed(buf)

# Print out any messages we can decode in the form of ORIG_PATH: MSG
for unpacked in unpacker:
print(f"{unpacked[2]}: {unpacked[1]}", end="")
except asyncio.CancelledError:
return
finally:
writer.close()
# Print out any messages we can decode in the form of ORIG_PATH: MSG, or simply MSG
# if raw output is enabled.
for unpacked in unpacker:
if raw_output:
print(f"{unpacked[1]}", end="")
else:
print(f"{unpacked[2]}: {unpacked[1]}", end="")
except asyncio.CancelledError:
return
finally:
writer.close()

return worker_connection_handler


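As a side note, the handler above relies on msgpack's streaming Unpacker: bytes read from the worker socket are fed in as they arrive, and iteration yields a message only once it has been fully received. A minimal, self-contained sketch of that pattern (not part of this commit; the message contents are made up):

import msgpack

unpacker = msgpack.Unpacker()
payload = msgpack.packb([0, "ERROR something failed\n", "/var/log/app.log"])

# Feed the bytes in two chunks to mimic a partial socket read.
unpacker.feed(payload[:5])
print(list(unpacker))  # [] -- the message is not complete yet
unpacker.feed(payload[5:])
for unpacked in unpacker:
    print(f"{unpacked[2]}: {unpacked[1]}", end="")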
async def do_search_without_aggregation(
@@ -112,6 +119,7 @@ async def do_search_without_aggregation(
end_timestamp: int | None,
ignore_case: bool,
path_filter: str | None,
raw_output: bool,
):
ip_list = socket.gethostbyname_ex(socket.gethostname())[2]
if len(ip_list) == 0:
@@ -125,7 +133,7 @@
break

server = await asyncio.start_server(
client_connected_cb=worker_connection_handler,
client_connected_cb=get_worker_connection_handler(raw_output),
host=host,
port=0,
family=socket.AF_INET,
@@ -184,6 +192,7 @@ async def do_search(
path_filter: str | None,
do_count_aggregation: bool | None,
count_by_time_bucket_size: int | None,
raw_output: bool,
):
if do_count_aggregation is None and count_by_time_bucket_size is None:
await do_search_without_aggregation(
@@ -195,6 +204,7 @@
end_timestamp,
ignore_case,
path_filter,
raw_output,
)
else:
await run_function_in_process(
@@ -226,12 +236,12 @@ def main(argv):
args_parser.add_argument(
"--begin-time",
type=int,
help="Time range filter lower-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--end-time",
type=int,
help="Time range filter upper-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--ignore-case",
@@ -250,6 +260,9 @@
type=int,
help="Count the number of results in each time span of the given size (ms).",
)
args_parser.add_argument(
"--raw", action="store_true", help="Output the search results as raw logs."
)
parsed_args = args_parser.parse_args(argv[1:])

if (
@@ -281,6 +294,7 @@
parsed_args.file_path,
parsed_args.count,
parsed_args.count_by_time,
parsed_args.raw,
)
)
except asyncio.CancelledError:
File 2 of 2:
@@ -42,12 +42,12 @@ def main(argv):
args_parser.add_argument(
"--begin-time",
type=int,
help="Time range filter lower-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--end-time",
type=int,
help="Time range filter upper-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--ignore-case",
@@ -61,6 +61,9 @@
type=int,
help="Count the number of results in each time span of the given size (ms).",
)
args_parser.add_argument(
"--raw", action="store_true", help="Output the search results as raw logs."
)
parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
@@ -119,6 +122,8 @@
if parsed_args.count_by_time is not None:
search_cmd.append("--count-by-time")
search_cmd.append(str(parsed_args.count_by_time))
if parsed_args.raw:
search_cmd.append("--raw")
cmd = container_start_cmd + search_cmd
subprocess.run(cmd, check=True)
