diff --git a/spynnaker/pyNN/external_devices_models/abstract_multicast_controllable_device.py b/spynnaker/pyNN/external_devices_models/abstract_multicast_controllable_device.py index f156ad8742..8ebbf928c2 100644 --- a/spynnaker/pyNN/external_devices_models/abstract_multicast_controllable_device.py +++ b/spynnaker/pyNN/external_devices_models/abstract_multicast_controllable_device.py @@ -13,6 +13,7 @@ # limitations under the License. from enum import Enum +from typing import Optional from spinn_utilities.abstract_base import AbstractBase, abstractmethod @@ -95,7 +96,7 @@ def device_control_max_value(self) -> float: @property @abstractmethod - def device_control_timesteps_between_sending(self) -> int: + def device_control_timesteps_between_sending(self) -> Optional[int]: """ The number of timesteps between sending commands to the device. This defines the "sampling interval" for the device. @@ -124,10 +125,8 @@ def device_control_scaling_factor(self) -> int: # pragma: no cover return 1 @property - def device_control_first_send_timestep(self) -> int: + def device_control_first_send_timestep(self) -> Optional[int]: """ The first timestep that the device should send in (0 by default). - - :rtype: int """ return 0 diff --git a/spynnaker/pyNN/external_devices_models/push_bot/control/push_bot_lif_spinnaker_link.py b/spynnaker/pyNN/external_devices_models/push_bot/control/push_bot_lif_spinnaker_link.py index fa18ab8e9d..438e4dd7ee 100644 --- a/spynnaker/pyNN/external_devices_models/push_bot/control/push_bot_lif_spinnaker_link.py +++ b/spynnaker/pyNN/external_devices_models/push_bot/control/push_bot_lif_spinnaker_link.py @@ -12,7 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Iterable + from spynnaker.pyNN.external_devices_models import ExternalDeviceLifControl +from spynnaker.pyNN.external_devices_models.push_bot.ethernet import ( + PushBotEthernetDevice) from spynnaker.pyNN.protocols import MunichIoSpiNNakerLinkProtocol from spynnaker.pyNN.models.defaults import default_initial_values @@ -41,12 +45,15 @@ class PushBotLifSpinnakerLink(ExternalDeviceLifControl): @default_initial_values({"v", "isyn_exc", "isyn_inh"}) def __init__( - self, protocol, devices, + self, protocol: MunichIoSpiNNakerLinkProtocol, + devices: Iterable[PushBotEthernetDevice], # default params for the neuron model type - tau_m=20.0, cm=1.0, v_rest=0.0, v_reset=0.0, tau_syn_E=5.0, - tau_syn_I=5.0, tau_refrac=0.1, i_offset=0.0, v=0.0, - isyn_exc=0.0, isyn_inh=0.0): + tau_m: float = 20.0, cm: float = 1.0, v_rest: float = 0.0, + v_reset: float = 0.0, tau_syn_E: float = 5.0, + tau_syn_I: float = 5.0, tau_refrac: float = 0.1, + i_offset: float = 0.0, v: float = 0.0, isyn_exc: float = 0.0, + isyn_inh: float = 0.0): # pylint: disable=too-many-arguments command_protocol = MunichIoSpiNNakerLinkProtocol( diff --git a/spynnaker/pyNN/external_devices_models/push_bot/ethernet/push_bot_device.py b/spynnaker/pyNN/external_devices_models/push_bot/ethernet/push_bot_device.py index 45878d619f..bc30ff902e 100644 --- a/spynnaker/pyNN/external_devices_models/push_bot/ethernet/push_bot_device.py +++ b/spynnaker/pyNN/external_devices_models/push_bot/ethernet/push_bot_device.py @@ -12,13 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Optional + from spinn_utilities.overrides import overrides from spinn_utilities.abstract_base import AbstractBase, abstractmethod from spynnaker.pyNN.external_devices_models import ( AbstractMulticastControllableDevice, SendType) +from spynnaker.pyNN.external_devices_models.push_bot import ( + AbstractPushBotOutputDevice) from spynnaker.pyNN.protocols import MunichIoSpiNNakerLinkProtocol - # The default timestep to use for first send. Avoids clashes with other # control commands. _DEFAULT_FIRST_SEND_TIMESTEP = 100 @@ -31,9 +34,10 @@ class PushBotEthernetDevice( """ def __init__( - self, protocol: MunichIoSpiNNakerLinkProtocol, device, - uses_payload, time_between_send, - first_send_timestep=_DEFAULT_FIRST_SEND_TIMESTEP): + self, protocol: MunichIoSpiNNakerLinkProtocol, + device: AbstractPushBotOutputDevice, + uses_payload: bool, time_between_send: Optional[int], + first_send_timestep: Optional[int] =_DEFAULT_FIRST_SEND_TIMESTEP): """ :param MunichIoSpiNNakerLinkProtocol protocol: The protocol instance to get commands from @@ -80,7 +84,7 @@ def device_control_max_value(self) -> float: @property @overrides(AbstractMulticastControllableDevice .device_control_timesteps_between_sending) - def device_control_timesteps_between_sending(self) -> int: + def device_control_timesteps_between_sending(self) -> Optional[int]: return self.__time_between_send @property @@ -92,7 +96,7 @@ def device_control_send_type(self) -> SendType: @property @overrides(AbstractMulticastControllableDevice .device_control_first_send_timestep) - def device_control_first_send_timestep(self) -> int: + def device_control_first_send_timestep(self) -> Optional[int]: return self.__first_send_timestep @property @@ -106,7 +110,7 @@ def protocol(self) -> MunichIoSpiNNakerLinkProtocol: @abstractmethod def set_command_protocol( - self, command_protocol: MunichIoSpiNNakerLinkProtocol): + self, command_protocol: MunichIoSpiNNakerLinkProtocol) -> None: """ Set the protocol use to send setup and shutdown commands, separately from the protocol used to control the device. diff --git a/spynnaker/pyNN/external_devices_models/push_bot/ethernet/push_bot_retina_connection.py b/spynnaker/pyNN/external_devices_models/push_bot/ethernet/push_bot_retina_connection.py index 6b2f44c10a..bdf1cdb603 100644 --- a/spynnaker/pyNN/external_devices_models/push_bot/ethernet/push_bot_retina_connection.py +++ b/spynnaker/pyNN/external_devices_models/push_bot/ethernet/push_bot_retina_connection.py @@ -13,13 +13,21 @@ # limitations under the License. from threading import RLock +from typing import Optional + import numpy + from spinnman.connections import ConnectionListener + +from spinn_front_end_common.utilities.connections import LiveEventConnection from spinn_front_end_common.utilities.constants import BYTES_PER_SHORT + from spynnaker.pyNN.connections import SpynnakerLiveSpikesConnection from spynnaker.pyNN.external_devices_models.push_bot.parameters import ( PushBotRetinaResolution) +from .push_bot_wifi_connection import PushBotWIFIConnection + # Each value is a 16-bit 1yyyyyyy.pxxxxxxx _RETINA_PACKET_SIZE = BYTES_PER_SHORT _MIN_PIXEL_VALUE = 32768 @@ -54,9 +62,12 @@ class PushBotRetinaConnection(SpynnakerLiveSpikesConnection): "__ready") def __init__( - self, retina_injector_label, pushbot_wifi_connection, - resolution=PushBotRetinaResolution.NATIVE_128_X_128, - local_host=None, local_port=None): + self, retina_injector_label: str, + pushbot_wifi_connection: PushBotWIFIConnection, + resolution: PushBotRetinaResolution = ( + PushBotRetinaResolution.NATIVE_128_X_128), + local_host: Optional[str] = None, + local_port: Optional[int] = None): """ :param str retina_injector_label: :param PushBotWIFIConnection pushbot_wifi_connection: @@ -88,7 +99,7 @@ def __init__( self.__pushbot_listener.start() self.__lock = RLock() - self.__next_data = None + self.__next_data: Optional[bytearray] = None self.__ready = False self.add_start_resume_callback( @@ -97,16 +108,18 @@ def __init__( retina_injector_label, self.__push_bot_stop) # pylint: disable=unused-argument - def __push_bot_start(self, label, connection): + def __push_bot_start( + self, label: str, connection: LiveEventConnection) -> None: with self.__lock: self.__ready = True # pylint: disable=unused-argument - def __push_bot_stop(self, label, connection): + def __push_bot_stop( + self, label: str, connection: LiveEventConnection) -> None: with self.__lock: self.__ready = False - def _receive_retina_data(self, data): + def _receive_retina_data(self, data: bytearray) -> None: """ Receive retina packets from the PushBot and converts them into neuron spikes within the spike injector system. @@ -134,7 +147,7 @@ def _receive_retina_data(self, data): self.__next_data = data[i:i+1] # Filter out the usable data - data_filtered = numpy.fromstring(data_all, dtype=numpy.uint16) + data_filtered = numpy.frombuffer(data_all, dtype=numpy.uint16) y_values = (data_filtered >> self.__orig_y_shift) & self.__y_mask x_values = (data_filtered >> self.__orig_x_shift) & self.__x_mask polarity = (data_filtered >> _P_SHIFT) & _P_MASK