Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to update worker name #7746

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/unreleased/agent_executor_6.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
description: Update process name to make it easier to identify workers
change-type: patch
destination-branches: [master]
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ toml==0.10.2
typing_inspect==0.9.0
build==1.2.1
ruamel.yaml==0.18.6
setproctitle==1.3.3

# Optional import in code
graphviz==0.20.3
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"typing_inspect~=0.9",
"ruamel.yaml~=0.17",
"toml~=0.10 ",
"setproctitle~=1.3"
]


Expand Down
23 changes: 20 additions & 3 deletions src/inmanta/agent/forking_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import inmanta.util
from inmanta.agent import executor
from inmanta.protocol.ipc_light import FinalizingIPCClient, IPCServer, LogReceiver, LogShipper
from setproctitle import setproctitle

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -91,7 +92,7 @@ class ExecutorServer(IPCServer[ExecutorContext]):
Pipe break: go to 3a and 3b
"""

def __init__(self, name: str, take_over_logging: bool = True) -> None:
def __init__(self, name: str, logger: logging.Logger, take_over_logging: bool = True) -> None:
"""
:param take_over_logging: when we are connected and are able to stream logs, do we remove all other log handlers?
"""
Expand All @@ -101,6 +102,11 @@ def __init__(self, name: str, take_over_logging: bool = True) -> None:
self.ctx = ExecutorContext(self)
self.log_transport: typing.Optional[LogShipper] = None
self.take_over_logging = take_over_logging
self.logger = logger

def set_status(self, status: str) -> None:
"""Update the process name to reflect the identity and status of this executor"""
set_executor_status(self.name, status)

def connection_made(self, transport: transports.Transport) -> None:
super().connection_made(transport)
Expand All @@ -116,6 +122,8 @@ def connection_made(self, transport: transports.Transport) -> None:

self.log_transport = LogShipper(self, asyncio.get_running_loop())
logging.getLogger().addHandler(self.log_transport)
self.logger.info(f"Started executor with PID: {os.getpid()}")
self.set_status("connected")

def _detach_log_shipper(self) -> None:
# Once connection is lost, we want to detach asap to keep the logging clean and efficient
Expand Down Expand Up @@ -144,6 +152,7 @@ def connection_lost(self, exc: Exception | None) -> None:
"""We lost connection to the controler, bail out"""
self._detach_log_shipper()
self.logger.info("Connection lost", exc_info=exc)
self.set_status("disconnected")
self._sync_stop()
self.stopped.set()

Expand Down Expand Up @@ -287,6 +296,12 @@ async def call(self, context: ExecutorContext) -> inmanta.types.Apireturn:
return await context.executor.get_facts(self.resource)


def set_executor_status(name: str, status: str) -> None:
"""Update the process name to reflect the identity and status of the executor"""
# Lives outside the ExecutorServer class, so we can set status early in the boot process
setproctitle(f"inmanta: executor {name} - {status}")


def mp_worker_entrypoint(
socket: socket.socket,
name: str,
Expand All @@ -296,6 +311,7 @@ def mp_worker_entrypoint(
) -> None:
"""Entry point for child processes"""

set_executor_status(name, "connecting")
# Set up logging stage 1
# Basic config, starts on std.out
config_builder = inmanta.logging.LoggingConfigBuilder()
Expand All @@ -305,7 +321,6 @@ def mp_worker_entrypoint(

# Set up our own logger
logger = logging.getLogger(f"agent.executor.{name}")
logger.info(f"Started with PID: {os.getpid()}")

# Load config
inmanta.config.Config.load_config_from_dict(config)
Expand All @@ -315,7 +330,9 @@ async def serve() -> None:
# Start serving
# also performs setup of log shipper
# this is part of stage 2 logging setup
transport, protocol = await loop.connect_accepted_socket(functools.partial(ExecutorServer, name, not cli_log), socket)
transport, protocol = await loop.connect_accepted_socket(
functools.partial(ExecutorServer, name, logger, not cli_log), socket
)
inmanta.signals.setup_signal_handlers(protocol.stop)
await protocol.stopped.wait()

Expand Down
4 changes: 4 additions & 0 deletions tests/forking_agent/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import base64
import logging

import psutil
import pytest

import inmanta.agent
Expand Down Expand Up @@ -165,6 +166,9 @@ async def test_executor_server_dirty_shutdown(mpmanager: MPManager, caplog):
assert ["aaaa"] == result
print("Child there")

process_name = psutil.Process(pid=child1.process.pid).name()
assert process_name == "inmanta: executor test - connected"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondering whether we should include the full identity of an executor here, because multiple agents with the same name can exist at the same time. For example when a dry-run is requested for an unreleased version. With this implementation it will be hard to keep them apart.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree (see description), I just don't see how to make this identity meaningful...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make it the concatenation of all known model versions (with a limit), collapsed into ranges? e.g. v1-10,12-20.

Not pretty, but the best I can think of atm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is volatile and not part of the current interfaces, but I kind of like it.

The ranges may compact quite badly, because not every version may reach the executor so it will not be aware of everything.

I don't think I'm going to add it now, but it is something to keep in mind

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved remark to #7692


await asyncio.get_running_loop().run_in_executor(None, child1.process.kill)
print("Kill sent")

Expand Down