Skip to content

Commit

Permalink
[review comments] Refactor Timeouts to vocabulary type in Cython
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick M. Niedzielski <[email protected]>
  • Loading branch information
pniedzielski committed Nov 30, 2023
1 parent 418adf8 commit fe22261
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 135 deletions.
2 changes: 1 addition & 1 deletion src/blazingmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from ._session import QueueOptions
from ._session import Session
from ._session import SessionOptions
from ._session import Timeouts
from ._timeouts import Timeouts
from ._typing import PropertyTypeDict
from ._typing import PropertyValueDict
from .exceptions import Error
Expand Down
7 changes: 2 additions & 5 deletions src/blazingmq/_ext.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ from blazingmq import CompressionAlgorithmType
from blazingmq import Message
from blazingmq import MessageHandle
from blazingmq import PropertyType
from blazingmq import Timeouts
from blazingmq.session_events import SessionEvent

DEFAULT_MAX_UNCONFIRMED_MESSAGES: int = ...
Expand All @@ -49,11 +50,7 @@ class Session:
channel_high_watermark: Optional[int] = None,
event_queue_watermarks: Optional[tuple[int, int]] = None,
stats_dump_interval: Optional[int | float] = None,
connect_timeout: Optional[int | float] = None,
disconnect_timeout: Optional[int | float] = None,
open_queue_timeout: Optional[int | float] = None,
configure_queue_timeout: Optional[int | float] = None,
close_queue_timeout: Optional[int | float] = None,
timeouts: Timeouts = (Timeouts()),
monitor_host_health: bool = False,
fake_host_health_monitor: Optional[FakeHostHealthMonitor] = None,
) -> None: ...
Expand Down
17 changes: 7 additions & 10 deletions src/blazingmq/_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ from . import _callbacks
from . import _enums
from . import _messages
from . import _script_name
from . import _timeouts
from . import session_events
from .exceptions import BrokerTimeoutError
from .exceptions import Error
Expand Down Expand Up @@ -170,11 +171,7 @@ cdef class Session:
channel_high_watermark: Optional[int] = None,
event_queue_watermarks: Optional[tuple[int,int]] = None,
stats_dump_interval: Optional[int|float] = None,
connect_timeout: Optional[int|float] = None,
disconnect_timeout: Optional[int|float] = None,
open_queue_timeout: Optional[int|float] = None,
configure_queue_timeout: Optional[int|float] = None,
close_queue_timeout: Optional[int|float] = None,
timeouts: _timeouts.Timeouts = (_timeouts.Timeouts()),
monitor_host_health: bool = False,
fake_host_health_monitor: FakeHostHealthMonitor = None,
_mock: Optional[object] = None,
Expand All @@ -185,11 +182,11 @@ cdef class Session:
cdef optional[int] c_channel_high_watermark
cdef optional[pair[int,int]] c_event_queue_watermarks
cdef TimeInterval c_stats_dump_interval = create_time_interval(stats_dump_interval)
cdef TimeInterval c_connect_timeout = create_time_interval(connect_timeout)
cdef TimeInterval c_disconnect_timeout = create_time_interval(disconnect_timeout)
cdef TimeInterval c_open_queue_timeout = create_time_interval(open_queue_timeout)
cdef TimeInterval c_configure_queue_timeout = create_time_interval(configure_queue_timeout)
cdef TimeInterval c_close_queue_timeout = create_time_interval(close_queue_timeout)
cdef TimeInterval c_connect_timeout = create_time_interval(timeouts.connect_timeout)
cdef TimeInterval c_disconnect_timeout = create_time_interval(timeouts.disconnect_timeout)
cdef TimeInterval c_open_queue_timeout = create_time_interval(timeouts.open_queue_timeout)
cdef TimeInterval c_configure_queue_timeout = create_time_interval(timeouts.configure_queue_timeout)
cdef TimeInterval c_close_queue_timeout = create_time_interval(timeouts.close_queue_timeout)

PyEval_InitThreads()

Expand Down
98 changes: 18 additions & 80 deletions src/blazingmq/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from ._messages import Message
from ._messages import MessageHandle
from ._monitors import BasicHealthMonitor
from ._timeouts import Timeouts
from ._typing import PropertyTypeDict
from ._typing import PropertyValueDict
from ._typing import PropertyValueType
Expand All @@ -55,6 +56,22 @@ def DefaultMonitor() -> Union[BasicHealthMonitor, None]:
KNOWN_MONITORS = ("blazingmq.BasicHealthMonitor",)


