Skip to content

Commit

Permalink
Allow overriding max_concurrent_commands
Browse files Browse the repository at this point in the history
in the listener via environmental variable
  • Loading branch information
gregorjerse committed Nov 19, 2023
1 parent c09aa96 commit 75c6062
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
3 changes: 3 additions & 0 deletions docs/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ Changed
-------
- Redis cache in listener is updated when data fields are retrieved from the
database

Added
-----
- Add processes allow and ignore list to dispatcher, controlled by
environmental variables ``FLOW_PROCESSES_ALLOW_LIST`` and
``FLOW_PROCESSES_IGNORE_LIST```
- Allow ovirriding the maximal number of commands listener can process
concurrently

Fixed
-----
Expand Down
15 changes: 14 additions & 1 deletion resolwe/flow/managers/listener/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ def __init__(
port: int,
protocol: str,
zmq_socket: Optional[zmq.asyncio.Socket] = None,
max_concurrent_commands: int = 10,
):
"""Initialize."""
if zmq_socket is None:
Expand All @@ -546,6 +547,7 @@ def __init__(
super().__init__(
ZMQCommunicator(zmq_socket, "listener <-> workers", logger),
logger,
max_concurrent_commands,
)
self.communicator.heartbeat_handler = self.heartbeat_handler
self._message_processor = Processor(self)
Expand Down Expand Up @@ -688,6 +690,13 @@ def __init__(
getattr(settings, "LISTENER_CONNECTION", {}).get("protocol", "tcp"),
)

self.max_concurrent_commands = kwargs.get(
"max_concurrent_commands",
getattr(settings, "LISTENER_CONNECTION", {}).get(
"max_concurrent_commands", 10
),
)

# When zmq_socket kwarg is not None, use this one instead of creating
# a new one.
self.zmq_socket = kwargs.get("zmq_socket")
Expand Down Expand Up @@ -729,7 +738,11 @@ def listener_protocol(self) -> ListenerProtocol:
"""
if self._listener_protocol is None:
self._listener_protocol = ListenerProtocol(
self.hosts, self.port, self.protocol, self.zmq_socket
self.hosts,
self.port,
self.protocol,
self.zmq_socket,
self.max_concurrent_commands,
)
return self._listener_protocol

Expand Down
4 changes: 4 additions & 0 deletions tests/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@
"min_port": 50000,
"max_port": 60000,
"protocol": "tcp",
# Define the max number of commands listener can process simultaneously.
"max_concurrent_commands": config(
"RESOLWE_LISTENER_MAX_CONCURRENT_COMMANDS", cast=int, default=10
),
}

# The IP address where listener is available from the communication container.
Expand Down

0 comments on commit 75c6062

Please sign in to comment.