diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 70ade69..2a8cf3e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,6 +7,11 @@ repos: rev: 23.3.0 hooks: - id: black + - repo: https://github.com/PyCQA/isort + rev: 5.13.2 + hooks: + - id: isort + args: ["--profile", "black", "--filter-files"] - repo: https://github.com/fsfe/reuse-tool rev: v1.1.2 hooks: @@ -18,7 +23,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/pycqa/pylint - rev: v2.17.4 + rev: v3.0.4 hooks: - id: pylint name: pylint (library code) @@ -40,3 +45,7 @@ repos: files: "^tests/" args: - --disable=missing-docstring,consider-using-f-string,duplicate-code + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.11.0 + hooks: + - id: mypy diff --git a/adafruit_ble/__init__.py b/adafruit_ble/__init__.py index 693a47f..c8725f4 100755 --- a/adafruit_ble/__init__.py +++ b/adafruit_ble/__init__.py @@ -12,9 +12,10 @@ from __future__ import annotations +import sys + # pylint: disable=wrong-import-position -import sys if sys.implementation.name == "circuitpython" and sys.implementation.version[0] <= 4: raise ImportError( @@ -24,17 +25,31 @@ import _bleio -from .services import Service from .advertising import Advertisement +from .services import Service +from .uuid import UUID try: - from typing import Iterator, NoReturn, Optional, Tuple, Type, TYPE_CHECKING, Union + from typing import ( + TYPE_CHECKING, + Dict, + Iterator, + List, + NoReturn, + Optional, + Tuple, + Type, + Union, + ) + from typing_extensions import Literal if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID - from adafruit_ble.characteristics import Characteristic + + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -55,11 +70,11 @@ class BLEConnection: def __init__(self, bleio_connection: _bleio.Connection) -> None: self._bleio_connection = bleio_connection # _bleio.Service objects representing services found during discovery. - self._discovered_bleio_services = {} + self._discovered_bleio_services: Dict[Uuid, _bleio.Service] = {} # Service objects that wrap remote services. - self._constructed_services = {} + self._constructed_services: Dict[Uuid, Service] = {} - def _discover_remote(self, uuid: UUID) -> Optional[_bleio.Service]: + def _discover_remote(self, uuid: Uuid) -> Optional[_bleio.Service]: remote_service = None if uuid in self._discovered_bleio_services: remote_service = self._discovered_bleio_services[uuid] @@ -72,7 +87,7 @@ def _discover_remote(self, uuid: UUID) -> Optional[_bleio.Service]: self._discovered_bleio_services[uuid] = remote_service return remote_service - def __contains__(self, key: Union[UUID, Type[Service]]) -> bool: + def __contains__(self, key: Union[Uuid, Type[Service]]) -> bool: """ Allows easy testing for a particular Service class or a particular UUID associated with this connection. @@ -85,16 +100,15 @@ def __contains__(self, key: Union[UUID, Type[Service]]) -> bool: if StandardUUID(0x1234) in connection: # do something """ - uuid = key - if hasattr(key, "uuid"): - uuid = key.uuid + uuid = key if isinstance(key, UUID) else key.uuid return self._discover_remote(uuid) is not None - def __getitem__(self, key: Union[UUID, Type[Service]]) -> Optional[Service]: + def __getitem__(self, key: Union[Uuid, Type[Service]]) -> Optional[Service]: """Return the Service for the given Service class or uuid, if any.""" - uuid = key - maybe_service = False - if hasattr(key, "uuid"): + if isinstance(key, UUID): + uuid = key + maybe_service = False + else: uuid = key.uuid maybe_service = True @@ -104,7 +118,7 @@ def __getitem__(self, key: Union[UUID, Type[Service]]) -> Optional[Service]: remote_service = self._discover_remote(uuid) if remote_service: constructed_service = None - if maybe_service: + if maybe_service and not isinstance(key, UUID): constructed_service = key(service=remote_service) self._constructed_services[uuid] = constructed_service return constructed_service @@ -166,7 +180,7 @@ def __init__(self, adapter: Optional[_bleio.Adapter] = None) -> None: raise RuntimeError("No adapter available") self._adapter = adapter or _bleio.adapter self._current_advertisement = None - self._connection_cache = {} + self._connection_cache: Dict[_bleio.Connection, BLEConnection] = {} def start_advertising( self, @@ -223,7 +237,7 @@ def stop_advertising(self) -> None: """Stops advertising.""" self._adapter.stop_advertising() - def start_scan( + def start_scan( # pylint: disable=too-many-arguments self, *advertisement_types: Type[Advertisement], buffer_size: int = 512, @@ -311,9 +325,13 @@ def connect( :return: the connection to the peer :rtype: BLEConnection """ - if not isinstance(peer, _bleio.Address): - peer = peer.address - connection = self._adapter.connect(peer, timeout=timeout) + if isinstance(peer, _bleio.Address): + peer_ = peer + else: + assert peer.address is not None + peer_ = peer.address + + connection = self._adapter.connect(peer_, timeout=timeout) self._clean_connection_cache() self._connection_cache[connection] = BLEConnection(connection) return self._connection_cache[connection] @@ -328,7 +346,7 @@ def connections(self) -> Tuple[Optional[BLEConnection], ...]: """A tuple of active `BLEConnection` objects.""" self._clean_connection_cache() connections = self._adapter.connections - wrapped_connections = [None] * len(connections) + wrapped_connections: List[Optional[BLEConnection]] = [None] * len(connections) for i, connection in enumerate(connections): if connection not in self._connection_cache: self._connection_cache[connection] = BLEConnection(connection) diff --git a/adafruit_ble/advertising/__init__.py b/adafruit_ble/advertising/__init__.py index 02324d3..ba49376 100644 --- a/adafruit_ble/advertising/__init__.py +++ b/adafruit_ble/advertising/__init__.py @@ -11,16 +11,29 @@ import struct try: - from typing import Dict, Any, Union, List, Optional, Type, TypeVar, TYPE_CHECKING + from typing import ( + TYPE_CHECKING, + Any, + Dict, + List, + Optional, + Tuple, + Type, + TypeVar, + Union, + ) + from typing_extensions import Literal if TYPE_CHECKING: - from _bleio import ScanEntry + from _bleio import Address, ScanEntry LazyObjectField_GivenClass = TypeVar( # pylint: disable=invalid-name "LazyObjectField_GivenClass" ) + DataDict = Dict[int, Union[bytes, List[bytes]]] + except ImportError: pass @@ -35,13 +48,11 @@ def to_bytes_literal(seq: bytes) -> str: return 'b"' + "".join("\\x{:02x}".format(v) for v in seq) + '"' -def decode_data( - data: bytes, *, key_encoding: str = "B" -) -> Dict[Any, Union[bytes, List[bytes]]]: +def decode_data(data: bytes, *, key_encoding: str = "B") -> DataDict: """Helper which decodes length encoded structures into a dictionary with the given key encoding.""" i = 0 - data_dict = {} + data_dict: DataDict = {} key_size = struct.calcsize(key_encoding) while i < len(data): item_length = data[i] @@ -51,18 +62,18 @@ def decode_data( key = struct.unpack_from(key_encoding, data, i)[0] value = data[i + key_size : i + item_length] if key in data_dict: - if not isinstance(data_dict[key], list): - data_dict[key] = [data_dict[key]] - data_dict[key].append(value) + cur_value = data_dict[key] + if isinstance(cur_value, list): + cur_value.append(value) + else: + data_dict[key] = [cur_value, value] else: data_dict[key] = value i += item_length return data_dict -def compute_length( - data_dict: Dict[Any, Union[bytes, List[bytes]]], *, key_encoding: str = "B" -) -> int: +def compute_length(data_dict: DataDict, *, key_encoding: str = "B") -> int: """Computes the length of the encoded data dictionary.""" value_size = 0 for value in data_dict.values(): @@ -74,9 +85,7 @@ def compute_length( return len(data_dict) + len(data_dict) * struct.calcsize(key_encoding) + value_size -def encode_data( - data_dict: Dict[Any, Union[bytes, List[bytes]]], *, key_encoding: str = "B" -) -> bytes: +def encode_data(data_dict: DataDict, *, key_encoding: str = "B") -> bytes: """Helper which encodes dictionaries into length encoded structures with the given key encoding.""" length = compute_length(data_dict, key_encoding=key_encoding) @@ -137,7 +146,9 @@ def __init__( self._adt = advertising_data_type self.flags = 0 if self._adt in self._advertisement.data_dict: - self.flags = self._advertisement.data_dict[self._adt][0] + value = self._advertisement.data_dict[self._adt] + assert not isinstance(value, list) + self.flags = value[0] def __len__(self) -> Literal[1]: return 1 @@ -170,7 +181,9 @@ def __get__( return self if self._adt not in obj.data_dict: return None - return str(obj.data_dict[self._adt], "utf-8") + value = obj.data_dict[self._adt] + assert not isinstance(value, list) + return str(value, "utf-8") def __set__(self, obj: "Advertisement", value: str) -> None: obj.data_dict[self._adt] = value.encode("utf-8") @@ -190,7 +203,9 @@ def __get__( return self if self._adt not in obj.data_dict: return None - return struct.unpack(self._format, obj.data_dict[self._adt])[0] + value = obj.data_dict[self._adt] + assert not isinstance(value, list) + return struct.unpack(self._format, value)[0] def __set__(self, obj: "Advertisement", value: Any) -> None: obj.data_dict[self._adt] = struct.pack(self._format, value) @@ -237,7 +252,10 @@ class Advertisement: bytestring prefixes to match against the multiple data structures in the advertisement. """ - match_prefixes = () + address: Optional[Address] + _rssi: Optional[int] + + match_prefixes: Optional[Tuple[bytes, ...]] = () """For Advertisement, :py:attr:`~adafruit_ble.advertising.Advertisement.match_prefixes` will always return ``True``. Subclasses may override this value.""" # cached bytes of merged prefixes. @@ -293,15 +311,16 @@ def rssi(self) -> Optional[int]: return self._rssi @classmethod - def get_prefix_bytes(cls) -> Optional[bytes]: + def get_prefix_bytes(cls) -> bytes: """Return a merged version of match_prefixes as a single bytes object, with length headers. """ # Check for deprecated `prefix` class attribute. - cls._prefix_bytes = getattr(cls, "prefix", None) + prefix_bytes: Optional[bytes] = getattr(cls, "prefix", None) + # Do merge once and memoize it. - if cls._prefix_bytes is None: - cls._prefix_bytes = ( + cls._prefix_bytes = ( + ( b"" if cls.match_prefixes is None else b"".join( @@ -309,6 +328,9 @@ def get_prefix_bytes(cls) -> Optional[bytes]: for prefix in cls.match_prefixes ) ) + if prefix_bytes is None + else prefix_bytes + ) return cls._prefix_bytes diff --git a/adafruit_ble/advertising/adafruit.py b/adafruit_ble/advertising/adafruit.py index ad3285b..4f30c56 100755 --- a/adafruit_ble/advertising/adafruit.py +++ b/adafruit_ble/advertising/adafruit.py @@ -15,6 +15,7 @@ """ import struct + from micropython import const from . import Advertisement, LazyObjectField diff --git a/adafruit_ble/advertising/standard.py b/adafruit_ble/advertising/standard.py index 3bfc1a1..8c077fb 100644 --- a/adafruit_ble/advertising/standard.py +++ b/adafruit_ble/advertising/standard.py @@ -11,29 +11,59 @@ """ +from __future__ import annotations + import struct from collections import OrderedDict, namedtuple +from ..uuid import StandardUUID, VendorUUID from . import ( Advertisement, AdvertisingDataField, - encode_data, + compute_length, decode_data, + encode_data, to_hex, - compute_length, ) -from ..uuid import StandardUUID, VendorUUID +TYPE_CHECKING = False try: - from typing import Optional, List, Tuple, Union, Type, Iterator, Iterable, Any - from adafruit_ble.uuid import UUID - from adafruit_ble.services import Service - from _bleio import ScanEntry + from typing import ( + TYPE_CHECKING, + Any, + Collection, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + Type, + TypeGuard, + Union, + overload, + ) + + if TYPE_CHECKING: + from _bleio import ScanEntry - UsesServicesAdvertisement = Union[ - "ProvideServicesAdvertisement", "SolicitServicesAdvertisement" - ] + from adafruit_ble.services import Service + from adafruit_ble.uuid import StandardUUID, VendorUUID + from . import DataDict + + AdvServiceLists = Dict[int, "BoundServiceList"] + + UsesServicesAdvertisement = Union[ + "ProvideServicesAdvertisement", "SolicitServicesAdvertisement" + ] + + Uuid = Union[StandardUUID, VendorUUID] + + class WithManufacturerData(Advertisement): + """Stub type, any subclass of Advertisement which has manufacturer data.""" + + manufacturer_data: ManufacturerData except ImportError: pass @@ -42,6 +72,13 @@ __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" +def has_manufacturer_data(obj: Advertisement) -> TypeGuard[WithManufacturerData]: + """Tiny function for type correctness.""" + return hasattr(obj, "manufacturer_data") and isinstance( + obj.manufacturer_data, ManufacturerData + ) + + class BoundServiceList: """Sequence-like object of Service UUID objects. It stores both standard and vendor UUIDs.""" @@ -50,33 +87,35 @@ def __init__( advertisement: UsesServicesAdvertisement, *, standard_services: List[int], - vendor_services: List[int] + vendor_services: List[int], ) -> None: self._advertisement = advertisement self._standard_service_fields = standard_services self._vendor_service_fields = vendor_services - self._standard_services = [] - self._vendor_services = [] + self._standard_services: List[StandardUUID] = [] + self._vendor_services: List[VendorUUID] = [] for adt in standard_services: if adt in self._advertisement.data_dict: data = self._advertisement.data_dict[adt] for i in range(len(data) // 2): - uuid = StandardUUID(data[2 * i : 2 * (i + 1)]) - self._standard_services.append(uuid) + standard_uuid = StandardUUID(data[2 * i : 2 * (i + 1)]) + self._standard_services.append(standard_uuid) for adt in vendor_services: if adt in self._advertisement.data_dict: data = self._advertisement.data_dict[adt] for i in range(len(data) // 16): - uuid = VendorUUID(data[16 * i : 16 * (i + 1)]) - self._vendor_services.append(uuid) + vendor_uuid = VendorUUID(data[16 * i : 16 * (i + 1)]) + self._vendor_services.append(vendor_uuid) - def __contains__(self, key: Union[UUID, Service]) -> bool: + def __contains__(self, key: Union[Uuid, Service]) -> bool: uuid = key if hasattr(key, "uuid"): uuid = key.uuid return uuid in self._vendor_services or uuid in self._standard_services - def _update(self, adt: int, uuids: List[UUID]) -> None: + def _update( + self, adt: int, uuids: Union[List[StandardUUID], List[VendorUUID]] + ) -> None: if not uuids: # uuids is empty del self._advertisement.data_dict[adt] @@ -88,13 +127,13 @@ def _update(self, adt: int, uuids: List[UUID]) -> None: i += uuid_length self._advertisement.data_dict[adt] = b - def __iter__(self) -> Iterator[UUID]: - all_services = list(self._standard_services) + def __iter__(self) -> Iterator[Uuid]: + all_services: List[Uuid] = list(self._standard_services) all_services.extend(self._vendor_services) return iter(all_services) # TODO: Differentiate between complete and incomplete lists. - def append(self, service: Service) -> None: + def append(self, service: Type[Service]) -> None: """Append a service to the list.""" if ( isinstance(service.uuid, StandardUUID) @@ -135,10 +174,10 @@ def extend(self, services: Iterable[Service]) -> None: def __str__(self) -> str: data = [] - for service_uuid in self._standard_services: - data.append(str(service_uuid)) - for service_uuid in self._vendor_services: - data.append(str(service_uuid)) + for standard_service_uuid in self._standard_services: + data.append(str(standard_service_uuid)) + for vendor_service_uuid in self._vendor_services: + data.append(str(vendor_service_uuid)) return "".format(", ".join(data)) @@ -160,11 +199,27 @@ def _present(self, obj: UsesServicesAdvertisement) -> bool: return True return False + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[UsesServicesAdvertisement]] = None + ) -> ServiceList: + ... + + @overload + def __get__( + self, + obj: UsesServicesAdvertisement, + cls: Optional[Type[UsesServicesAdvertisement]] = None, + ) -> Union[BoundServiceList, Tuple[()]]: + ... + def __get__( self, obj: Optional[UsesServicesAdvertisement], - cls: Type[UsesServicesAdvertisement], - ) -> Union[UsesServicesAdvertisement, Tuple[()], "ServiceList"]: + cls: Optional[Type[UsesServicesAdvertisement]] = None, + ) -> Union[ServiceList, Union[BoundServiceList, Tuple[()]]]: if obj is None: return self if not self._present(obj) and not obj.mutable: @@ -180,6 +235,8 @@ def __get__( class ProvideServicesAdvertisement(Advertisement): """Advertise what services that the device makes available upon connection.""" + adv_service_lists: AdvServiceLists + # Prefixes that match each ADT that can carry service UUIDs. match_prefixes = (b"\x02", b"\x03", b"\x06", b"\x07") services = ServiceList(standard_services=[0x02, 0x03], vendor_services=[0x06, 0x07]) @@ -193,6 +250,7 @@ def __init__(self, *services: Service, entry: Optional[ScanEntry] = None) -> Non # Attributes are supplied by entry. return if services: + assert self.services self.services.extend(services) self.connectable = True self.flags.general_discovery = True @@ -209,6 +267,8 @@ def matches(cls, entry: ScanEntry) -> bool: class SolicitServicesAdvertisement(Advertisement): """Advertise what services the device would like to use over a connection.""" + adv_service_lists: AdvServiceLists + # Prefixes that match each ADT that can carry solicited service UUIDs. match_prefixes = (b"\x14", b"\x15") @@ -222,6 +282,7 @@ def __init__(self, *services: Service, entry: Optional[ScanEntry] = None) -> Non raise ValueError("Supply services or entry, not both") # Attributes are supplied by entry. return + assert self.solicited_services self.solicited_services.extend(services) self.connectable = True self.flags.general_discovery = True @@ -237,13 +298,15 @@ class ManufacturerData(AdvertisingDataField): sub-class. """ + data: DataDict + def __init__( self, obj: UsesServicesAdvertisement, *, advertising_data_type: int = 0xFF, company_id: int, - key_encoding: str = "B" + key_encoding: str = "B", ) -> None: self._obj = obj self._company_id = company_id @@ -257,9 +320,14 @@ def __init__( if isinstance(existing_data, list): for existing in existing_data: if existing.startswith(encoded_company): - existing_data = existing - existing_data = None - self.data = decode_data(existing_data[2:], key_encoding=key_encoding) + data = existing + break + else: + # encoded_company was not found + raise RuntimeError + else: + data = existing_data + self.data = decode_data(data[2:], key_encoding=key_encoding) self._key_encoding = key_encoding def __len__(self) -> int: @@ -281,7 +349,7 @@ class ManufacturerDataField: """A single piece of data within the manufacturer specific data. The format can be repeated.""" def __init__( - self, key: int, value_format: str, field_names: Optional[Iterable[str]] = None + self, key: int, value_format: str, field_names: Optional[Collection[str]] = None ) -> None: self._key = key self._format = value_format @@ -297,18 +365,44 @@ def __init__( self._entry_length = struct.calcsize(value_format) self.field_names = field_names if field_names: + assert self.field_names is not None # Mostly, this is to raise a ValueError if field_names has invalid entries - self.mdf_tuple = namedtuple("mdf_tuple", self.field_names) + self.mdf_tuple = namedtuple( # type: ignore[misc, arg-type] + "mdf_tuple", + self.field_names, + ) + + if TYPE_CHECKING: + + @overload + def __get__( + self, + obj: None, + cls: Optional[Type[Advertisement]] = None, + ) -> ManufacturerDataField: + ... + + @overload + def __get__( + self, + obj: Advertisement, + cls: Optional[Type[Advertisement]] = None, + ) -> Optional[Tuple]: + ... def __get__( - self, obj: Optional[Advertisement], cls: Type[Advertisement] - ) -> Optional[Union["ManufacturerDataField", Tuple, namedtuple]]: + self, + obj: Optional[Advertisement], + cls: Optional[Type[Advertisement]] = None, + ) -> Union[ManufacturerDataField, Optional[Tuple]]: if obj is None: return self + assert has_manufacturer_data(obj) if self._key not in obj.manufacturer_data.data: return None packed = obj.manufacturer_data.data[self._key] if self._entry_length == len(packed): + assert isinstance(packed, bytes) unpacked = struct.unpack_from(self._format, packed) if self.element_count == 1: unpacked = unpacked[0] @@ -320,15 +414,20 @@ def __get__( if len(packed) % self._entry_length != 0: raise RuntimeError("Invalid data length") entry_count = len(packed) // self._entry_length - unpacked = [None] * entry_count + unpacked_: List[Optional[Tuple[Any, ...]]] = [None] * entry_count for i in range(entry_count): offset = i * self._entry_length - unpacked[i] = struct.unpack_from(self._format, packed, offset=offset) + assert isinstance(packed, bytes) + unpacked_[i] = struct.unpack_from(self._format, packed, offset=offset) if self.element_count == 1: - unpacked[i] = unpacked[i][0] + val = unpacked_[i] + assert val is not None + unpacked_[i] = val[0] return tuple(unpacked) - def __set__(self, obj: "Advertisement", value: Any) -> None: + def __set__(self, obj: Advertisement, value: Any) -> None: + assert has_manufacturer_data(obj) + if not obj.mutable: raise AttributeError() if isinstance(value, tuple) and ( @@ -360,7 +459,7 @@ def __init__(self, service: Service) -> None: self._prefix = bytes(service.uuid) def __get__( # pylint: disable=too-many-return-statements,too-many-branches - self, obj: Optional[Service], cls: Type[Service] + self, obj: Optional[Advertisement], cls: Type[Advertisement] ) -> Optional[Union["ServiceData", memoryview]]: if obj is None: return self diff --git a/adafruit_ble/characteristics/__init__.py b/adafruit_ble/characteristics/__init__.py index 42a5868..ddf8489 100644 --- a/adafruit_ble/characteristics/__init__.py +++ b/adafruit_ble/characteristics/__init__.py @@ -11,17 +11,22 @@ from __future__ import annotations import struct + import _bleio from ..attributes import Attribute +TYPE_CHECKING = False try: - from typing import Optional, Type, Union, Tuple, Iterable, TYPE_CHECKING + from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Type, Union, overload if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + from adafruit_ble.services import Service + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -32,7 +37,7 @@ class Characteristic: """ - Top level Characteristic class that does basic binding. + Base Characteristic class that does basic binding. :param UUID uuid: The uuid of the characteristic :param int properties: The properties of the characteristic, @@ -85,10 +90,13 @@ class Characteristic: WRITE = _bleio.Characteristic.WRITE WRITE_NO_RESPONSE = _bleio.Characteristic.WRITE_NO_RESPONSE - def __init__( + field_name: str + value: ReadableBuffer + + def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, @@ -96,7 +104,7 @@ def __init__( fixed_length: bool = False, initial_value: Optional[ReadableBuffer] = None, ) -> None: - self.field_name = None # Set by Service during basic binding + # field_name is set by Service during basic binding if uuid: self.uuid = uuid @@ -126,18 +134,18 @@ def _ensure_bound( service.bleio_characteristics[self.field_name] = bleio_characteristic def __bind_locally( - self, service: Service, initial_value: Optional[bytes] + self, service: Service, initial_value: Optional[ReadableBuffer] ) -> _bleio.Characteristic: - if initial_value is None: - initial_value = self.initial_value - if initial_value is None and self.max_length: - initial_value = bytes(self.max_length) + value = initial_value if initial_value is not None else self.initial_value + max_length = self.max_length - if max_length is None and initial_value is None: - max_length = 0 - initial_value = b"" - elif max_length is None: - max_length = len(initial_value) + if value is None: + if max_length is None: + max_length = 0 + value = b"" + else: + max_length = len(value) + return _bleio.Characteristic.add_to_service( service.bleio_service, self.uuid.bleio_uuid, @@ -149,9 +157,23 @@ def __bind_locally( write_perm=self.write_perm, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, service: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__( + self, service: Service, cls: Optional[Type[Service]] = None + ) -> ReadableBuffer: + ... + def __get__( self, service: Optional[Service], cls: Optional[Type[Service]] = None - ) -> ReadableBuffer: + ) -> Union[Characteristic, ReadableBuffer]: # CircuitPython doesn't invoke descriptor protocol on obj's class, # but CPython does. In the CPython case, pretend that it doesn't. if service is None: @@ -175,10 +197,12 @@ class ComplexCharacteristic: has been bound to the corresponding instance attribute. """ - def __init__( + field_name: str + + def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, @@ -186,7 +210,7 @@ def __init__( fixed_length: bool = False, initial_value: Optional[ReadableBuffer] = None, ) -> None: - self.field_name = None # Set by Service during basic binding + # field_name is set by Service during basic binding if uuid: self.uuid = uuid @@ -214,9 +238,23 @@ def bind(self, service: Service) -> _bleio.Characteristic: write_perm=self.write_perm, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, service: None, cls: Optional[Type[Service]] = None + ) -> ComplexCharacteristic: + ... + + @overload + def __get__( + self, service: Service, cls: Optional[Type[Service]] = None + ) -> _bleio.Characteristic: + ... + def __get__( self, service: Optional[Service], cls: Optional[Type[Service]] = None - ) -> _bleio.Characteristic: + ) -> Union[ComplexCharacteristic, _bleio.Characteristic]: if service is None: return self bound_object = self.bind(service) @@ -237,11 +275,11 @@ class StructCharacteristic(Characteristic): :param buf initial_value: see `Characteristic` """ - def __init__( + def __init__( # pylint: disable=too-many-arguments self, struct_format, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, @@ -261,9 +299,26 @@ def __init__( write_perm=write_perm, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + # NOTE(elpekenin): we actually return StructCharacteristic, but we hint like + # this so that we dont change parent's function signature, causing a mypy warn + # regardless, this is not wrong, just incomplete, because this is a subclass + ... + + @overload + def __get__( + self, obj: Service, cls: Optional[Type[Service]] = None + ) -> Optional[Tuple[int, ...]]: + ... + def __get__( self, obj: Optional[Service], cls: Optional[Type[Service]] = None - ) -> Optional[Union[Tuple, "StructCharacteristic"]]: + ) -> Union[Characteristic, Optional[Tuple[int, ...]]]: if obj is None: return self raw_data = super().__get__(obj, cls) diff --git a/adafruit_ble/characteristics/float.py b/adafruit_ble/characteristics/float.py index 69915eb..74dcc02 100644 --- a/adafruit_ble/characteristics/float.py +++ b/adafruit_ble/characteristics/float.py @@ -12,16 +12,20 @@ from __future__ import annotations -from . import Attribute -from . import StructCharacteristic +from . import Attribute, StructCharacteristic +TYPE_CHECKING = False try: - from typing import Optional, Type, Union, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional, Type, Union, overload if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + + from adafruit_ble.characteristics import Characteristic from adafruit_ble.services import Service + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -33,10 +37,10 @@ class FloatCharacteristic(StructCharacteristic): """32-bit float""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, @@ -53,12 +57,27 @@ def __init__( initial_value=initial_value, ) + if TYPE_CHECKING: + # NOTE(elpekenin): return type doesn't match parent, but that's + # not a problem + @overload # type: ignore[override] + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__(self, obj: Service, cls: Optional[Type[Service]] = None) -> float: + ... + def __get__( self, obj: Optional[Service], cls: Optional[Type[Service]] = None - ) -> Union[float, "FloatCharacteristic"]: + ) -> Union[Characteristic, float]: if obj is None: return self - return super().__get__(obj)[0] + get = super().__get__(obj) + assert get is not None + return get[0] # pylint: disable=unsubscriptable-object - def __set__(self, obj: Service, value: float) -> None: + def __set__(self, obj: Service, value: float) -> None: # type: ignore[override] super().__set__(obj, (value,)) diff --git a/adafruit_ble/characteristics/int.py b/adafruit_ble/characteristics/int.py index 52dbd80..5a71ada 100755 --- a/adafruit_ble/characteristics/int.py +++ b/adafruit_ble/characteristics/int.py @@ -12,16 +12,20 @@ from __future__ import annotations -from . import Attribute -from . import StructCharacteristic +from . import Attribute, StructCharacteristic +TYPE_CHECKING = False try: - from typing import Optional, Type, Union, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional, Type, Union, overload if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + + from adafruit_ble.characteristics import Characteristic from adafruit_ble.services import Service + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -33,13 +37,13 @@ class IntCharacteristic(StructCharacteristic): """Superclass for different kinds of integer fields.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, format_string: str, min_value: int, max_value: int, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, @@ -61,14 +65,29 @@ def __init__( initial_value=initial_value, ) + if TYPE_CHECKING: + # NOTE(elpekenin): return type doesn't match parent, but that's + # not a problem + @overload # type: ignore[override] + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__(self, obj: Service, cls: Optional[Type[Service]] = None) -> int: + ... + def __get__( self, obj: Optional[Service], cls: Optional[Type[Service]] = None - ) -> Union[int, "IntCharacteristic"]: + ) -> Union[Characteristic, int]: if obj is None: return self - return super().__get__(obj)[0] + get = super().__get__(obj) + assert get is not None + return get[0] # pylint: disable=unsubscriptable-object - def __set__(self, obj: Service, value: int) -> None: + def __set__(self, obj: Service, value: int) -> None: # type: ignore[override] if not self._min_value <= value <= self._max_value: raise ValueError("out of range") super().__set__(obj, (value,)) diff --git a/adafruit_ble/characteristics/json.py b/adafruit_ble/characteristics/json.py index 2622cfd..c3ffb14 100644 --- a/adafruit_ble/characteristics/json.py +++ b/adafruit_ble/characteristics/json.py @@ -13,16 +13,19 @@ from __future__ import annotations import json -from . import Attribute -from . import Characteristic + +from . import Attribute, Characteristic try: - from typing import Optional, Any, Type, TYPE_CHECKING + from typing import TYPE_CHECKING, Any, Optional, Type, Union if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + from adafruit_ble.services import Service + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -34,10 +37,10 @@ class JSONCharacteristic(Characteristic): """JSON string characteristic for JSON serializable values of a limited size (max_length).""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = Characteristic.READ, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, diff --git a/adafruit_ble/characteristics/stream.py b/adafruit_ble/characteristics/stream.py index 5053971..0cea03c 100755 --- a/adafruit_ble/characteristics/stream.py +++ b/adafruit_ble/characteristics/stream.py @@ -15,18 +15,19 @@ import _bleio -from . import Attribute -from . import Characteristic -from . import ComplexCharacteristic +from . import Attribute, Characteristic, ComplexCharacteristic try: - from typing import Optional, Union, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional, Union if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer + from adafruit_ble.characteristics import Characteristic - from adafruit_ble.uuid import UUID from adafruit_ble.services import Service + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -53,10 +54,10 @@ def write(self, buf: ReadableBuffer) -> None: class StreamOut(ComplexCharacteristic): """Output stream from the Service server.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, timeout: float = 1.0, buffer_size: int = 64, properties: int = Characteristic.NOTIFY, @@ -88,10 +89,10 @@ def bind( class StreamIn(ComplexCharacteristic): """Input stream into the Service server.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, timeout: float = 1.0, buffer_size: int = 64, properties: int = (Characteristic.WRITE | Characteristic.WRITE_NO_RESPONSE), diff --git a/adafruit_ble/characteristics/string.py b/adafruit_ble/characteristics/string.py index 7326bc7..bcd3a44 100755 --- a/adafruit_ble/characteristics/string.py +++ b/adafruit_ble/characteristics/string.py @@ -12,16 +12,19 @@ from __future__ import annotations -from . import Attribute -from . import Characteristic +from . import Attribute, Characteristic +TYPE_CHECKING = False try: - from typing import Optional, Type, Union, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional, Type, Union, overload if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + from adafruit_ble.services import Service + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -33,10 +36,10 @@ class StringCharacteristic(Characteristic): """UTF-8 Encoded string characteristic.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = Characteristic.READ, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, @@ -52,9 +55,21 @@ def __init__( initial_value=initial_value, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__(self, obj: Service, cls: Optional[Type[Service]] = None) -> str: + ... + def __get__( self, obj: Optional[Service], cls: Optional[Type[Service]] = None - ) -> Union[str, "StringCharacteristic"]: + ) -> Union[Characteristic, str]: if obj is None: return self return str(super().__get__(obj, cls), "utf-8") @@ -67,7 +82,7 @@ class FixedStringCharacteristic(Characteristic): """Fixed strings are set once when bound and unchanged after.""" def __init__( - self, *, uuid: Optional[UUID] = None, read_perm: int = Attribute.OPEN + self, *, uuid: Optional[Uuid] = None, read_perm: int = Attribute.OPEN ) -> None: super().__init__( uuid=uuid, @@ -77,9 +92,21 @@ def __init__( fixed_length=True, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__(self, obj: Service, cls: Optional[Type[Service]] = None) -> str: + ... + def __get__( - self, obj: Service, cls: Optional[Type[Service]] = None - ) -> Union[str, "FixedStringCharacteristic"]: + self, obj: Optional[Service], cls: Optional[Type[Service]] = None + ) -> Union[Characteristic, str]: if obj is None: return self return str(super().__get__(obj, cls), "utf-8") diff --git a/adafruit_ble/services/__init__.py b/adafruit_ble/services/__init__.py index 2fd0789..dd5ff00 100644 --- a/adafruit_ble/services/__init__.py +++ b/adafruit_ble/services/__init__.py @@ -15,10 +15,15 @@ from ..characteristics import Characteristic, ComplexCharacteristic try: - from typing import Optional + from typing import TYPE_CHECKING, Dict, Optional, Union except ImportError: pass +if TYPE_CHECKING: + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] + __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" @@ -36,6 +41,8 @@ class Service: instance for the connection's peer. """ + uuid: Uuid + def __init__( self, *, @@ -44,6 +51,7 @@ def __init__( **initial_values, ) -> None: if service is None: + # NOTE(elpekenin): when is self.uuid set? __new__? # pylint: disable=no-member self.bleio_service = _bleio.Service( self.uuid.bleio_uuid, secondary=secondary @@ -56,7 +64,7 @@ def __init__( # This internal dictionary is manipulated by the Characteristic descriptors to store their # per-Service state. It is NOT managed by the Service itself. It is an attribute of the # Service so that the lifetime of the objects is the same as the Service. - self.bleio_characteristics = {} + self.bleio_characteristics: Dict[str, _bleio.Characteristic] = {} # Set the field name on all of the characteristic objects so they can replace themselves if # they choose. diff --git a/adafruit_ble/services/circuitpython.py b/adafruit_ble/services/circuitpython.py index 3d58b88..0eb14e1 100755 --- a/adafruit_ble/services/circuitpython.py +++ b/adafruit_ble/services/circuitpython.py @@ -10,11 +10,11 @@ """ -from . import Service from ..characteristics import Characteristic from ..characteristics.stream import StreamOut from ..characteristics.string import StringCharacteristic from ..uuid import VendorUUID +from . import Service __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" diff --git a/adafruit_ble/services/nordic.py b/adafruit_ble/services/nordic.py index d4db39b..ae1ca6d 100755 --- a/adafruit_ble/services/nordic.py +++ b/adafruit_ble/services/nordic.py @@ -13,16 +13,16 @@ from __future__ import annotations -from . import Service +from ..characteristics.stream import StreamIn, StreamOut from ..uuid import VendorUUID -from ..characteristics.stream import StreamOut, StreamIn +from . import Service try: - from typing import Optional, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: - from circuitpython_typing import WriteableBuffer, ReadableBuffer import _bleio + from circuitpython_typing import ReadableBuffer, WriteableBuffer except ImportError: pass diff --git a/adafruit_ble/services/sphero.py b/adafruit_ble/services/sphero.py index 2c34d6f..0fa3337 100644 --- a/adafruit_ble/services/sphero.py +++ b/adafruit_ble/services/sphero.py @@ -10,8 +10,8 @@ """ -from . import Service from ..uuid import VendorUUID +from . import Service __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" diff --git a/adafruit_ble/services/standard/__init__.py b/adafruit_ble/services/standard/__init__.py index 91fe0bb..8f445fe 100644 --- a/adafruit_ble/services/standard/__init__.py +++ b/adafruit_ble/services/standard/__init__.py @@ -12,12 +12,11 @@ import time -from .. import Service -from ...uuid import StandardUUID -from ...characteristics import Characteristic -from ...characteristics.string import StringCharacteristic -from ...characteristics import StructCharacteristic +from ...characteristics import Characteristic, StructCharacteristic from ...characteristics.int import Uint8Characteristic +from ...characteristics.string import StringCharacteristic +from ...uuid import StandardUUID +from .. import Service __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" @@ -78,7 +77,12 @@ def struct_time(self) -> time.struct_time: """The current time as a `time.struct_time`. Day of year and whether DST is in effect are always -1. """ - year, month, day, hour, minute, second, weekday, _, _ = self.current_time + current_time = self.current_time + if current_time is None: + msg = "Could not get current time." + raise RuntimeError(msg) + + year, month, day, hour, minute, second, weekday, _, _ = current_time # Bluetooth weekdays count from 1. struct_time counts from 0. return time.struct_time( (year, month, day, hour, minute, second, weekday - 1, -1, -1) diff --git a/adafruit_ble/services/standard/device_info.py b/adafruit_ble/services/standard/device_info.py index 0ebd8ac..2f30008 100644 --- a/adafruit_ble/services/standard/device_info.py +++ b/adafruit_ble/services/standard/device_info.py @@ -15,12 +15,12 @@ import os import sys -from .. import Service -from ...uuid import StandardUUID from ...characteristics.string import FixedStringCharacteristic +from ...uuid import StandardUUID +from .. import Service try: - from typing import Optional, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: import _bleio @@ -43,7 +43,7 @@ class DeviceInfoService(Service): software_revision = FixedStringCharacteristic(uuid=StandardUUID(0x2A28)) manufacturer = FixedStringCharacteristic(uuid=StandardUUID(0x2A29)) - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, manufacturer: Optional[str] = None, diff --git a/adafruit_ble/services/standard/hid.py b/adafruit_ble/services/standard/hid.py index 6409f21..c5f46c0 100755 --- a/adafruit_ble/services/standard/hid.py +++ b/adafruit_ble/services/standard/hid.py @@ -16,18 +16,37 @@ import struct -from micropython import const import _bleio +from micropython import const -from adafruit_ble.characteristics import Attribute -from adafruit_ble.characteristics import Characteristic +from adafruit_ble.characteristics import Attribute, Characteristic from adafruit_ble.characteristics.int import Uint8Characteristic from adafruit_ble.uuid import StandardUUID from .. import Service try: - from typing import Dict, Optional + from typing import TYPE_CHECKING, Dict, List, Optional, Union + + from typing_extensions import Literal + + if TYPE_CHECKING: + from typing import TypedDict + + from circuitpython_typing import ReadableBuffer + + class Collection(TypedDict, total=False): + """Stub for type checker.""" + + # NOTE(elpekenin): total=False means not all keys are required + # however i'm pretty sure type,locals,globals are always present + # may do that with inheritance, but doesnt seem worth + tag: Union[Literal["input"], Literal["output"]] + type: ReadableBuffer + locals: List[ReadableBuffer] + globals: List[ReadableBuffer] + mains: List[Collection] + except ImportError: pass @@ -173,7 +192,7 @@ class ReportIn: uuid = StandardUUID(_REPORT_UUID_NUM) - def __init__( + def __init__( # pylint: disable=too-many-arguments self, service: Service, report_id: int, @@ -203,7 +222,7 @@ def __init__( initial_value=struct.pack(" None: + def send_report(self, report: bytearray) -> None: """Send a report to the peers""" self._characteristic.value = report @@ -214,7 +233,7 @@ class ReportOut: # pylint: disable=too-few-public-methods uuid = StandardUUID(_REPORT_UUID_NUM) - def __init__( + def __init__( # pylint: disable=too-many-arguments self, service: Service, report_id: int, @@ -249,7 +268,7 @@ def __init__( ) @property - def report(self) -> Dict: + def report(self) -> bytearray: """The HID OUT report""" return self._characteristic.value @@ -357,12 +376,12 @@ def __init__( def _init_devices(self) -> None: # pylint: disable=too-many-branches,too-many-statements,too-many-locals - self.devices = [] + self.devices: List[Union[ReportIn, ReportOut]] = [] hid_descriptor = self.report_map global_table = [None] * 10 local_table = [None] * 3 - collections = [] + collections: List[Collection] = [] top_level_collections = [] i = 0 @@ -417,7 +436,7 @@ def _init_devices(self) -> None: i += size - def get_report_info(collection: Dict, reports: Dict) -> None: + def get_report_info(collection: Collection, reports: Dict) -> None: """Gets info about hid reports""" for main in collection["mains"]: if "type" in main: @@ -440,7 +459,7 @@ def get_report_info(collection: Dict, reports: Dict) -> None: ) usage_page = collection["globals"][0][0] usage = collection["locals"][0][0] - reports = {} + reports: Dict[int, Dict] = {} get_report_info(collection, reports) if len(reports) > 1: raise NotImplementedError( diff --git a/docs/conf.py b/docs/conf.py index f19b877..a49c58f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -4,9 +4,9 @@ # # SPDX-License-Identifier: MIT +import datetime import os import sys -import datetime sys.path.insert(0, os.path.abspath("..")) @@ -123,7 +123,7 @@ # -- Options for LaTeX output --------------------------------------------- -latex_elements = { +latex_elements: dict[str, str] = { # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', diff --git a/examples/ble_advertising_simpletest.py b/examples/ble_advertising_simpletest.py index 2c2815e..7a98199 100644 --- a/examples/ble_advertising_simpletest.py +++ b/examples/ble_advertising_simpletest.py @@ -6,7 +6,6 @@ """ from adafruit_ble import BLERadio - from adafruit_ble.advertising import Advertisement ble = BLERadio() diff --git a/examples/ble_bluefruit_color_picker.py b/examples/ble_bluefruit_color_picker.py index 4facbae..4cf7fdc 100755 --- a/examples/ble_bluefruit_color_picker.py +++ b/examples/ble_bluefruit_color_picker.py @@ -5,9 +5,8 @@ import board import neopixel - -from adafruit_bluefruit_connect.packet import Packet from adafruit_bluefruit_connect.color_packet import ColorPacket +from adafruit_bluefruit_connect.packet import Packet from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement diff --git a/examples/ble_bluefruit_connect_plotter.py b/examples/ble_bluefruit_connect_plotter.py index 542c714..1839d48 100755 --- a/examples/ble_bluefruit_connect_plotter.py +++ b/examples/ble_bluefruit_connect_plotter.py @@ -4,9 +4,11 @@ # CircuitPython Bluefruit LE Connect Plotter Example import time -import board -import analogio + import adafruit_thermistor +import analogio +import board + from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement from adafruit_ble.services.nordic import UARTService diff --git a/examples/ble_color_proximity.py b/examples/ble_color_proximity.py index 31c56d3..463aeb7 100644 --- a/examples/ble_color_proximity.py +++ b/examples/ble_color_proximity.py @@ -8,9 +8,9 @@ """ import time + import board import digitalio - import neopixel from adafruit_ble import BLERadio @@ -73,10 +73,13 @@ else: closest = None closest_rssi = -80 - closest_last_time = 0 + closest_last_time: float = 0 print("Scanning for colors") while not slide_switch.value: for entry in ble.start_scan(AdafruitColor, minimum_rssi=-100, timeout=1): + assert isinstance(entry, AdafruitColor) + assert entry.rssi is not None + if slide_switch.value: break now = time.monotonic() diff --git a/examples/ble_current_time_service.py b/examples/ble_current_time_service.py index a62dfd0..58213b3 100644 --- a/examples/ble_current_time_service.py +++ b/examples/ble_current_time_service.py @@ -7,6 +7,7 @@ """ import time + import adafruit_ble from adafruit_ble.advertising.standard import SolicitServicesAdvertisement from adafruit_ble.services.standard import CurrentTimeService @@ -14,6 +15,7 @@ radio = adafruit_ble.BLERadio() a = SolicitServicesAdvertisement() a.complete_name = "TimePlease" +assert a.solicited_services a.solicited_services.append(CurrentTimeService) radio.start_advertising(a) @@ -24,10 +26,15 @@ while radio.connected: for connection in radio.connections: + assert connection is not None + if not connection.paired: connection.pair() print("paired") + cts = connection[CurrentTimeService] + assert isinstance(cts, CurrentTimeService) + print(cts.current_time) time.sleep(1) diff --git a/examples/ble_demo_central.py b/examples/ble_demo_central.py index b7510ac..571f5e5 100644 --- a/examples/ble_demo_central.py +++ b/examples/ble_demo_central.py @@ -9,12 +9,11 @@ import time +import adafruit_lis3dh import board import busio import digitalio -import adafruit_lis3dh import neopixel - from adafruit_bluefruit_connect.color_packet import ColorPacket from adafruit_ble import BLERadio @@ -42,6 +41,8 @@ def scale(value): # See if any existing connections are providing UARTService. if ble.connected: for connection in ble.connections: + assert connection is not None + if UARTService in connection: uart_connection = connection break @@ -50,6 +51,7 @@ def scale(value): if not uart_connection: print("Scanning...") for adv in ble.start_scan(ProvideServicesAdvertisement, timeout=5): + assert isinstance(adv, ProvideServicesAdvertisement) if UARTService in adv.services: print("found a UARTService advertisement") uart_connection = ble.connect(adv) @@ -64,7 +66,10 @@ def scale(value): neopixels.fill(color) color_packet = ColorPacket(color) try: - uart_connection[UARTService].write(color_packet.to_bytes()) + service = uart_connection[UARTService] + assert isinstance(service, UARTService) + + service.write(color_packet.to_bytes()) except OSError: try: uart_connection.disconnect() diff --git a/examples/ble_demo_periph.py b/examples/ble_demo_periph.py index a9b5f92..bc45e7b 100644 --- a/examples/ble_demo_periph.py +++ b/examples/ble_demo_periph.py @@ -8,10 +8,10 @@ import board import neopixel +from adafruit_bluefruit_connect.color_packet import ColorPacket # Only the packet classes that are imported will be known to Packet. from adafruit_bluefruit_connect.packet import Packet -from adafruit_bluefruit_connect.color_packet import ColorPacket from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement diff --git a/examples/ble_detailed_scan.py b/examples/ble_detailed_scan.py index 7f5dfeb..cad9a3a 100644 --- a/examples/ble_detailed_scan.py +++ b/examples/ble_detailed_scan.py @@ -6,7 +6,6 @@ # specialty advertising types. from adafruit_ble import BLERadio - from adafruit_ble.advertising import Advertisement from adafruit_ble.advertising.standard import ProvideServicesAdvertisement diff --git a/examples/ble_device_info_service.py b/examples/ble_device_info_service.py index 34ac32b..466581e 100644 --- a/examples/ble_device_info_service.py +++ b/examples/ble_device_info_service.py @@ -7,6 +7,7 @@ """ import time + import adafruit_ble from adafruit_ble.advertising.standard import Advertisement from adafruit_ble.services.standard.device_info import DeviceInfoService @@ -28,10 +29,15 @@ while radio.connected: for connection in radio.connections: + assert connection is not None + if not connection.paired: connection.pair() print("paired") + dis = connection[DeviceInfoService] + assert isinstance(dis, DeviceInfoService) + print(dis.manufacturer) print(dis.model_number) time.sleep(60) diff --git a/examples/ble_hid_periph.py b/examples/ble_hid_periph.py index 6b8053a..51dea4d 100644 --- a/examples/ble_hid_periph.py +++ b/examples/ble_hid_periph.py @@ -15,9 +15,8 @@ import adafruit_ble from adafruit_ble.advertising import Advertisement from adafruit_ble.advertising.standard import ProvideServicesAdvertisement -from adafruit_ble.services.standard.hid import HIDService from adafruit_ble.services.standard.device_info import DeviceInfoService - +from adafruit_ble.services.standard.hid import HIDService # Use default HID descriptor hid = HIDService() @@ -30,8 +29,9 @@ ble = adafruit_ble.BLERadio() if ble.connected: - for c in ble.connections: - c.disconnect() + for conn in ble.connections: + if conn is not None: + conn.disconnect() print("advertising") ble.start_advertising(advertisement, scan_response) @@ -43,9 +43,9 @@ pass print("Start typing:") while ble.connected: - c = sys.stdin.read(1) - sys.stdout.write(c) - kl.write(c) + char = sys.stdin.read(1) + sys.stdout.write(char) + kl.write(char) # print("sleeping") time.sleep(0.1) ble.start_advertising(advertisement) diff --git a/examples/ble_json_central.py b/examples/ble_json_central.py index 654f7f4..16ac489 100644 --- a/examples/ble_json_central.py +++ b/examples/ble_json_central.py @@ -5,10 +5,10 @@ # Read sensor readings from peripheral BLE device using a JSON characteristic. from ble_json_service import SensorService + from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement - ble = BLERadio() connection = None @@ -16,6 +16,7 @@ if not connection: print("Scanning for BLE device advertising our sensor service...") for adv in ble.start_scan(ProvideServicesAdvertisement): + assert isinstance(adv, ProvideServicesAdvertisement) if SensorService in adv.services: connection = ble.connect(adv) print("Connected") @@ -24,6 +25,8 @@ if connection and connection.connected: service = connection[SensorService] + assert isinstance(service, SensorService) + service.settings = {"unit": "celsius"} # 'fahrenheit' while connection.connected: print("Sensors: ", service.sensors) diff --git a/examples/ble_json_peripheral.py b/examples/ble_json_peripheral.py index e079a8a..14d13fb 100644 --- a/examples/ble_json_peripheral.py +++ b/examples/ble_json_peripheral.py @@ -4,13 +4,14 @@ # Provide readable sensor values and writable settings to connected devices via JSON characteristic. -import time import random +import time + from ble_json_service import SensorService + from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement - # Create BLE radio, custom service, and advertisement. ble = BLERadio() service = SensorService() diff --git a/examples/ble_json_service.py b/examples/ble_json_service.py index 6351564..96a8d41 100644 --- a/examples/ble_json_service.py +++ b/examples/ble_json_service.py @@ -4,10 +4,10 @@ # Read sensor readings from peripheral BLE device using a JSON characteristic. -from adafruit_ble.uuid import VendorUUID -from adafruit_ble.services import Service from adafruit_ble.characteristics import Characteristic from adafruit_ble.characteristics.json import JSONCharacteristic +from adafruit_ble.services import Service +from adafruit_ble.uuid import VendorUUID # A custom service with two JSON characteristics for this device. The "sensors" characteristic diff --git a/examples/ble_packet_buffer_client.py b/examples/ble_packet_buffer_client.py index 158ac61..fc21398 100644 --- a/examples/ble_packet_buffer_client.py +++ b/examples/ble_packet_buffer_client.py @@ -17,13 +17,21 @@ buf = bytearray(512) while True: while ble.connected and any( - PacketBufferService in connection for connection in ble.connections + map( + lambda conn: conn is not None and PacketBufferService in conn, + ble.connections, + ) ): for connection in ble.connections: + assert connection is not None + if PacketBufferService not in connection: continue print("echo") + pb = connection[PacketBufferService] + assert isinstance(pb, PacketBufferService) + pb.write(b"echo") # Returns 0 if nothing was read. packet_len = pb.readinto(buf) @@ -34,6 +42,7 @@ print("disconnected, scanning") for advertisement in ble.start_scan(ProvideServicesAdvertisement, timeout=1): + assert isinstance(advertisement, ProvideServicesAdvertisement) if PacketBufferService not in advertisement.services: continue ble.connect(advertisement) diff --git a/examples/ble_packet_buffer_service.py b/examples/ble_packet_buffer_service.py index 510e4f0..a3c53aa 100644 --- a/examples/ble_packet_buffer_service.py +++ b/examples/ble_packet_buffer_service.py @@ -8,12 +8,12 @@ import _bleio -from adafruit_ble.services import Service from adafruit_ble.characteristics import ( Attribute, Characteristic, ComplexCharacteristic, ) +from adafruit_ble.services import Service from adafruit_ble.uuid import VendorUUID @@ -30,7 +30,7 @@ def __init__(self, uuid16): class PacketBufferCharacteristic(ComplexCharacteristic): - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, uuid=None, diff --git a/examples/ble_packet_buffer_test.py b/examples/ble_packet_buffer_test.py index 66c986f..3326ec5 100644 --- a/examples/ble_packet_buffer_test.py +++ b/examples/ble_packet_buffer_test.py @@ -11,7 +11,6 @@ from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement - ble = BLERadio() pbs = PacketBufferService() advertisement = ProvideServicesAdvertisement(pbs) diff --git a/examples/ble_simpletest.py b/examples/ble_simpletest.py index 03a44e2..5894c96 100644 --- a/examples/ble_simpletest.py +++ b/examples/ble_simpletest.py @@ -7,12 +7,13 @@ """ from adafruit_ble import BLERadio +from adafruit_ble.advertising import Advertisement ble = BLERadio() print("scanning") found = set() scan_responses = set() -for advertisement in ble.start_scan(): +for advertisement in ble.start_scan(Advertisement): addr = advertisement.address if advertisement.scan_response and addr not in scan_responses: scan_responses.add(addr) diff --git a/examples/ble_uart_echo_client.py b/examples/ble_uart_echo_client.py index 48a0f23..b5f7757 100644 --- a/examples/ble_uart_echo_client.py +++ b/examples/ble_uart_echo_client.py @@ -14,13 +14,18 @@ ble = BLERadio() while True: while ble.connected and any( - UARTService in connection for connection in ble.connections + map(lambda conn: conn is not None and UARTService in conn, ble.connections) ): for connection in ble.connections: + assert connection is not None + if UARTService not in connection: continue print("echo") + uart = connection[UARTService] + assert isinstance(uart, UARTService) + uart.write(b"echo") # Returns b'' if nothing was read. one_byte = uart.read(4) @@ -31,6 +36,7 @@ print("disconnected, scanning") for advertisement in ble.start_scan(ProvideServicesAdvertisement, timeout=1): + assert isinstance(advertisement, ProvideServicesAdvertisement) if UARTService not in advertisement.services: continue ble.connect(advertisement) diff --git a/optional_requirements.txt b/optional_requirements.txt index d4e27c4..7a17dbd 100644 --- a/optional_requirements.txt +++ b/optional_requirements.txt @@ -1,3 +1,6 @@ # SPDX-FileCopyrightText: 2022 Alec Delaney, for Adafruit Industries # # SPDX-License-Identifier: Unlicense + +circuitpython-stubs +mypy