def _validate_timeouts(timeouts: Timeouts) -> Timeouts:
"""Validate a `.Timeouts` instance for use by the Cython layer.
If any of the timeouts contained within the `.Timeouts` instance are the
`DEFAULT_TIMEOUT` sentinel or `None`, return `None`. Otherwise, validate
that it is within the range accepted by `bsls::TimeInterval` and return it.
"""
return Timeouts(
connect_timeout=_convert_timeout(timeouts.connect_timeout),
disconnect_timeout=_convert_timeout(timeouts.disconnect_timeout),
open_queue_timeout=_convert_timeout(timeouts.open_queue_timeout),
configure_queue_timeout=_convert_timeout(timeouts.configure_queue_timeout),
close_queue_timeout=_convert_timeout(timeouts.close_queue_timeout)
)


def _convert_timeout(timeout: Optional[float]) -> Optional[float]:
"""Convert the timeout for use by the Cython layer.
Expand Down Expand Up @@ -221,81 +238,6 @@ def __repr__(self) -> str:
return f"QueueOptions({', '.join(params)})"


class Timeouts:
"""A value semantic type representing session timeouts.
Each option can be set either by passing it as a keyword argument when
constructing a *Timeouts* instance, or by setting it as an attribute on
a constructed instance.
The default for every option is `None`. When constructing a `Session`,
either directly or using `SessionOptions`, options set to `None` are given
reasonable default values.
Args:
connect_timeout:
The maximum number of seconds to wait for connection requests on
this session.
disconnect_timeout:
The maximum number of seconds to wait for disconnection requests
on this session.
open_queue_timeout:
The maximum number of seconds to wait for open queue requests on
this session.
configure_queue_timeout:
The maximum number of seconds to wait for configure queue requests
on this session.
close_queue_timeout:
The maximum number of seconds to wait for close queue requests on
this session.
"""

def __init__(
self,
connect_timeout: Optional[float] = None,
disconnect_timeout: Optional[float] = None,
open_queue_timeout: Optional[float] = None,
configure_queue_timeout: Optional[float] = None,
close_queue_timeout: Optional[float] = None,
) -> None:
self.connect_timeout = connect_timeout
self.disconnect_timeout = disconnect_timeout
self.open_queue_timeout = open_queue_timeout
self.configure_queue_timeout = configure_queue_timeout
self.close_queue_timeout = close_queue_timeout

def __eq__(self, other: object) -> bool:
if not isinstance(other, Timeouts):
return False
return (
self.connect_timeout == other.connect_timeout
and self.disconnect_timeout == other.disconnect_timeout
and self.open_queue_timeout == other.open_queue_timeout
and self.configure_queue_timeout == other.configure_queue_timeout
and self.close_queue_timeout == other.close_queue_timeout
)

def __ne__(self, other: object) -> bool:
return not self == other

def __repr__(self) -> str:
attrs = (
"connect_timeout",
"disconnect_timeout",
"open_queue_timeout",
"configure_queue_timeout",
"close_queue_timeout",
)

params = []
for attr in attrs:
value = getattr(self, attr)
if value is not None:
params.append(f"{attr}={value!r}")

return f"Timeouts({', '.join(params)})"


class SessionOptions:
"""A value semantic type representing session options.
Expand Down Expand Up @@ -513,11 +455,7 @@ def __init__(
channel_high_watermark=channel_high_watermark,
event_queue_watermarks=event_queue_watermarks,
stats_dump_interval=_convert_stats_dump_interval(stats_dump_interval),
connect_timeout=_convert_timeout(timeout.connect_timeout),
disconnect_timeout=_convert_timeout(timeout.disconnect_timeout),
open_queue_timeout=_convert_timeout(timeout.open_queue_timeout),
configure_queue_timeout=_convert_timeout(timeout.configure_queue_timeout),
close_queue_timeout=_convert_timeout(timeout.close_queue_timeout),
timeouts=_validate_timeouts(timeout),
monitor_host_health=monitor_host_health,
fake_host_health_monitor=fake_host_health_monitor,
)
Expand Down
13 changes: 9 additions & 4 deletions tests/unit/test_ext_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import pytest

from blazingmq import exceptions
from blazingmq import Timeouts
from blazingmq._ext import Session
from blazingmq._ext import ensure_stop_session
from blazingmq.session_events import InterfaceError
Expand Down Expand Up @@ -120,9 +121,10 @@ def test_start_connect_timeout():
# GIVEN
mock = sdk_mock(start=0, stop=None)
timeout = 1.12345
timeouts = Timeouts(connect_timeout=timeout)

