Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(package): Add option to output search results as raw logs. #641

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,31 @@ def create_and_monitor_job_in_db(
logger.error(f"job {job_id} finished with unexpected status: {job_status}")


async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
try:
unpacker = msgpack.Unpacker()
while True:
# Read some data from the worker and feed it to msgpack
buf = await reader.read(1024)
if b"" == buf:
# Worker closed
return
unpacker.feed(buf)
def get_worker_connection_handler(raw_output: bool):
async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
try:
unpacker = msgpack.Unpacker()
while True:
# Read some data from the worker and feed it to msgpack
buf = await reader.read(1024)
if b"" == buf:
# Worker closed
return
unpacker.feed(buf)

# Print out any messages we can decode in the form of ORIG_PATH: MSG
for unpacked in unpacker:
print(f"{unpacked[2]}: {unpacked[1]}", end="")
except asyncio.CancelledError:
return
finally:
writer.close()
# Print out any messages we can decode in the form of ORIG_PATH: MSG, or simply MSG
# if raw output is enabled.
for unpacked in unpacker:
if raw_output:
print(f"{unpacked[1]}", end="")
else:
print(f"{unpacked[2]}: {unpacked[1]}", end="")
except asyncio.CancelledError:
return
finally:
writer.close()

return worker_connection_handler


async def do_search_without_aggregation(
Expand All @@ -112,6 +119,7 @@ async def do_search_without_aggregation(
end_timestamp: int | None,
ignore_case: bool,
path_filter: str | None,
raw_output: bool,
):
ip_list = socket.gethostbyname_ex(socket.gethostname())[2]
if len(ip_list) == 0:
Expand All @@ -125,7 +133,7 @@ async def do_search_without_aggregation(
break

server = await asyncio.start_server(
client_connected_cb=worker_connection_handler,
client_connected_cb=get_worker_connection_handler(raw_output),
host=host,
port=0,
family=socket.AF_INET,
Expand Down Expand Up @@ -184,6 +192,7 @@ async def do_search(
path_filter: str | None,
do_count_aggregation: bool | None,
count_by_time_bucket_size: int | None,
raw_output: bool,
):
if do_count_aggregation is None and count_by_time_bucket_size is None:
await do_search_without_aggregation(
Expand All @@ -195,6 +204,7 @@ async def do_search(
end_timestamp,
ignore_case,
path_filter,
raw_output,
)
else:
await run_function_in_process(
Expand Down Expand Up @@ -226,12 +236,12 @@ def main(argv):
args_parser.add_argument(
"--begin-time",
type=int,
help="Time range filter lower-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--end-time",
type=int,
help="Time range filter upper-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--ignore-case",
Expand All @@ -250,6 +260,9 @@ def main(argv):
type=int,
help="Count the number of results in each time span of the given size (ms).",
)
args_parser.add_argument(
"--raw", action="store_true", help="Output the search results as raw logs."
)
parsed_args = args_parser.parse_args(argv[1:])

if (
Expand Down Expand Up @@ -281,6 +294,7 @@ def main(argv):
parsed_args.file_path,
parsed_args.count,
parsed_args.count_by_time,
parsed_args.raw,
)
)
except asyncio.CancelledError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def main(argv):
args_parser.add_argument(
"--begin-time",
type=int,
help="Time range filter lower-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--end-time",
type=int,
help="Time range filter upper-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--ignore-case",
Expand All @@ -61,6 +61,9 @@ def main(argv):
type=int,
help="Count the number of results in each time span of the given size (ms).",
)
args_parser.add_argument(
"--raw", action="store_true", help="Output the search results as raw logs."
)
parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
Expand Down Expand Up @@ -119,6 +122,8 @@ def main(argv):
if parsed_args.count_by_time is not None:
search_cmd.append("--count-by-time")
search_cmd.append(str(parsed_args.count_by_time))
if parsed_args.raw:
search_cmd.append("--raw")
cmd = container_start_cmd + search_cmd
subprocess.run(cmd, check=True)

Expand Down
Loading