From 654330160a9826fb93e3ed041d2bf62a32a67411 Mon Sep 17 00:00:00 2001 From: Andrew Rowley Date: Fri, 10 Nov 2023 09:43:21 +0000 Subject: [PATCH 1/3] Try to let p2p_timeout pass --- spinnman/connections/scp_request_pipeline.py | 54 ++++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/spinnman/connections/scp_request_pipeline.py b/spinnman/connections/scp_request_pipeline.py index c1747303f..f234cf87c 100644 --- a/spinnman/connections/scp_request_pipeline.py +++ b/spinnman/connections/scp_request_pipeline.py @@ -69,6 +69,7 @@ class SCPRequestPipeLine(Generic[R]): "_n_retries", "_n_retry_code_resent", "_n_timeouts", + "_non_fail_retry_codes", "_packet_timeout", "_retry_reason", "_request_data", @@ -78,7 +79,8 @@ class SCPRequestPipeLine(Generic[R]): def __init__(self, connection: SCAMPConnection, n_channels=1, intermediate_channel_waits=0, - n_retries=N_RETRIES, packet_timeout=SCP_TIMEOUT): + n_retries=N_RETRIES, packet_timeout=SCP_TIMEOUT, + non_fail_retry_codes=None): """ :param SCAMPConnection connection: The connection over which the communication is to take place @@ -91,6 +93,9 @@ def __init__(self, connection: SCAMPConnection, n_channels=1, reason before an error is triggered :param float packet_timeout: The number of elapsed seconds after sending a packet before it is considered a timeout. + :param non_fail_retry_codes: Codes that could retry but won't fail, or + None if there are no such codes + :type non_fail_retry_codes: set(SCPResult) or None """ self._connection = connection self._n_channels = n_channels @@ -129,6 +134,9 @@ def __init__(self, connection: SCAMPConnection, n_channels=1, # The number of packets that have been resent self._n_resent = 0 self._n_retry_code_resent = 0 + self._non_fail_retry_codes = non_fail_retry_codes + if self._non_fail_retry_codes is None: + self._non_fail_retry_codes = set() # self._token_bucket = TokenBucket(43750, 4375000) # self._token_bucket = TokenBucket(3408, 700000) @@ -268,29 +276,31 @@ def _single_retrieve(self, timeout: float): time.sleep(0.1) self._resend(seq, request_sent, str(result)) self._n_retry_code_resent += 1 + return except Exception as e: # pylint: disable=broad-except - self._error_callbacks[seq]( - request_sent, e, - cast(TracebackType, sys.exc_info()[2]), - self._connection) - self._remove_record(seq) - else: - - # No retry is possible - try constructing the result - try: - response = request_sent.get_scp_response() - response.read_bytestring(raw_data, offset) - cb = self._callbacks[seq] - if cb is not None: - cb(response) - except Exception as e: # pylint: disable=broad-except - self._error_callbacks[seq]( - request_sent, e, - cast(TracebackType, sys.exc_info()[2]), - self._connection) + if result not in self._non_fail_retry_codes: + self._error_callbacks[seq]( + request_sent, e, + cast(TracebackType, sys.exc_info()[2]), + self._connection) + self._remove_record(seq) + return + + # No retry is possible and not failed - try constructing the result + try: + response = request_sent.get_scp_response() + response.read_bytestring(raw_data, offset) + cb = self._callbacks[seq] + if cb is not None: + cb(response) + except Exception as e: # pylint: disable=broad-except + self._error_callbacks[seq]( + request_sent, e, + cast(TracebackType, sys.exc_info()[2]), + self._connection) - # Remove the sequence from the outstanding responses - self._remove_record(seq) + # Remove the sequence from the outstanding responses + self._remove_record(seq) def _handle_receive_timeout(self) -> None: self._n_timeouts += 1 From e7697628dc2878fbfd39e478a0087c0286e8767e Mon Sep 17 00:00:00 2001 From: Andrew Rowley Date: Fri, 10 Nov 2023 12:29:41 +0000 Subject: [PATCH 2/3] See if this lets things pass --- .../processes/abstract_multi_connection_process.py | 12 ++++++++++-- spinnman/processes/get_n_cores_in_state_process.py | 4 +++- spinnman/processes/send_single_command_process.py | 10 ++++++++-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/spinnman/processes/abstract_multi_connection_process.py b/spinnman/processes/abstract_multi_connection_process.py index 302505f71..3700a76fd 100644 --- a/spinnman/processes/abstract_multi_connection_process.py +++ b/spinnman/processes/abstract_multi_connection_process.py @@ -29,6 +29,7 @@ from spinnman.connections.udp_packet_connections import SCAMPConnection from spinnman.messages.scp.abstract_messages import ( AbstractSCPRequest, AbstractSCPResponse) +from spinnman.messages.scp.enums.scp_result import SCPResult #: Type of responses. #: :meta private: R = TypeVar("R", bound=AbstractSCPResponse) @@ -53,13 +54,15 @@ class AbstractMultiConnectionProcess(Generic[R]): "_intermediate_channel_waits", "_n_channels", "_n_retries", + "_non_fail_retry_codes", "_conn_selector", "_scp_request_pipelines", "_timeout") def __init__(self, next_connection_selector: ConnectionSelector, n_retries: int = N_RETRIES, timeout: float = SCP_TIMEOUT, - n_channels: int = 8, intermediate_channel_waits: int = 7): + n_channels: int = 8, intermediate_channel_waits: int = 7, + non_fail_retry_codes: Optional[set(SCPResult)] = None): """ :param ConnectionSelector next_connection_selector: How to choose the connection. @@ -74,6 +77,9 @@ def __init__(self, next_connection_selector: ConnectionSelector, :param int intermediate_channel_waits: The maximum number of outstanding message/reply pairs to have on a particular connection. Passed to :py:class:`SCPRequestPipeLine` + :param Optional[set(SCPResult)] non_fail_retry_codes: + Optional set of responses that result in retry but after retrying + don't then result in failure even if returned on the last call. """ self._exceptions: List[Exception] = [] self._tracebacks: List[TracebackType] = [] @@ -86,6 +92,7 @@ def __init__(self, next_connection_selector: ConnectionSelector, self._n_channels = n_channels self._intermediate_channel_waits = intermediate_channel_waits self._conn_selector = next_connection_selector + self._non_fail_retry_codes = non_fail_retry_codes def _send_request(self, request: AbstractSCPRequest[R], callback: Optional[Callable[[R], None]] = None, @@ -98,7 +105,8 @@ def _send_request(self, request: AbstractSCPRequest[R], connection, n_retries=self._n_retries, packet_timeout=self._timeout, n_channels=self._n_channels, - intermediate_channel_waits=self._intermediate_channel_waits) + intermediate_channel_waits=self._intermediate_channel_waits, + non_fail_retry_codes=self._non_fail_retry_codes) self._scp_request_pipelines[connection].send_request( request, callback, error_callback) diff --git a/spinnman/processes/get_n_cores_in_state_process.py b/spinnman/processes/get_n_cores_in_state_process.py index 5778c5698..d557c661c 100644 --- a/spinnman/processes/get_n_cores_in_state_process.py +++ b/spinnman/processes/get_n_cores_in_state_process.py @@ -14,6 +14,7 @@ from spinnman.messages.scp.impl import CountState from .abstract_multi_connection_process import AbstractMultiConnectionProcess +from spinnman.messages.scp.enums.scp_result import SCPResult # Timeout for getting core state count; higher due to more waiting needed GET_CORE_COUNT_TIMEOUT = 2.0 @@ -29,7 +30,8 @@ def __init__(self, connection_selector): :type connection_selector: AbstractMultiConnectionProcessConnectionSelector """ - super().__init__(connection_selector, timeout=GET_CORE_COUNT_TIMEOUT) + super().__init__(connection_selector, timeout=GET_CORE_COUNT_TIMEOUT, + non_fail_retry_codes={SCPResult.RC_P2P_NOREPLY}) self._n_cores = 0 def __handle_response(self, response): diff --git a/spinnman/processes/send_single_command_process.py b/spinnman/processes/send_single_command_process.py index e4a5d8ba9..0d3f09fdf 100644 --- a/spinnman/processes/send_single_command_process.py +++ b/spinnman/processes/send_single_command_process.py @@ -18,6 +18,7 @@ from spinnman.messages.scp.abstract_messages import AbstractSCPRequest from .abstract_multi_connection_process_connection_selector import ( ConnectionSelector) +from spinnman.messages.scp.enums.scp_result import SCPResult #: Type of responses. #: :meta private: R = TypeVar("R", bound=AbstractSCPResponse) @@ -30,7 +31,8 @@ class SendSingleCommandProcess(AbstractMultiConnectionProcess, Generic[R]): __slots__ = ("_response", ) def __init__(self, connection_selector: ConnectionSelector, - n_retries: int = 3, timeout: float = SCP_TIMEOUT): + n_retries: int = 3, timeout: float = SCP_TIMEOUT, + non_fail_retry_codes: Optional[set(SCPResult)] = None): """ :param ConnectionSelector connection_selector: :param int n_retries: @@ -39,9 +41,13 @@ def __init__(self, connection_selector: ConnectionSelector, :param float timeout: The timeout, in seconds. Passed to :py:class:`SCPRequestPipeLine` + :param Optional[set(SCPResult)] non_fail_retry_codes: + Optional set of responses that result in retry but after retrying + don't then result in failure even if returned on the last call. """ super().__init__( - connection_selector, n_retries=n_retries, timeout=timeout) + connection_selector, n_retries=n_retries, timeout=timeout, + non_fail_retry_codes=non_fail_retry_codes) self._response: Optional[R] = None def __handle_response(self, response: R): From f531dd869c63cbd44f60ab2e75156f3ba1f67ea7 Mon Sep 17 00:00:00 2001 From: Andrew Rowley Date: Fri, 10 Nov 2023 12:34:14 +0000 Subject: [PATCH 3/3] Fix types --- spinnman/processes/abstract_multi_connection_process.py | 6 +++--- spinnman/processes/send_single_command_process.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spinnman/processes/abstract_multi_connection_process.py b/spinnman/processes/abstract_multi_connection_process.py index 3700a76fd..6c8715049 100644 --- a/spinnman/processes/abstract_multi_connection_process.py +++ b/spinnman/processes/abstract_multi_connection_process.py @@ -17,7 +17,7 @@ import sys from types import TracebackType from typing import ( - Callable, Dict, Generator, Generic, List, Optional, TypeVar, cast) + Callable, Dict, Generator, Generic, List, Optional, TypeVar, cast, Set) from typing_extensions import Self, TypeAlias from spinn_utilities.log import FormatAdapter from spinnman.connections import SCPRequestPipeLine @@ -62,7 +62,7 @@ class AbstractMultiConnectionProcess(Generic[R]): def __init__(self, next_connection_selector: ConnectionSelector, n_retries: int = N_RETRIES, timeout: float = SCP_TIMEOUT, n_channels: int = 8, intermediate_channel_waits: int = 7, - non_fail_retry_codes: Optional[set(SCPResult)] = None): + non_fail_retry_codes: Optional[Set[SCPResult]] = None): """ :param ConnectionSelector next_connection_selector: How to choose the connection. @@ -77,7 +77,7 @@ def __init__(self, next_connection_selector: ConnectionSelector, :param int intermediate_channel_waits: The maximum number of outstanding message/reply pairs to have on a particular connection. Passed to :py:class:`SCPRequestPipeLine` - :param Optional[set(SCPResult)] non_fail_retry_codes: + :param Optional[Set[SCPResult]] non_fail_retry_codes: Optional set of responses that result in retry but after retrying don't then result in failure even if returned on the last call. """ diff --git a/spinnman/processes/send_single_command_process.py b/spinnman/processes/send_single_command_process.py index 0d3f09fdf..b16c79248 100644 --- a/spinnman/processes/send_single_command_process.py +++ b/spinnman/processes/send_single_command_process.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Generic, Optional, TypeVar +from typing import Generic, Optional, TypeVar, Set from .abstract_multi_connection_process import AbstractMultiConnectionProcess from spinnman.constants import SCP_TIMEOUT from spinnman.messages.scp.abstract_messages import AbstractSCPResponse @@ -32,7 +32,7 @@ class SendSingleCommandProcess(AbstractMultiConnectionProcess, Generic[R]): def __init__(self, connection_selector: ConnectionSelector, n_retries: int = 3, timeout: float = SCP_TIMEOUT, - non_fail_retry_codes: Optional[set(SCPResult)] = None): + non_fail_retry_codes: Optional[Set[SCPResult]] = None): """ :param ConnectionSelector connection_selector: :param int n_retries: @@ -41,7 +41,7 @@ def __init__(self, connection_selector: ConnectionSelector, :param float timeout: The timeout, in seconds. Passed to :py:class:`SCPRequestPipeLine` - :param Optional[set(SCPResult)] non_fail_retry_codes: + :param Optional[Set[SCPResult]] non_fail_retry_codes: Optional set of responses that result in retry but after retrying don't then result in failure even if returned on the last call. """