diff --git a/pslab/bus/__init__.py b/pslab/bus/__init__.py index 5c3d5ee1..453711a6 100644 --- a/pslab/bus/__init__.py +++ b/pslab/bus/__init__.py @@ -1,10 +1,14 @@ """Contains modules for interfacing with the PSLab's I2C, SPI, and UART buses. -(SPI and UART still TODO) +(UART still TODO) """ + from pslab.bus.i2c import I2CMaster, I2CSlave +from pslab.bus.spi import SPIMaster, SPISlave __all__ = ( "I2CMaster", "I2CSlave", + "SPIMaster", + "SPISlave", ) diff --git a/pslab/bus/busio.py b/pslab/bus/busio.py index a7f7ce26..618e4fd9 100644 --- a/pslab/bus/busio.py +++ b/pslab/bus/busio.py @@ -27,9 +27,11 @@ >>> sensor = adafruit_bno055.BNO055_I2C(i2c) >>> print(sensor.gyro) """ + from typing import List, Union -from pslab.bus.i2c import I2CPrimitive +from pslab.bus.i2c import _I2CPrimitive +from pslab.bus.spi import _SPIPrimitive from pslab.serial_handler import SerialHandler __all__ = "I2C" @@ -37,7 +39,7 @@ WriteableBuffer = Union[bytearray, memoryview] -class I2C(I2CPrimitive): +class I2C(_I2CPrimitive): """Busio I2C Class for CircuitPython Compatibility. Parameters @@ -177,3 +179,197 @@ def writeto_then_readfrom( self._restart(address, 1) buffer_in[in_start:in_end] = self._read(bytes_to_read) self._stop() + + +class SPI(_SPIPrimitive): + """Busio SPI Class for CircuitPython Compatibility. + + Parameters + ---------- + device : :class:`SerialHandler`, optional + Serial connection to PSLab device. If not provided, a new one will be + created. + """ + + def __init__(self, device: SerialHandler = None, frequency: int = 125e3): + super().__init__(device) + ppre, spre = self._get_prescaler(25e4) + self._set_parameters(ppre, spre, 1, 0, 1) + self._bits = 8 + + @property + def frequency(self) -> int: + """Get the actual SPI bus frequency (rounded). + + This may not match the frequency requested due to internal limitations. + """ + return round(self._frequency) + + def deinit(self) -> None: + """Just a dummy method.""" + pass + + def __enter__(self): + """Just a dummy context manager.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + """Call :meth:`deinit` on context exit.""" + self.deinit() + + def configure( + self, + *, + baudrate: int = 100000, + polarity: int = 0, + phase: int = 0, + bits: int = 8, + ) -> None: + """Configure the SPI bus. + + Parameters + ---------- + baudrate : int + The desired clock rate in Hertz. The actual clock rate may be + higher or lower due to the granularity of available clock settings. + Check the frequency attribute for the actual clock rate. + polarity : int + The base state of the clock line (0 or 1) + phase : int + The edge of the clock that data is captured. First (0) or second (1). + Rising or falling depends on clock polarity. + bits : int + The number of bits per word. + """ + if polarity not in (0, 1): + raise ValueError("Invalid polarity") + if phase not in (0, 1): + raise ValueError("Invalid phase") + if bits not in self._INTEGER_TYPE_MAP: + raise ValueError("Invalid number of bits") + + ppre, spre = self._get_prescaler(baudrate) + cke = (phase ^ 1) & 1 + self._set_parameters(ppre, spre, cke, polarity, 1) + self._bits = bits + + def try_lock(self) -> bool: # pylint: disable=no-self-use + """Just a dummy method.""" + return True + + def unlock(self) -> None: + """Just a dummy method.""" + pass + + def write( + self, + buffer: Union[ReadableBuffer, List[int]], + *, + start: int = 0, + end: int = None, + ) -> None: + """Write the data contained in buffer. If the buffer is empty, nothing happens. + + Parameters + ---------- + buffer : bytes or bytearray or memoryview or list_of_int (for bits >8) + Write out the data in this buffer. + start : int + Start of the slice of `buffer` to write out: `buffer[start:end]`. + end : int + End of the slice; this index is not included. Defaults to `len(buffer)`. + """ + end = len(buffer) if end is None else end + buffer = buffer[start:end] + + if not buffer: + return + + self._start() + self._write_bulk(buffer, self._bits) + self._stop() + + def readinto( + self, + buffer: Union[WriteableBuffer, List[int]], + *, + start: int = 0, + end: int = None, + write_value: int = 0, + ) -> None: + """Read into `buffer` while writing `write_value` for each byte read. + + If the number of bytes to read is 0, nothing happens. + + Parameters + ---------- + buffer : bytearray or memoryview or list_of_int (for bits >8) + Read data into this buffer. + start : int + Start of the slice of `buffer` to read into: `buffer[start:end]`. + end : int + End of the slice; this index is not included. Defaults to `len(buffer)`. + write_value : int + Value to write while reading. (Usually ignored.) + """ + end = len(buffer) if end is None else end + bytes_to_read = end - start + + if bytes_to_read == 0: + return + + self._start() + data = self._transfer_bulk([write_value] * bytes_to_read, self._bits) + self._stop() + + for i, v in zip(range(start, end), data): + buffer[i] = v + + def write_readinto( + self, + buffer_out: Union[ReadableBuffer, List[int]], + buffer_in: Union[WriteableBuffer, List[int]], + *, + out_start: int = 0, + out_end: int = None, + in_start: int = 0, + in_end: int = None, + ): + """Write out the data in buffer_out while simultaneously read into buffer_in. + + The lengths of the slices defined by buffer_out[out_start:out_end] and + buffer_in[in_start:in_end] must be equal. If buffer slice lengths are both 0, + nothing happens. + + Parameters + ---------- + buffer_out : bytes or bytearray or memoryview or list_of_int (for bits >8) + Write out the data in this buffer. + buffer_in : bytearray or memoryview or list_of_int (for bits >8) + Read data into this buffer. + out_start : int + Start of the slice of `buffer_out` to write out: + `buffer_out[out_start:out_end]`. + out_end : int + End of the slice; this index is not included. Defaults to `len(buffer_out)` + in_start : int + Start of the slice of `buffer_in` to read into:`buffer_in[in_start:in_end]` + in_end : int + End of the slice; this index is not included. Defaults to `len(buffer_in)` + """ + out_end = len(buffer_out) if out_end is None else out_end + in_end = len(buffer_in) if in_end is None else in_end + buffer_out = buffer_out[out_start:out_end] + bytes_to_read = in_end - in_start + + if len(buffer_out) != bytes_to_read: + raise ValueError("buffer slices must be of equal length") + if bytes_to_read == 0: + return + + self._start() + data = self._transfer_bulk(buffer_out, self._bits) + self._stop() + + for i, v in zip(range(in_start, in_end), data): + buffer_in[i] = v diff --git a/pslab/bus/i2c.py b/pslab/bus/i2c.py index ece0efd0..a86299bb 100644 --- a/pslab/bus/i2c.py +++ b/pslab/bus/i2c.py @@ -17,6 +17,7 @@ >>> rtc = I2CSlave(address=104) """ + import logging from typing import List @@ -31,7 +32,7 @@ logger = logging.getLogger(__name__) -class I2CPrimitive: +class _I2CPrimitive: """I2C primitive commands. Handles all the I2C subcommands coded in pslab-firmware. @@ -385,6 +386,7 @@ def _read(self, bytes_to_read: int) -> bytearray: for _ in range(bytes_to_read - 1): data.append(self._read_more()) + data.append(self._read_end()) return data @@ -427,7 +429,7 @@ def _read_bulk( return bytearray(data) -class I2CMaster(I2CPrimitive): +class I2CMaster(_I2CPrimitive): """I2C bus controller. Handles slave independent functionality with the I2C port. @@ -478,7 +480,7 @@ def scan(self) -> List[int]: return addrs -class I2CSlave(I2CPrimitive): +class I2CSlave(_I2CPrimitive): """I2C slave device. Parameters @@ -513,6 +515,7 @@ def ping(self) -> bool: """ response = self._start(self.address, self._READ) self._stop() + return response == self._ACK def read(self, bytes_to_read: int, register_address: int = 0x0) -> bytearray: @@ -572,6 +575,7 @@ def read_int(self, register_address: int = 0x0) -> int: Two bytes interpreted as a uint16. """ data = self.read(2, register_address) + return CP.ShortInt.unpack(data)[0] def read_long(self, register_address: int = 0x0) -> int: @@ -589,6 +593,7 @@ def read_long(self, register_address: int = 0x0) -> int: Four bytes interpreted as a uint32. """ data = self.read(4, register_address) + return CP.Integer.unpack(data)[0] def write(self, bytes_to_write: bytearray, register_address: int = 0x0): diff --git a/pslab/bus/spi.py b/pslab/bus/spi.py new file mode 100644 index 00000000..05a5f984 --- /dev/null +++ b/pslab/bus/spi.py @@ -0,0 +1,695 @@ +"""Control the PSLab's SPI bus and devices connected on the bus. + +Examples +-------- +Set SPI bus speed to 200 kbit/s: + +>>> from pslab.bus.spi import SPIMaster, SPISlave +>>> bus = SPIMaster() +>>> bus.set_parameters(primary_prescaler=0, secondary_prescaler=3) # 64e6/(64*5) + +Set SPI bus to mode 3 (1,1): + +>>> bus.set_parameters(0, 3, CKE=0, CKP=1) + +Transfer a random byte over SPI: + +>>> slave = SPISlave() +>>> slave.transfer8(0x55) +0 +""" + +import sys +from typing import List, Tuple + +import pslab.protocol as CP +from pslab.serial_handler import SerialHandler + +__all__ = ( + "SPIMaster", + "SPISlave", +) +# Default values, refer pslab-firmware. +_PPRE = 0 +_SPRE = 2 +# SPI mode 0 (0,0) +_CKP = 0 # Clock Polarity 0 +_CKE = 1 # Clock Phase 0 | Clock Edge 1 +_SMP = 1 + + +class classmethod_(classmethod): + """Support chaining classmethod and property.""" + + def __init__(self, f): + self.f = f + super().__init__(f) + + def __get__(self, obj, cls=None): + # classmethod() to support chained decorators; new in python 3.9. + if sys.version_info < (3, 9) and isinstance(self.f, property): + return self.f.__get__(cls) + else: + return super().__get__(obj, cls) + + +class _SPIPrimitive: + """SPI primitive commands. + + Handles all the SPI subcommands coded in pslab-firmware. + + Parameters + ---------- + device : :class:`SerialHandler`, optional + Serial connection to PSLab device. If not provided, a new one will be + created. + """ + + _TRANSFER_COMMANDS_MAP = { + 8: CP.SEND_SPI8, + 16: CP.SEND_SPI16, + } # PSLab only supports 8 and 16 bits. + _INTEGER_TYPE_MAP = { + 8: CP.Byte, + 16: CP.ShortInt, + } # Keys in `_INTEGER_TYPE_MAP` should match `_TRANSFER_COMMANDS_MAP`. + _PPRE_MAP = [64, 16, 4, 1] + _SPRE_MAP = [8, 7, 6, 5, 4, 3, 2, 1] + + _primary_prescaler = _PPRE + _secondary_prescaler = _SPRE + _clock_polarity = _CKP # Clock Polarity bit. + _clock_edge = _CKE # Clock Edge Select bit (inverse of Clock Phase bit). + _smp = _SMP # Data Input Sample Phase bit. + + def __init__(self, device: SerialHandler = None): + self._device = device if device is not None else SerialHandler() + + @classmethod_ + @property + def _frequency(cls) -> float: + ppre = cls._PPRE_MAP[cls._primary_prescaler] + spre = cls._SPRE_MAP[cls._secondary_prescaler] + + return CP.CLOCK_RATE / (ppre * spre) + + @classmethod_ + @property + def _clock_phase(cls) -> int: + return (cls._clock_edge ^ 1) & 1 + + @classmethod + def _get_prescaler(cls, frequency: float) -> Tuple[int]: + min_diff = CP.CLOCK_RATE # highest + # minimum frequency + ppre = 0 + spre = 0 + + for p in range(len(cls._PPRE_MAP)): + for s in range(len(cls._SPRE_MAP)): + + freq = CP.CLOCK_RATE / (cls._PPRE_MAP[p] * cls._SPRE_MAP[s]) + if frequency >= freq: + + diff = frequency - freq + if min_diff > diff: + # better match + min_diff = diff + ppre = p + spre = s + + return ppre, spre + + @staticmethod + def _save_config( + primary_prescaler: int, + secondary_prescaler: int, + CKE: int, + CKP: int, + SMP: int, + ): + """Save the SPI parameters. + + See Also + -------- + _set_parameters : To set SPI parameters. + """ + _SPIPrimitive._primary_prescaler = primary_prescaler + _SPIPrimitive._secondary_prescaler = secondary_prescaler + _SPIPrimitive._clock_edge = CKE + _SPIPrimitive._clock_polarity = CKP + _SPIPrimitive._smp = SMP + + def _set_parameters( + self, + primary_prescaler: int, + secondary_prescaler: int, + CKE: int, + CKP: int, + SMP: int, + ): + """Set SPI parameters. + + It is a primitive SPI method, prefered to use :meth:`SPIMaster.set_parameters`. + + Parameters + ---------- + primary_prescaler : {0, 1, 2, 3} + Primary Prescaler for system clock :const:`CP.CLOCK_RATE`Hz. + (0,1,2,3) -> (64:1,16:1,4:1,1:1). + secondary_prescaler : {0, 1, 2, 3, 4, 5, 6, 7} + Secondary prescaler (0,1,..7) -> (8:1,7:1,..1:1). + CKE : {0, 1} + SPIx Clock Edge Select bit. Serial output data changes on transition + {0: from Idle clock state to active clock state, + 1: from active clock state to Idle clock state}. + CKP : {0, 1} + Clock Polarity Select bit. + Idle state for clock is a {0: low, 1: high} level. + SMP : {0, 1} + Input data is sampled at the {0: end, 1: middle} of data output time. + + Raises + ------ + ValueError + If any one of arguments is not in its shown range. + """ + error_message = [] + if primary_prescaler not in range(0, 4): + error_message.append("Primary Prescaler must be in 2-bits.") + if secondary_prescaler not in range(0, 8): + error_message.append("Secondary Prescale must be in 3-bits.") + if CKE not in (0, 1): + error_message.append("Clock Edge Select must be a bit.") + if CKP not in (0, 1): + error_message.append("Clock Polarity must be a bit.") + if SMP not in (0, 1): + error_message.append("SMP must be a bit.") + if error_message: + raise ValueError("\n".join(error_message)) + + self._device.send_byte(CP.SPI_HEADER) + self._device.send_byte(CP.SET_SPI_PARAMETERS) + # 0Bhgfedcba - > : modebit CKP,: modebit CKE, :primary prescaler, + # :secondary prescaler + self._device.send_byte( + secondary_prescaler + | (primary_prescaler << 3) + | (CKE << 5) + | (CKP << 6) + | (SMP << 7) + ) + self._device.get_ack() + self._save_config(primary_prescaler, secondary_prescaler, CKE, CKP, SMP) + + @classmethod + def _get_parameters(cls) -> Tuple[int]: + """Get SPI parameters. + + Returns + ------- + primary_prescaler : {0, 1, 2, 3} + Primary Prescaler for system clock :const:`CP.CLOCK_RATE`Hz. + (0,1,2,3) -> (64:1,16:1,4:1,1:1). + secondary_prescaler : {0, 1, 2, 3, 4, 5, 6, 7} + Secondary prescaler (0,1,..7) -> (8:1,7:1,..1:1). + CKE : {0, 1} + SPIx Clock Edge Select bit. Serial output data changes on transition + {0: from Idle clock state to active clock state, + 1: from active clock state to Idle clock state}. + CKP : {0, 1} + Clock Polarity Select bit. + Idle state for clock is a {0: low, 1: high} level. + SMP : {0, 1} + Input data is sampled at the {0: end, 1: middle} of data output time. + """ + return ( + cls._primary_prescaler, + cls._secondary_prescaler, + cls._clock_edge, + cls._clock_polarity, + cls._smp, + ) + + def _start(self): + """Select SPI channel to enable. + + Basically sets the relevant chip select pin to LOW. + + External ChipSelect pins: + version < 5 : {6, 7} # RC5, RC4 (dropped support) + version == 5 : {} (don't have any external CS pins) + version == 6 : {7} # RC4 + """ + self._device.send_byte(CP.SPI_HEADER) + self._device.send_byte(CP.START_SPI) + self._device.send_byte(7) # SPI.CS v6 + # No ACK because `RESPONSE == DO_NOT_BOTHER` in firmware. + + def _stop(self): + """Select SPI channel to disable. + + Sets the relevant chip select pin to HIGH. + """ + self._device.send_byte(CP.SPI_HEADER) + self._device.send_byte(CP.STOP_SPI) + self._device.send_byte(7) # SPI.CS v6 + + def _transfer(self, data: int, bits: int) -> int: + """Send data over SPI and receive data from SPI simultaneously. + + Relevent SPI channel need to be enabled first. + + It is a primitive SPI method, prefered to use :meth:`SPISlave.transfer8` + and :meth:`SPISlave.transfer16`. + + Parameters + ---------- + data : int + Data to transmit. + bits : int + The number of bits per word. + + Returns + ------- + data_in : int + Data returned by slave device. + + Raises + ------ + ValueError + If given bits per word not supported by PSLab board. + """ + command = self._TRANSFER_COMMANDS_MAP.get(bits) + interger_type = self._INTEGER_TYPE_MAP.get(bits) + + if not command: + raise ValueError( + f"PSLab only supports {set(self._TRANSFER_COMMANDS_MAP.keys())}" + + " bits per word." + ) + + self._device.send_byte(CP.SPI_HEADER) + self._device.send_byte(command) + self._device.write(interger_type.pack(data)) + data_in = interger_type.unpack(self._device.read(bits))[0] + self._device.get_ack() + + return data_in + + def _transfer_bulk(self, data: List[int], bits: int) -> List[int]: + """Transfer data array over SPI. + + Relevent SPI channel need to be enabled first. + + It is a primitive SPI method, prefered to use :meth:`SPISlave.transfer8_bulk` + and :meth:`SPISlave.transfer16_bulk`. + + Parameters + ---------- + data : list of int + List of data to transmit. + bits : int + The number of bits per word. + + Returns + ------- + data_in : list of int + List of data returned by slave device. + + Raises + ------ + ValueError + If given bits per word not supported by PSLab board. + """ + data_in = [] + + for a in data: + data_in.append(self._transfer(a, bits)) + + return data_in + + def _read(self, bits: int) -> int: + """Read data while transmit zero. + + Relevent SPI channel need to be enabled first. + + It is a primitive SPI method, prefered to use :meth:`SPISlave.read8` + and :meth:`SPISlave.read16`. + + Parameters + ---------- + bits : int + The number of bits per word. + + Returns + ------- + int + Data returned by slave device. + + Raises + ------ + ValueError + If given bits per word not supported by PSLab board. + """ + return self._transfer(0, bits) + + def _read_bulk(self, data_to_read: int, bits: int) -> List[int]: + """Read data array while transmitting zeros. + + Relevent SPI channel need to be enabled first. + + It is a primitive SPI method, prefered to use :meth:`SPISlave.read8_bulk` + and :meth:`SPISlave.read16_bulk`. + + Parameters + ---------- + data_to_read : int + Number of data to read from slave device. + bits : int + The number of bits per word. + + Returns + ------- + list of int + List of data returned by slave device. + + Raises + ------ + ValueError + If given bits per word not supported by PSLab board. + """ + return self._transfer_bulk([0] * data_to_read, bits) + + def _write(self, data: int, bits: int): + """Send data over SPI. + + Relevent SPI channel need to be enabled first. + + It is a primitive SPI method, prefered to use :meth:`SPISlave.write8` + and :meth:`SPISlave.write16`. + + Parameters + ---------- + data : int + Data to transmit. + bits : int + The number of bits per word. + + Raises + ------ + ValueError + If given bits per word not supported by PSLab board. + """ + self._transfer(data, bits) + + def _write_bulk(self, data: List[int], bits: int): + """Send data array over SPI. + + Relevent SPI channel need to be enabled first. + + It is a primitive SPI method, prefered to use :meth:`SPISlave.write8_bulk` + and :meth:`SPISlave.write16_bulk`. + + Parameters + ---------- + data : list of int + List of data to transmit. + bits : int + The number of bits per word. + + Raises + ------ + ValueError + If given bits per word not supported by PSLab board. + """ + self._transfer_bulk(data, bits) + + +class SPIMaster(_SPIPrimitive): + """SPI bus controller. + + Parameters + ---------- + device : :class:`SerialHandler`, optional + Serial connection to PSLab device. If not provided, a new one will be + created. + """ + + def __init__(self, device: SerialHandler = None): + super().__init__(device) + # Reset config + self.set_parameters() + + def set_parameters( + self, + primary_prescaler: int = _PPRE, + secondary_prescaler: int = _SPRE, + CKE: int = _CKE, + CKP: int = _CKP, + SMP: int = _SMP, + ): + """Set SPI parameters. + + Parameters + ---------- + primary_prescaler : {0, 1, 2, 3} + Primary Prescaler for system clock :const:`CP.CLOCK_RATE`Hz. + (0,1,2,3) -> (64:1,16:1,4:1,1:1). + secondary_prescaler : {0, 1, 2, 3, 4, 5, 6, 7} + Secondary prescaler (0,1,..7) -> (8:1,7:1,..1:1). + CKE : {0, 1} + SPIx Clock Edge Select bit. Serial output data changes on transition + {0: from Idle clock state to active clock state, + 1: from active clock state to Idle clock state}. + CKP : {0, 1} + Clock Polarity Select bit. + Idle state for clock is a {0: low, 1: high} level. + SMP : {0, 1} + Input data is sampled at the {0: end, 1: middle} of data output time. + + Raises + ------ + ValueError + If any one of arguments is not in its shown range. + """ + self._set_parameters(primary_prescaler, secondary_prescaler, CKE, CKP, SMP) + + @classmethod + def get_parameters(cls) -> Tuple[int]: + """Get SPI parameters. + + Returns + ------- + primary_prescaler : {0, 1, 2, 3} + Primary Prescaler for system clock :const:`CP.CLOCK_RATE`Hz. + (0,1,2,3) -> (64:1,16:1,4:1,1:1). + secondary_prescaler : {0, 1, 2, 3, 4, 5, 6, 7} + Secondary prescaler (0,1,..7) -> (8:1,7:1,..1:1). + CKE : {0, 1} + SPIx Clock Edge Select bit. Serial output data changes on transition + {0: from Idle clock state to active clock state, + 1: from active clock state to Idle clock state}. + CKP : {0, 1} + Clock Polarity Select bit. + Idle state for clock is a {0: low, 1: high} level. + SMP : {0, 1} + Input data is sampled at the {0: end, 1: middle} of data output time. + """ + return cls._get_parameters() + + +class SPISlave(_SPIPrimitive): + """SPI slave device. + + Parameters + ---------- + device : :class:`SerialHandler`, optional + Serial connection to PSLab device. If not provided, a new one will be + created. + """ + + def __init__(self, device: SerialHandler = None): + super().__init__(device) + + def transfer8(self, data: int) -> int: + """Send 8-bit data over SPI and receive 8-bit data from SPI simultaneously. + + Parameters + ---------- + data : int + Data to transmit. + + Returns + ------- + data_in : int + Data returned by slave device. + """ + self._start() + data_in = self._transfer(data, 8) + self._stop() + + return data_in + + def transfer16(self, data: int) -> int: + """Send 16-bit data over SPI and receive 16-bit data from SPI simultaneously. + + Parameters + ---------- + data : int + Data to transmit. + + Returns + ------- + data_in : int + Data returned by slave device. + """ + self._start() + data_in = self._transfer(data, 16) + self._stop() + + return data_in + + def transfer8_bulk(self, data: List[int]) -> List[int]: + """Transfer 8-bit data array over SPI. + + Parameters + ---------- + data : list of int + List of 8-bit data to transmit. + + Returns + ------- + data_in : list of int + List of 8-bit data returned by slave device. + """ + self._start() + data_in = self._transfer_bulk(data, 8) + self._stop() + + return data_in + + def transfer16_bulk(self, data: List[int]) -> List[int]: + """Transfer 16-bit data array over SPI. + + Parameters + ---------- + data : list of int + List of 16-bit data to transmit. + + Returns + ------- + data_in : list of int + List of 16-bit data returned by slave device. + """ + self._start() + data_in = self._transfer_bulk(data, 16) + self._stop() + + return data_in + + def read8(self) -> int: + """Read 8-bit data while transmit zero. + + Returns + ------- + int + Data returned by slave device. + """ + self._start() + data_in = self._read(8) + self._stop() + + return data_in + + def read16(self) -> int: + """Read 16-bit data while transmit zero. + + Returns + ------- + int + Data returned by slave device. + """ + self._start() + data_in = self._read(16) + self._stop() + + return data_in + + def read8_bulk(self, data_to_read: int) -> List[int]: + """Read 8-bit data array while transmitting zeros. + + Parameters + ---------- + data_to_read : int + Number of 8-bit data to read from slave device. + + Returns + ------- + list of int + List of 8-bit data returned by slave device. + """ + self._start() + data_in = self._read_bulk(data_to_read, 8) + self._stop() + + return data_in + + def read16_bulk(self, data_to_read: int) -> List[int]: + """Read 16-bit data array while transmitting zeros. + + Parameters + ---------- + data_to_read : int + Number of 16-bit data to read from slave device. + + Returns + ------- + list of int + List of 16-bit date returned by slave device. + """ + self._start() + data_in = self._read_bulk(data_to_read, 16) + self._stop() + + return data_in + + def write8(self, data: int): + """Send 8-bit data over SPI. + + Parameters + ---------- + data : int + Data to transmit. + """ + self.transfer8(data) + + def write16(self, data: int): + """Send 16-bit data over SPI. + + Parameters + ---------- + data : int + Data to transmit. + """ + self.transfer16(data) + + def write8_bulk(self, data: List[int]): + """Send 8-bit data array over SPI. + + Parameters + ---------- + data : list of int + List of 8-bit data to transmit. + """ + self.transfer8_bulk(data) + + def write16_bulk(self, data: List[int]): + """Send 16-bit data array over SPI. + + Parameters + ---------- + data : list of int + List of 16-bit data to transmit. + """ + self.transfer16_bulk(data) diff --git a/pslab/instrument/oscilloscope.py b/pslab/instrument/oscilloscope.py index c6a6821b..32b34ef2 100644 --- a/pslab/instrument/oscilloscope.py +++ b/pslab/instrument/oscilloscope.py @@ -13,6 +13,7 @@ import numpy as np import pslab.protocol as CP +from pslab.bus.spi import SPIMaster from pslab.instrument.analog import ANALOG_CHANNELS, AnalogInput, GAIN_VALUES from pslab.serial_handler import ADCBufferMixin, SerialHandler @@ -372,6 +373,12 @@ def select_range(self, channel: str, voltage_range: Union[int, float]): self._set_gain(channel, gain) def _set_gain(self, channel: str, gain: int): + spi_config_supported = self._check_spi_config() + + if not spi_config_supported: + spi_parameters = SPIMaster.get_parameters() + spi = SPIMaster(self._device) # Initializing SPIMaster will reset config. + self._channels[channel].gain = gain pga = self._channels[channel].programmable_gain_amplifier gain_idx = GAIN_VALUES.index(gain) @@ -380,3 +387,23 @@ def _set_gain(self, channel: str, gain: int): self._device.send_byte(pga) self._device.send_byte(gain_idx) self._device.get_ack() + + if not spi_config_supported: + spi.set_parameters(*spi_parameters) + + @staticmethod + def _check_spi_config() -> bool: + """Check whether current SPI config is supported by PGA. + + Returns + ------- + bool + Returns True if SPI config is supported by PGA, otherwise False. + """ + # Check the SPI mode. PGA only supports mode 0 and mode 3. + if (SPIMaster._clock_polarity, SPIMaster._clock_phase) not in [(0, 0), (1, 1)]: + return False + if SPIMaster._frequency > 10e6: # PGA only supports max of 10MHz. + return False + + return True diff --git a/pslab/peripherals.py b/pslab/peripherals.py index 3dc01f4a..f8e9ad7e 100644 --- a/pslab/peripherals.py +++ b/pslab/peripherals.py @@ -6,194 +6,6 @@ logger = logging.getLogger(__name__) -class SPI(): - """ - Methods to interact with the SPI port. An instance of Packet_Handler must be passed to the init function - - """ - - def __init__(self, H): - self.H = H - - def set_parameters(self, primary_prescaler=0, secondary_prescaler=2, CKE=1, CKP=0, SMP=1): - """ - sets SPI parameters. - - .. tabularcolumns:: |p{3cm}|p{11cm}| - - ================ ============================================================================================ - **Arguments** - ================ ============================================================================================ - primary_pres Primary Prescaler(0,1,2,3) for 64MHz clock->(64:1,16:1,4:1,1:1) - secondary_pres Secondary prescaler(0,1,..7)->(8:1,7:1,..1:1) - CKE CKE 0 or 1. - CKP CKP 0 or 1. - ================ ============================================================================================ - - """ - self.H.send_byte(CP.SPI_HEADER) - self.H.send_byte(CP.SET_SPI_PARAMETERS) - # 0Bhgfedcba - > : modebit CKP,: modebit CKE, :primary pre,:secondary pre - self.H.send_byte(secondary_prescaler | (primary_prescaler << 3) | (CKE << 5) | (CKP << 6) | (SMP << 7)) - self.H.get_ack() - - def start(self, channel): - """ - selects SPI channel to enable. - Basically lowers the relevant chip select pin . - - .. tabularcolumns:: |p{3cm}|p{11cm}| - - ================ ============================================================================================ - **Arguments** - ================ ============================================================================================ - channel 1-7 ->[PGA1 connected to CH1,PGA2,PGA3,PGA4,PGA5,external chip select 1,external chip select 2] - 8 -> sine1 - 9 -> sine2 - ================ ============================================================================================ - - """ - self.H.send_byte(CP.SPI_HEADER) - self.H.send_byte(CP.START_SPI) - self.H.send_byte(channel) # value byte - # self.H.get_ack() - - def set_cs(self, channel, state): - """ - Enable or disable a chip select - - .. tabularcolumns:: |p{3cm}|p{11cm}| - - ================ ============================================================================================ - **Arguments** - ================ ============================================================================================ - channel 'CS1','CS2' - state 1 for HIGH, 0 for LOW - ================ ============================================================================================ - - """ - channel = channel.upper() - if channel in ['CS1', 'CS2']: - csnum = ['CS1', 'CS2'].index(channel) + 9 # chip select number 9=CSOUT1,10=CSOUT2 - self.H.send_byte(CP.SPI_HEADER) - if state: - self.H.send_byte(CP.STOP_SPI) - else: - self.H.send_byte(CP.START_SPI) - self.H.send_byte(csnum) - else: - print('Channel does not exist') - - def stop(self, channel): - """ - selects SPI channel to disable. - Sets the relevant chip select pin to HIGH. - - .. tabularcolumns:: |p{3cm}|p{11cm}| - - ================ ============================================================================================ - **Arguments** - ================ ============================================================================================ - channel 1-7 ->[PGA1 connected to CH1,PGA2,PGA3,PGA4,PGA5,external chip select 1,external chip select 2] - ================ ============================================================================================ - - - """ - self.H.send_byte(CP.SPI_HEADER) - self.H.send_byte(CP.STOP_SPI) - self.H.send_byte(channel) # value byte - # self.H.get_ack() - - def send8(self, value): - """ - SENDS 8-bit data over SPI - - .. tabularcolumns:: |p{3cm}|p{11cm}| - - ================ ============================================================================================ - **Arguments** - ================ ============================================================================================ - value value to transmit - ================ ============================================================================================ - - :return: value returned by slave device - """ - self.H.send_byte(CP.SPI_HEADER) - self.H.send_byte(CP.SEND_SPI8) - self.H.send_byte(value) # value byte - v = self.H.get_byte() - self.H.get_ack() - return v - - def send16(self, value): - """ - SENDS 16-bit data over SPI - - .. tabularcolumns:: |p{3cm}|p{11cm}| - - ================ ============================================================================================ - **Arguments** - ================ ============================================================================================ - value value to transmit - ================ ============================================================================================ - - :return: value returned by slave device - :rtype: int - """ - self.H.send_byte(CP.SPI_HEADER) - self.H.send_byte(CP.SEND_SPI16) - self.H.send_int(value) # value byte - v = self.H.get_int() - self.H.get_ack() - return v - - def send8_burst(self, value): - """ - SENDS 8-bit data over SPI - No acknowledge/return value - - .. tabularcolumns:: |p{3cm}|p{11cm}| - - ================ ============================================================================================ - **Arguments** - ================ ============================================================================================ - value value to transmit - ================ ============================================================================================ - - :return: Nothing - """ - self.H.send_byte(CP.SPI_HEADER) - self.H.send_byte(CP.SEND_SPI8_BURST) - self.H.send_byte(value) # value byte - - def send16_burst(self, value): - """ - SENDS 16-bit data over SPI - no acknowledge/return value - - .. tabularcolumns:: |p{3cm}|p{11cm}| - - ============== ============================================================================================ - **Arguments** - ============== ============================================================================================ - value value to transmit - ============== ============================================================================================ - - :return: nothing - """ - self.H.send_byte(CP.SPI_HEADER) - self.H.send_byte(CP.SEND_SPI16_BURST) - self.H.send_int(value) # value byte - - def xfer(self, chan, data): - self.start(chan) - reply = [] - for a in data: - reply.append(self.send8(a)) - self.stop(chan) - return reply - - class NRF24L01(): # Commands R_REG = 0x00 diff --git a/pslab/sciencelab.py b/pslab/sciencelab.py index 82e780c5..327fc2d7 100644 --- a/pslab/sciencelab.py +++ b/pslab/sciencelab.py @@ -8,6 +8,7 @@ import pslab.protocol as CP from pslab.bus.i2c import I2CMaster +from pslab.bus.spi import SPIMaster from pslab.instrument.logic_analyzer import LogicAnalyzer from pslab.instrument.multimeter import Multimeter from pslab.instrument.oscilloscope import Oscilloscope @@ -46,6 +47,7 @@ def __init__( self.multimeter = Multimeter(device=self) self.power_supply = PowerSupply(device=self) self.i2c = I2CMaster(device=self) + self.spi = SPIMaster(device=self) self.nrf = NRF24L01(device=self) if "V6" in self.version: # Set the built-in WS2812B to green :) diff --git a/tests/recordings/spi/test_chip_select.json b/tests/recordings/spi/test_chip_select.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_chip_select.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_read16.json b/tests/recordings/spi/test_read16.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_read16.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_read16_bulk.json b/tests/recordings/spi/test_read16_bulk.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_read16_bulk.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_read8.json b/tests/recordings/spi/test_read8.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_read8.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_read8_bulk.json b/tests/recordings/spi/test_read8_bulk.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_read8_bulk.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_set_parameter_clock_edge[0].json b/tests/recordings/spi/test_set_parameter_clock_edge[0].json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_set_parameter_clock_edge[0].json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_set_parameter_clock_edge[1].json b/tests/recordings/spi/test_set_parameter_clock_edge[1].json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_set_parameter_clock_edge[1].json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_set_parameter_clock_polarity[0].json b/tests/recordings/spi/test_set_parameter_clock_polarity[0].json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_set_parameter_clock_polarity[0].json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_set_parameter_clock_polarity[1].json b/tests/recordings/spi/test_set_parameter_clock_polarity[1].json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_set_parameter_clock_polarity[1].json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_set_parameter_frequency.json b/tests/recordings/spi/test_set_parameter_frequency.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_set_parameter_frequency.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_set_parameter_smp[0].json b/tests/recordings/spi/test_set_parameter_smp[0].json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_set_parameter_smp[0].json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_set_parameter_smp[1].json b/tests/recordings/spi/test_set_parameter_smp[1].json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_set_parameter_smp[1].json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_transfer16.json b/tests/recordings/spi/test_transfer16.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_transfer16.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_transfer16_bulk.json b/tests/recordings/spi/test_transfer16_bulk.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_transfer16_bulk.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_transfer8.json b/tests/recordings/spi/test_transfer8.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_transfer8.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_transfer8_bulk.json b/tests/recordings/spi/test_transfer8_bulk.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_transfer8_bulk.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_write16.json b/tests/recordings/spi/test_write16.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_write16.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_write16_bulk.json b/tests/recordings/spi/test_write16_bulk.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_write16_bulk.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_write8.json b/tests/recordings/spi/test_write8.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_write8.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/recordings/spi/test_write8_bulk.json b/tests/recordings/spi/test_write8_bulk.json new file mode 100644 index 00000000..e6f94764 --- /dev/null +++ b/tests/recordings/spi/test_write8_bulk.json @@ -0,0 +1 @@ +[[], []] \ No newline at end of file diff --git a/tests/test_spi.py b/tests/test_spi.py new file mode 100644 index 00000000..1ab489c9 --- /dev/null +++ b/tests/test_spi.py @@ -0,0 +1,314 @@ +"""Tests for pslab.bus.spi. + +When integration testing, the PSLab's logic analyzer and PWM output are used to +verify the function of the SPI bus. Before running the integration tests, connect: + SCK -> LA1 + SDO -> LA2 + SDI -> SQ1 and LA4 + SPI.CS -> LA3 +""" + +import pytest +import re +from numpy import ndarray + +from pslab.bus.spi import SPIMaster, SPISlave +from pslab.instrument.logic_analyzer import LogicAnalyzer +from pslab.instrument.waveform_generator import PWMGenerator +from pslab.serial_handler import SerialHandler, MockHandler + +SPI_SUPPORTED_DEVICES = [ + # "PSLab vMOCK", # Uncomment after adding recording json files. + "PSLab V6", +] + +WRITE_DATA8 = 0b10100101 +WRITE_DATA16 = 0xAA55 +SCK = "LA1" +SDO = "LA2" +SDI = ["LA4", "SQ1"] +CS = "LA3" +SPIMaster._primary_prescaler = PPRE = 0 +SPIMaster._secondary_prescaler = SPRE = 0 +PWM_FERQUENCY = SPIMaster._frequency * 2 / 3 +MICROSECONDS = 1e-6 +RELTOL = 0.05 +# Number of expected logic level changes. +CS_START = 1 +CS_STOP = 1 +SCK_WRITE8 = 16 +SCK_WRITE16 = 2 * 16 +SDO_WRITE_DATA8 = 8 +SDO_WRITE_DATA16 = 16 + + +@pytest.fixture +def master(handler: SerialHandler) -> SPIMaster: + if handler.version not in SPI_SUPPORTED_DEVICES: + pytest.skip("SPI not supported by this device.") + handler._logging = True + spi_master = SPIMaster(device=handler) + yield spi_master + spi_master.set_parameters() + + +@pytest.fixture +def slave(handler: SerialHandler) -> SPISlave: + if handler.version not in SPI_SUPPORTED_DEVICES: + pytest.skip("SPI not supported by this device.") + handler._logging = True + return SPISlave(device=handler) + + +@pytest.fixture +def la(handler: SerialHandler) -> LogicAnalyzer: + if not isinstance(handler, MockHandler): + pwm = PWMGenerator(handler) + pwm.generate(SDI[1], PWM_FERQUENCY, 0.5) + handler._logging = True + return LogicAnalyzer(handler) + + +def verify_value( + value: int, + sck_timestamps: ndarray, + sdi_initstate: int, + sdi_timestamps: ndarray, + smp: int = 0, +): + sck_ts = sck_timestamps[smp::2] + pwm_half_period = ((1 / PWM_FERQUENCY) * 1e6) / 2 # microsecond + + pattern = "" + for t in sck_ts: + d, m = divmod(t - sdi_timestamps[0], pwm_half_period) + if m == pytest.approx(0, abs=0.1) or m == pytest.approx(pwm_half_period, abs=0.1): + pattern += "[0,1]" + elif d % 2: + pattern += "1" if sdi_initstate else "0" + else: + pattern += "0" if sdi_initstate else "1" + + pattern = re.compile(pattern) + bits = len(sck_ts) + value = bin(value)[2:].zfill(bits) + + return bool(pattern.match(value)) + + +def test_set_parameter_frequency(la: LogicAnalyzer, master: SPIMaster, slave: SPISlave): + # frequency 166666.66666666666 + ppre = 0 + spre = 2 + master.set_parameters(primary_prescaler=ppre, secondary_prescaler=spre) + la.capture(1, block=False) + slave.write8(0) + la.stop() + (sck,) = la.fetch_data() + write_start = sck[0] + write_stop = sck[-2] # Output data on rising edge only (in mode 0) + start_to_stop = 7 + period = (write_stop - write_start) / start_to_stop + assert (period * MICROSECONDS) ** -1 == pytest.approx(master._frequency, rel=RELTOL) + + +@pytest.mark.parametrize("ckp", [0, 1]) +def test_set_parameter_clock_polarity( + la: LogicAnalyzer, master: SPIMaster, slave: SPISlave, ckp: int +): + master.set_parameters(CKP=ckp) + assert la.get_states()[SCK] == bool(ckp) + + +@pytest.mark.parametrize("cke", [0, 1]) +def test_set_parameter_clock_edge( + la: LogicAnalyzer, master: SPIMaster, slave: SPISlave, cke: int +): + master.set_parameters(CKE=cke) + la.capture(2, block=False) + slave.write8(WRITE_DATA8) + la.stop() + (sck, sdo) = la.fetch_data() + idle_to_active = sck[0] + first_bit = sdo[0] + # Serial output data changes on transition + # {0: from Idle clock state to active state (first event before data change), + # 1: from active clock state to Idle state (data change before first event)}. + assert first_bit < idle_to_active == bool(cke) + + +@pytest.mark.parametrize("smp", [0, 1]) +def test_set_parameter_smp( + la: LogicAnalyzer, master: SPIMaster, slave: SPISlave, pwm: PWMGenerator, smp: int +): + master.set_parameters(SMP=smp) + la.capture([SCK, SDI[0]], block=False) + value = slave.read8() + la.stop() + (sck, sdi) = la.fetch_data() + sdi_initstate = la.get_initial_states()[SDI[0]] + + assert verify_value(value, sck, sdi_initstate, sdi, smp) + + +def test_chip_select(la: LogicAnalyzer, slave: SPISlave): + assert la.get_states()[CS] + + la.capture(CS, block=False) + slave._start() + slave._stop() + la.stop() + (cs,) = la.fetch_data() + assert len(cs) == (CS_START + CS_STOP) + + +def test_write8(la: LogicAnalyzer, slave: SPISlave): + la.capture(3, block=False) + slave.write8(WRITE_DATA8) + la.stop() + (sck, sdo, cs) = la.fetch_data() + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE8 + assert len(sdo) == SDO_WRITE_DATA8 + + +def test_write16(la: LogicAnalyzer, slave: SPISlave): + la.capture(3, block=False) + slave.write16(WRITE_DATA16) + la.stop() + (sck, sdo, cs) = la.fetch_data() + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE16 + assert len(sdo) == SDO_WRITE_DATA16 + + +def test_read8(la: LogicAnalyzer, slave: SPISlave): + la.capture(4, block=False) + value = slave.read8() + la.stop() + (sck, sdo, cs, sdi) = la.fetch_data() + sdi_initstate = la.get_initial_states()[SDI[0]] + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE8 + assert len(sdo) == 0 + assert verify_value(value, sck, sdi_initstate, sdi) + + +def test_read16(la: LogicAnalyzer, slave: SPISlave): + la.capture(4, block=False) + value = slave.read16() + la.stop() + (sck, sdo, cs, sdi) = la.fetch_data() + sdi_initstate = la.get_initial_states()[SDI[0]] + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE16 + assert len(sdo) == 0 + assert verify_value(value, sck, sdi_initstate, sdi) + + +def test_transfer8(la: LogicAnalyzer, slave: SPISlave): + la.capture(4, block=False) + value = slave.transfer8(WRITE_DATA8) + la.stop() + (sck, sdo, cs, sdi) = la.fetch_data() + sdi_initstate = la.get_initial_states()[SDI[0]] + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE8 + assert len(sdo) == SDO_WRITE_DATA8 + assert verify_value(value, sck, sdi_initstate, sdi) + + +def test_transfer16(la: LogicAnalyzer, slave: SPISlave): + la.capture(4, block=False) + value = slave.transfer16(WRITE_DATA16) + la.stop() + (sck, sdo, cs, sdi) = la.fetch_data() + sdi_initstate = la.get_initial_states()[SDI[0]] + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE16 + assert len(sdo) == SDO_WRITE_DATA16 + assert verify_value(value, sck, sdi_initstate, sdi) + + +def test_write8_bulk(la: LogicAnalyzer, slave: SPISlave): + la.capture(3, block=False) + slave.write8_bulk([WRITE_DATA8, WRITE_DATA8]) + la.stop() + (sck, sdo, cs) = la.fetch_data() + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE8 + SCK_WRITE8 + assert len(sdo) == SDO_WRITE_DATA8 + SDO_WRITE_DATA16 + + +def test_write16_bulk(la: LogicAnalyzer, slave: SPISlave): + la.capture(3, block=False) + slave.write16_bulk([WRITE_DATA16, WRITE_DATA16]) + la.stop() + (sck, sdo, cs) = la.fetch_data() + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE16 + SCK_WRITE16 + assert len(sdo) == SDO_WRITE_DATA16 + SDO_WRITE_DATA16 + + +def test_read8_bulk(la: LogicAnalyzer, slave: SPISlave): + la.capture(4, block=False) + value = slave.read8_bulk(2) + la.stop() + (sck, sdo, cs, sdi) = la.fetch_data() + sdi_initstate = la.get_initial_states()[SDI[0]] + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE8 + SCK_WRITE8 + assert len(sdo) == 0 + assert verify_value(value[0], sck, sdi_initstate, sdi[:16]) + assert verify_value(value[1], sck, sdi_initstate, sdi[16:]) + + +def test_read16_bulk(la: LogicAnalyzer, slave: SPISlave): + la.capture(4, block=False) + value = slave.read16_bulk(2) + la.stop() + (sck, sdo, cs, sdi) = la.fetch_data() + sdi_initstate = la.get_initial_states()[SDI[0]] + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE16 + SCK_WRITE16 + assert len(sdo) == 0 + assert verify_value(value[0], sck, sdi_initstate, sdi[:32]) + assert verify_value(value[1], sck, sdi_initstate, sdi[32:]) + + +def test_transfer8_bulk(la: LogicAnalyzer, slave: SPISlave): + la.capture(4, block=False) + value = slave.transfer8_bulk([WRITE_DATA8, WRITE_DATA8]) + la.stop() + (sck, sdo, cs, sdi) = la.fetch_data() + sdi_initstate = la.get_initial_states()[SDI[0]] + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE8 + SCK_WRITE8 + assert len(sdo) == 0 + assert verify_value(value[0], sck, sdi_initstate, sdi[:16]) + assert verify_value(value[1], sck, sdi_initstate, sdi[16:]) + + +def test_transfer16_bulk(la: LogicAnalyzer, slave: SPISlave): + la.capture(4, block=False) + value = slave.transfer16_bulk([WRITE_DATA16, WRITE_DATA16]) + la.stop() + (sck, sdo, cs, sdi) = la.fetch_data() + sdi_initstate = la.get_initial_states()[SDI[0]] + + assert len(cs) == (CS_START + CS_STOP) + assert len(sck) == SCK_WRITE16 + SCK_WRITE16 + assert len(sdo) == 0 + assert verify_value(value[0], sck, sdi_initstate, sdi[:32]) + assert verify_value(value[1], sck, sdi_initstate, sdi[32:])