Skip to content

Commit

Permalink
Merge pull request #380 from SpiNNakerManchester/allow_count_p2p_timeout
Browse files Browse the repository at this point in the history
Allow count p2p timeout
  • Loading branch information
rowleya authored Nov 13, 2023
2 parents d323ebd + f531dd8 commit f1be305
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 29 deletions.
54 changes: 32 additions & 22 deletions spinnman/connections/scp_request_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class SCPRequestPipeLine(Generic[R]):
"_n_retries",
"_n_retry_code_resent",
"_n_timeouts",
"_non_fail_retry_codes",
"_packet_timeout",
"_retry_reason",
"_request_data",
Expand All @@ -78,7 +79,8 @@ class SCPRequestPipeLine(Generic[R]):

def __init__(self, connection: SCAMPConnection, n_channels=1,
intermediate_channel_waits=0,
n_retries=N_RETRIES, packet_timeout=SCP_TIMEOUT):
n_retries=N_RETRIES, packet_timeout=SCP_TIMEOUT,
non_fail_retry_codes=None):
"""
:param SCAMPConnection connection:
The connection over which the communication is to take place
Expand All @@ -91,6 +93,9 @@ def __init__(self, connection: SCAMPConnection, n_channels=1,
reason before an error is triggered
:param float packet_timeout: The number of elapsed seconds after
sending a packet before it is considered a timeout.
:param non_fail_retry_codes: Result codes that trigger a retry but are
not treated as a failure once the retries run out, or None if there
are no such codes
:type non_fail_retry_codes: set(SCPResult) or None
"""
self._connection = connection
self._n_channels = n_channels
Expand Down Expand Up @@ -129,6 +134,9 @@ def __init__(self, connection: SCAMPConnection, n_channels=1,
# The number of packets that have been resent
self._n_resent = 0
self._n_retry_code_resent = 0
self._non_fail_retry_codes = non_fail_retry_codes
if self._non_fail_retry_codes is None:
self._non_fail_retry_codes = set()

# self._token_bucket = TokenBucket(43750, 4375000)
# self._token_bucket = TokenBucket(3408, 700000)
Expand Down Expand Up @@ -268,29 +276,31 @@ def _single_retrieve(self, timeout: float):
time.sleep(0.1)
self._resend(seq, request_sent, str(result))
self._n_retry_code_resent += 1
return
except Exception as e: # pylint: disable=broad-except
self._error_callbacks[seq](
request_sent, e,
cast(TracebackType, sys.exc_info()[2]),
self._connection)
self._remove_record(seq)
else:

# No retry is possible - try constructing the result
try:
response = request_sent.get_scp_response()
response.read_bytestring(raw_data, offset)
cb = self._callbacks[seq]
if cb is not None:
cb(response)
except Exception as e: # pylint: disable=broad-except
self._error_callbacks[seq](
request_sent, e,
cast(TracebackType, sys.exc_info()[2]),
self._connection)
if result not in self._non_fail_retry_codes:
self._error_callbacks[seq](
request_sent, e,
cast(TracebackType, sys.exc_info()[2]),
self._connection)
self._remove_record(seq)
return

# No retry is possible and not failed - try constructing the result
try:
response = request_sent.get_scp_response()
response.read_bytestring(raw_data, offset)
cb = self._callbacks[seq]
if cb is not None:
cb(response)
except Exception as e: # pylint: disable=broad-except
self._error_callbacks[seq](
request_sent, e,
cast(TracebackType, sys.exc_info()[2]),
self._connection)

# Remove the sequence from the outstanding responses
self._remove_record(seq)
# Remove the sequence from the outstanding responses
self._remove_record(seq)

def _handle_receive_timeout(self) -> None:
self._n_timeouts += 1
Expand Down
14 changes: 11 additions & 3 deletions spinnman/processes/abstract_multi_connection_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import sys
from types import TracebackType
from typing import (
Callable, Dict, Generator, Generic, List, Optional, TypeVar, cast)
Callable, Dict, Generator, Generic, List, Optional, TypeVar, cast, Set)
from typing_extensions import Self, TypeAlias
from spinn_utilities.log import FormatAdapter
from spinnman.connections import SCPRequestPipeLine
Expand All @@ -29,6 +29,7 @@
from spinnman.connections.udp_packet_connections import SCAMPConnection
from spinnman.messages.scp.abstract_messages import (
AbstractSCPRequest, AbstractSCPResponse)
from spinnman.messages.scp.enums.scp_result import SCPResult
#: Type of responses.
#: :meta private:
R = TypeVar("R", bound=AbstractSCPResponse)
Expand All @@ -53,13 +54,15 @@ class AbstractMultiConnectionProcess(Generic[R]):
"_intermediate_channel_waits",
"_n_channels",
"_n_retries",
"_non_fail_retry_codes",
"_conn_selector",
"_scp_request_pipelines",
"_timeout")

def __init__(self, next_connection_selector: ConnectionSelector,
n_retries: int = N_RETRIES, timeout: float = SCP_TIMEOUT,
n_channels: int = 8, intermediate_channel_waits: int = 7):
n_channels: int = 8, intermediate_channel_waits: int = 7,
non_fail_retry_codes: Optional[Set[SCPResult]] = None):
"""
:param ConnectionSelector next_connection_selector:
How to choose the connection.
Expand All @@ -74,6 +77,9 @@ def __init__(self, next_connection_selector: ConnectionSelector,
:param int intermediate_channel_waits:
The maximum number of outstanding message/reply pairs to have on a
particular connection. Passed to :py:class:`SCPRequestPipeLine`
:param Optional[Set[SCPResult]] non_fail_retry_codes:
Optional set of result codes that trigger a retry but do not cause a
failure, even when still returned on the final attempt.
"""
self._exceptions: List[Exception] = []
self._tracebacks: List[TracebackType] = []
Expand All @@ -86,6 +92,7 @@ def __init__(self, next_connection_selector: ConnectionSelector,
self._n_channels = n_channels
self._intermediate_channel_waits = intermediate_channel_waits
self._conn_selector = next_connection_selector
self._non_fail_retry_codes = non_fail_retry_codes

def _send_request(self, request: AbstractSCPRequest[R],
callback: Optional[Callable[[R], None]] = None,
Expand All @@ -98,7 +105,8 @@ def _send_request(self, request: AbstractSCPRequest[R],
connection, n_retries=self._n_retries,
packet_timeout=self._timeout,
n_channels=self._n_channels,
intermediate_channel_waits=self._intermediate_channel_waits)
intermediate_channel_waits=self._intermediate_channel_waits,
non_fail_retry_codes=self._non_fail_retry_codes)
self._scp_request_pipelines[connection].send_request(
request, callback, error_callback)

Expand Down
4 changes: 3 additions & 1 deletion spinnman/processes/get_n_cores_in_state_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from spinnman.messages.scp.impl import CountState
from .abstract_multi_connection_process import AbstractMultiConnectionProcess
from spinnman.messages.scp.enums.scp_result import SCPResult

# Timeout for getting core state count; higher due to more waiting needed
GET_CORE_COUNT_TIMEOUT = 2.0
Expand All @@ -29,7 +30,8 @@ def __init__(self, connection_selector):
:type connection_selector:
AbstractMultiConnectionProcessConnectionSelector
"""
super().__init__(connection_selector, timeout=GET_CORE_COUNT_TIMEOUT)
super().__init__(connection_selector, timeout=GET_CORE_COUNT_TIMEOUT,
non_fail_retry_codes={SCPResult.RC_P2P_NOREPLY})
self._n_cores = 0

def __handle_response(self, response):
Expand Down
12 changes: 9 additions & 3 deletions spinnman/processes/send_single_command_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Generic, Optional, TypeVar
from typing import Generic, Optional, TypeVar, Set
from .abstract_multi_connection_process import AbstractMultiConnectionProcess
from spinnman.constants import SCP_TIMEOUT
from spinnman.messages.scp.abstract_messages import AbstractSCPResponse
from spinnman.messages.scp.abstract_messages import AbstractSCPRequest
from .abstract_multi_connection_process_connection_selector import (
ConnectionSelector)
from spinnman.messages.scp.enums.scp_result import SCPResult
#: Type of responses.
#: :meta private:
R = TypeVar("R", bound=AbstractSCPResponse)
Expand All @@ -30,7 +31,8 @@ class SendSingleCommandProcess(AbstractMultiConnectionProcess, Generic[R]):
__slots__ = ("_response", )

def __init__(self, connection_selector: ConnectionSelector,
n_retries: int = 3, timeout: float = SCP_TIMEOUT):
n_retries: int = 3, timeout: float = SCP_TIMEOUT,
non_fail_retry_codes: Optional[Set[SCPResult]] = None):
"""
:param ConnectionSelector connection_selector:
:param int n_retries:
Expand All @@ -39,9 +41,13 @@ def __init__(self, connection_selector: ConnectionSelector,
:param float timeout:
The timeout, in seconds. Passed to
:py:class:`SCPRequestPipeLine`
:param Optional[Set[SCPResult]] non_fail_retry_codes:
Optional set of result codes that trigger a retry but do not cause a
failure, even when still returned on the final attempt.
"""
super().__init__(
connection_selector, n_retries=n_retries, timeout=timeout)
connection_selector, n_retries=n_retries, timeout=timeout,
non_fail_retry_codes=non_fail_retry_codes)
self._response: Optional[R] = None

def __handle_response(self, response: R):
Expand Down

0 comments on commit f1be305

Please sign in to comment.