# WHEN
session = Session(dummy_callback, connect_timeout=timeout, _mock=mock)
session = Session(dummy_callback, timeouts=timeouts, _mock=mock)

# THEN
mock.start.assert_called_once_with(timeout=timeout)
Expand Down Expand Up @@ -207,13 +209,16 @@ def test_session_timeout_passed_to_queue_defaults():
open_queue_timeout = 321
configure_queue_timeout = 432
close_queue_timeout = 543
timeouts = Timeouts(
open_queue_timeout=open_queue_timeout,
configure_queue_timeout=configure_queue_timeout,
close_queue_timeout=close_queue_timeout
)

# WHEN
Session(
dummy_callback,
open_queue_timeout=open_queue_timeout,
configure_queue_timeout=configure_queue_timeout,
close_queue_timeout=close_queue_timeout,
timeouts=timeouts,
_mock=mock,
)

Expand Down
54 changes: 19 additions & 35 deletions tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ def dummy2():
channel_high_watermark=8000000,
event_queue_watermarks=(6000000, 7000000),
stats_dump_interval=30.0,
connect_timeout=None,
disconnect_timeout=None,
open_queue_timeout=60.0,
configure_queue_timeout=60.0,
close_queue_timeout=60.0,
timeouts=Timeouts(
connect_timeout=None,
disconnect_timeout=None,
open_queue_timeout=60.0,
configure_queue_timeout=60.0,
close_queue_timeout=60.0
),
monitor_host_health=False,
fake_host_health_monitor=None,
)
Expand Down Expand Up @@ -123,11 +125,7 @@ def dummy2():
channel_high_watermark=8000000,
event_queue_watermarks=(6000000, 7000000),
stats_dump_interval=30.0,
connect_timeout=60.0,
disconnect_timeout=70.0,
open_queue_timeout=80.0,
configure_queue_timeout=90.0,
close_queue_timeout=100.0,
timeouts=timeouts,
monitor_host_health=False,
fake_host_health_monitor=None,
)
Expand Down Expand Up @@ -171,11 +169,7 @@ def dummy2():
channel_high_watermark=8000000,
event_queue_watermarks=(6000000, 7000000),
stats_dump_interval=30.0,
connect_timeout=None,
disconnect_timeout=None,
open_queue_timeout=None,
configure_queue_timeout=None,
close_queue_timeout=None,
timeouts=timeouts,
monitor_host_health=False,
fake_host_health_monitor=None,
)
Expand Down Expand Up @@ -210,11 +204,7 @@ def dummy2():
channel_high_watermark=None,
event_queue_watermarks=None,
stats_dump_interval=None,
connect_timeout=None,
disconnect_timeout=None,
open_queue_timeout=None,
configure_queue_timeout=None,
close_queue_timeout=None,
timeouts=Timeouts(),
monitor_host_health=False,
fake_host_health_monitor=None,
)
Expand Down Expand Up @@ -266,11 +256,7 @@ def dummy2():
channel_high_watermark=8000000,
event_queue_watermarks=(6000000, 7000000),
stats_dump_interval=30.0,
connect_timeout=60.0,
disconnect_timeout=70.0,
open_queue_timeout=80.0,
configure_queue_timeout=90.0,
close_queue_timeout=100.0,
timeouts=timeouts,
monitor_host_health=False,
fake_host_health_monitor=None,
)
Expand Down Expand Up @@ -309,11 +295,13 @@ def dummy2():
channel_high_watermark=None,
event_queue_watermarks=None,
stats_dump_interval=None,
connect_timeout=None,
disconnect_timeout=None,
open_queue_timeout=60.0,
configure_queue_timeout=60.0,
close_queue_timeout=60.0,
timeouts=Timeouts(
connect_timeout=None,
disconnect_timeout=None,
open_queue_timeout=60.0,
configure_queue_timeout=60.0,
close_queue_timeout=60.0
),
monitor_host_health=True,
fake_host_health_monitor=monitor._monitor,
)
Expand Down Expand Up @@ -344,11 +332,7 @@ def dummy2():
channel_high_watermark=None,
event_queue_watermarks=None,
stats_dump_interval=None,
connect_timeout=None,
disconnect_timeout=None,
open_queue_timeout=None,
configure_queue_timeout=None,
close_queue_timeout=None,
timeouts=Timeouts(),
monitor_host_health=False,
fake_host_health_monitor=None,
)
Expand Down

0 comments on commit fe22261

Please sign in to comment.