-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/mc 514 consume logs from client side #226
Feature/mc 514 consume logs from client side #226
Conversation
src/parlant/bin/client.py
Outdated
def stream_logs(ctx: click.Context, filters: list[str]) -> Iterator[dict[str, Any]]: | ||
try: | ||
context = zmq.Context.instance() | ||
sub_socket = context.socket(zmq.SUB) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think sub_socket
needs to be initialized before the try
for the finally
section to make sense
src/parlant/bin/client.py
Outdated
|
||
while True: | ||
try: | ||
message = cast(dict[str, Any], sub_socket.recv_json(flags=zmq.NOBLOCK)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we using NOBLOCK
here?
And then sleeping for 0.01s?
That's a lot of idle spinning
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed the block. I thought in the begining it will be async and maybe pulling multiple topics. nvm. ^^
src/parlant/bin/client.py
Outdated
@@ -1984,10 +2033,21 @@ class Config: | |||
metavar="ADDRESS[:PORT]", | |||
default="http://localhost:8800", | |||
) | |||
@click.option( | |||
"--log-server-address", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How it's written now we would have to write the following to access a remote server:
parlant --server http://1.2.3.4:8800 --log-server-address tcp://1.2.3.4:8799 [...]
Instead let's assume that the log server is on the same host as the server itself, and just allow specifying a different port. So the option would be --log-port
and we'd use the host name from the --server
option.
@@ -233,7 +233,7 @@ async def _process_guideline_batch( | |||
guideline = guidelines_dict[int(proposition.guideline_number)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add 2 logs here:
[GuidelineProposer][Prompt]
(debug)
[GuidelineProposer][Completion]
(debug) --> output all of the response's content as indented JSON
return [MessageEventGenerationResult(generation_info, [])] | ||
except Exception as exc: | ||
self._logger.warning( | ||
f"Generation attempt {generation_attempt} failed: {traceback.format_exception(exc)}" | ||
f"[MessageEventGenerator] Generation attempt {generation_attempt} failed: {traceback.format_exception(exc)}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In MEG let's also add the log
[MessageEventGenerator][Completion]
--> logging all of the generated content as indented JSON.
And BTW let's do the same for ToolCaller too... so we have [Prompt]
and [Completion]
for all components.
@@ -171,7 +171,7 @@ async def generate_events( | |||
) | |||
|
|||
if response_message is not None: | |||
self._logger.debug(f'Message production result: "{response_message}"') | |||
self._logger.debug(f'[MessageEventGenerator][Result]: "{response_message}"') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename, this isn't just "result", this is [MessageEventGenerator][GeneratedMessage]
@@ -518,7 +522,7 @@ async def _generate_response_message( | |||
and not final_revision.is_repeat_message | |||
): | |||
self._logger.warning( | |||
f"PROBLEMATIC MESSAGE EVENT PRODUCER RESPONSE: {final_revision.content}" | |||
f"[MessageEventGenerator] PROBLEMATIC MESSAGE EVENT PRODUCER RESPONSE: {final_revision.content}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you'll have to rebase here as this part was changed already
@@ -555,7 +557,7 @@ async def _run_inference( | |||
|
|||
log_message = json.dumps(inference.content.model_dump(mode="json"), indent=2) | |||
|
|||
self._logger.debug(f"Tool call request results: {log_message}") | |||
self._logger.debug(f"[ToolCaller][RequestResults]: {log_message}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to [ToolCaller][Completion]
@@ -142,7 +142,7 @@ async def generate_events( | |||
shots=await self.shots(), | |||
) | |||
|
|||
self._logger.debug(f"Message production prompt:\n{prompt}") | |||
self._logger.debug(f"[MessageEventGenerator][Prompt]:\n{prompt}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see you sometimes have a colon (:) after the [..]
part and sometimes don't.
Let's be consistent: go over all instances and let's go with no colon. Just a space after the last bracket (in this case a newline is fine instead of the space since it's a multiline prompt)
src/parlant/core/logging.py
Outdated
yield | ||
finally: | ||
for context in reversed(contexts): | ||
context.__exit__(None, None, None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a bug here. What if we got an exception? We'll be calling __exit__
here without one.
Why not use ExitStack
to write this cleanly?
sub_socket.connect(f"tcp://localhost:{PARLANT_LOG_PORT}") | ||
sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") | ||
|
||
await asyncio.sleep(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to sleep here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It need to to connect. otherwise we the messages are sent without the subscriber receiving in
…parlant log "Revisions" "Evaluation"
…now exclusively listen to the log topic. - Replaced the `--log-server-address` parameter with `--log-port` for specifying the log server port.
9355906
to
2e7217c
Compare
CLI Logging Commands:
* Added a log command group to the CLI, allowing users to view logs in real-time with optional filters.
* Support for filtering logs by specific components:
* --guideline-proposer (-g)
* --tool-caller (-t)
* --message-event-generator (-m)
* Support for logical operators (AND/OR) between multiple filters.
* Added support for additional pattern matching on log messages.
* Example CLI commands: