Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor SPI & Add busio.SPI #199

Merged
merged 10 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pslab/bus/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""Contains modules for interfacing with the PSLab's I2C, SPI, and UART buses.

(SPI and UART still TODO)
(UART still TODO)
"""

from pslab.bus.i2c import I2CMaster, I2CSlave
from pslab.bus.spi import SPIMaster, SPISlave

__all__ = (
"I2CMaster",
"I2CSlave",
"SPIMaster",
"SPISlave",
)
200 changes: 198 additions & 2 deletions pslab/bus/busio.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@
>>> sensor = adafruit_bno055.BNO055_I2C(i2c)
>>> print(sensor.gyro)
"""

from typing import List, Union

from pslab.bus.i2c import I2CPrimitive
from pslab.bus.i2c import _I2CPrimitive
from pslab.bus.spi import _SPIPrimitive
from pslab.serial_handler import SerialHandler

__all__ = "I2C"
ReadableBuffer = Union[bytes, bytearray, memoryview]
WriteableBuffer = Union[bytearray, memoryview]


class I2C(I2CPrimitive):
class I2C(_I2CPrimitive):
"""Busio I2C Class for CircuitPython Compatibility.

Parameters
Expand Down Expand Up @@ -177,3 +179,197 @@ def writeto_then_readfrom(
self._restart(address, 1)
buffer_in[in_start:in_end] = self._read(bytes_to_read)
self._stop()


class SPI(_SPIPrimitive):
"""Busio SPI Class for CircuitPython Compatibility.

Parameters
----------
device : :class:`SerialHandler`, optional
Serial connection to PSLab device. If not provided, a new one will be
created.
"""

def __init__(self, device: SerialHandler = None, frequency: int = 125e3):
super().__init__(device)
ppre, spre = self._get_prescaler(25e4)
self._set_parameters(ppre, spre, 1, 0, 1)
self._bits = 8

@property
def frequency(self) -> int:
"""Get the actual SPI bus frequency (rounded).

This may not match the frequency requested due to internal limitations.
"""
return round(self._frequency)

def deinit(self) -> None:
"""Just a dummy method."""
pass

def __enter__(self):
"""Just a dummy context manager."""
return self

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Call :meth:`deinit` on context exit."""
self.deinit()

def configure(
self,
*,
baudrate: int = 100000,
polarity: int = 0,
phase: int = 0,
bits: int = 8,
) -> None:
"""Configure the SPI bus.

Parameters
----------
baudrate : int
The desired clock rate in Hertz. The actual clock rate may be
higher or lower due to the granularity of available clock settings.
Check the frequency attribute for the actual clock rate.
polarity : int
The base state of the clock line (0 or 1)
phase : int
The edge of the clock that data is captured. First (0) or second (1).
Rising or falling depends on clock polarity.
bits : int
The number of bits per word.
"""
if polarity not in (0, 1):
raise ValueError("Invalid polarity")
if phase not in (0, 1):
raise ValueError("Invalid phase")
if bits not in self._INTEGER_TYPE_MAP:
raise ValueError("Invalid number of bits")

ppre, spre = self._get_prescaler(baudrate)
cke = (phase ^ 1) & 1
self._set_parameters(ppre, spre, cke, polarity, 1)
self._bits = bits

def try_lock(self) -> bool: # pylint: disable=no-self-use
"""Just a dummy method."""
return True

def unlock(self) -> None:
"""Just a dummy method."""
pass

def write(
self,
buffer: Union[ReadableBuffer, List[int]],
*,
start: int = 0,
end: int = None,
) -> None:
"""Write the data contained in buffer. If the buffer is empty, nothing happens.

Parameters
----------
buffer : bytes or bytearray or memoryview or list_of_int (for bits >8)
Write out the data in this buffer.
start : int
Start of the slice of `buffer` to write out: `buffer[start:end]`.
end : int
End of the slice; this index is not included. Defaults to `len(buffer)`.
"""
end = len(buffer) if end is None else end
buffer = buffer[start:end]

if not buffer:
return

self._start()
self._write_bulk(buffer, self._bits)
self._stop()

