Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add command_timeout to async client #3436

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
* Close Unix sockets if the connection attempt fails. This prevents `ResourceWarning`s. (#3314)
* Close SSL sockets if the connection attempt fails, or if validations fail. (#3317)
* Eliminate mutable default arguments in the `redis.commands.core.Script` class. (#3332)
* Add command_timeout to async client. (#3436)

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
Expand Down
2 changes: 2 additions & 0 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ def __init__(
redis_connect_func=None,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
command_timeout: Optional[float] = None,
):
"""
Initialize a new Redis client.
Expand Down Expand Up @@ -282,6 +283,7 @@ def __init__(
"lib_version": lib_version,
"redis_connect_func": redis_connect_func,
"protocol": protocol,
"command_timeout": command_timeout,
}
# based on input, setup appropriate connection args
if unix_socket_path is not None:
Expand Down
2 changes: 2 additions & 0 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def __init__(
ssl_ciphers: Optional[str] = None,
protocol: Optional[int] = 2,
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
command_timeout: Optional[float] = None,
) -> None:
if db:
raise RedisClusterException(
Expand Down Expand Up @@ -311,6 +312,7 @@ def __init__(
"socket_keepalive": socket_keepalive,
"socket_keepalive_options": socket_keepalive_options,
"socket_timeout": socket_timeout,
"command_timeout": command_timeout,
"retry": retry,
"protocol": protocol,
}
Expand Down
19 changes: 14 additions & 5 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class AbstractConnection:
"credential_provider",
"password",
"socket_timeout",
"command_timeout",
"socket_connect_timeout",
"redis_connect_func",
"retry_on_timeout",
Expand Down Expand Up @@ -148,6 +149,7 @@ def __init__(
encoder_class: Type[Encoder] = Encoder,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
command_timeout: Optional[float] = None,
):
if (username or password) and credential_provider is not None:
raise DataError(
Expand All @@ -167,6 +169,7 @@ def __init__(
if socket_connect_timeout is None:
socket_connect_timeout = socket_timeout
self.socket_connect_timeout = socket_connect_timeout
self.command_timeout = command_timeout
self.retry_on_timeout = retry_on_timeout
if retry_on_error is SENTINEL:
retry_on_error = []
Expand Down Expand Up @@ -206,6 +209,13 @@ def __init__(
raise ConnectionError("protocol must be either 2 or 3")
self.protocol = protocol

def _get_command_timeout(self, timeout: Optional[float] = None):
if timeout is not None:
return timeout
if self.command_timeout is not None:
return self.command_timeout
return self.socket_timeout

def __del__(self, _warnings: Any = warnings):
# For some reason, the individual streams don't get properly garbage
# collected and therefore produce no resource warnings. We add one
Expand Down Expand Up @@ -466,10 +476,9 @@ async def send_packed_command(
command = command.encode()
if isinstance(command, bytes):
command = [command]
if self.socket_timeout:
await asyncio.wait_for(
self._send_packed_command(command), self.socket_timeout
)
timeout = self._get_command_timeout()
if timeout:
await asyncio.wait_for(self._send_packed_command(command), timeout)
else:
self._writer.writelines(command)
await self._writer.drain()
Expand Down Expand Up @@ -518,7 +527,7 @@ async def read_response(
push_request: Optional[bool] = False,
):
"""Read the response from a previously sent command"""
read_timeout = timeout if timeout is not None else self.socket_timeout
read_timeout = self._get_command_timeout(timeout)
host_error = self._host_error()
try:
if (
Expand Down
1 change: 1 addition & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def parse_cluster_myshardid(resp, **options):
"credential_provider",
"db",
"decode_responses",
"command_timeout",
"encoding",
"encoding_errors",
"errors",
Expand Down