def readinto(
self,
buffer: Union[WriteableBuffer, List[int]],
*,
start: int = 0,
end: int = None,
write_value: int = 0,
) -> None:
"""Read into `buffer` while writing `write_value` for each byte read.

If the number of bytes to read is 0, nothing happens.

Parameters
----------
buffer : bytearray or memoryview or list_of_int (for bits >8)
Read data into this buffer.
start : int
Start of the slice of `buffer` to read into: `buffer[start:end]`.
end : int
End of the slice; this index is not included. Defaults to `len(buffer)`.
write_value : int
Value to write while reading. (Usually ignored.)
"""
end = len(buffer) if end is None else end
bytes_to_read = end - start

if bytes_to_read == 0:
return

self._start()
data = self._transfer_bulk([write_value] * bytes_to_read, self._bits)
self._stop()

for i, v in zip(range(start, end), data):
buffer[i] = v

def write_readinto(
self,
buffer_out: Union[ReadableBuffer, List[int]],
buffer_in: Union[WriteableBuffer, List[int]],
*,
out_start: int = 0,
out_end: int = None,
in_start: int = 0,
in_end: int = None,
):
"""Write out the data in buffer_out while simultaneously read into buffer_in.

The lengths of the slices defined by buffer_out[out_start:out_end] and
buffer_in[in_start:in_end] must be equal. If buffer slice lengths are both 0,
nothing happens.

Parameters
----------
buffer_out : bytes or bytearray or memoryview or list_of_int (for bits >8)
Write out the data in this buffer.
buffer_in : bytearray or memoryview or list_of_int (for bits >8)
Read data into this buffer.
out_start : int
Start of the slice of `buffer_out` to write out:
`buffer_out[out_start:out_end]`.
out_end : int
End of the slice; this index is not included. Defaults to `len(buffer_out)`
in_start : int
Start of the slice of `buffer_in` to read into:`buffer_in[in_start:in_end]`
in_end : int
End of the slice; this index is not included. Defaults to `len(buffer_in)`
"""
out_end = len(buffer_out) if out_end is None else out_end
in_end = len(buffer_in) if in_end is None else in_end
buffer_out = buffer_out[out_start:out_end]
bytes_to_read = in_end - in_start

if len(buffer_out) != bytes_to_read:
raise ValueError("buffer slices must be of equal length")
if bytes_to_read == 0:
return

self._start()
data = self._transfer_bulk(buffer_out, self._bits)
self._stop()

for i, v in zip(range(in_start, in_end), data):
buffer_in[i] = v
11 changes: 8 additions & 3 deletions pslab/bus/i2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

>>> rtc = I2CSlave(address=104)
"""

import logging
from typing import List

Expand All @@ -31,7 +32,7 @@
logger = logging.getLogger(__name__)


class I2CPrimitive:
class _I2CPrimitive:
"""I2C primitive commands.

Handles all the I2C subcommands coded in pslab-firmware.
Expand Down Expand Up @@ -385,6 +386,7 @@ def _read(self, bytes_to_read: int) -> bytearray:

for _ in range(bytes_to_read - 1):
data.append(self._read_more())

data.append(self._read_end())

return data
Expand Down Expand Up @@ -427,7 +429,7 @@ def _read_bulk(
return bytearray(data)


class I2CMaster(I2CPrimitive):
class I2CMaster(_I2CPrimitive):
"""I2C bus controller.

Handles slave independent functionality with the I2C port.
Expand Down Expand Up @@ -478,7 +480,7 @@ def scan(self) -> List[int]:
return addrs


class I2CSlave(I2CPrimitive):
class I2CSlave(_I2CPrimitive):
"""I2C slave device.

Parameters
Expand Down Expand Up @@ -513,6 +515,7 @@ def ping(self) -> bool:
"""
response = self._start(self.address, self._READ)
self._stop()

return response == self._ACK

def read(self, bytes_to_read: int, register_address: int = 0x0) -> bytearray:
Expand Down Expand Up @@ -572,6 +575,7 @@ def read_int(self, register_address: int = 0x0) -> int:
Two bytes interpreted as a uint16.
"""
data = self.read(2, register_address)

return CP.ShortInt.unpack(data)[0]

def read_long(self, register_address: int = 0x0) -> int:
Expand All @@ -589,6 +593,7 @@ def read_long(self, register_address: int = 0x0) -> int:
Four bytes interpreted as a uint32.
"""
data = self.read(4, register_address)

return CP.Integer.unpack(data)[0]

def write(self, bytes_to_write: bytearray, register_address: int = 0x0):
Expand Down
Loading