Skip to content

Commit

Permalink
feat(rockpi): add GPIO support
Browse files Browse the repository at this point in the history
- Install libmraa from Radxa APT.
- Monkey patch gpiozero.LED to be used with mraa.
- Create custom button class that uses mraa because
gpiozero.Button could not easily be monkey patched.
  • Loading branch information
marvinmarnold committed Dec 7, 2021
1 parent 96ccd23 commit cf305ef
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 48 deletions.
19 changes: 14 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,20 @@ ENV PYTHONPATH="/opt:$PYTHONPATH"
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# START DEBUGGING
# Uncomment the lines below to mock parts of the configuration
# COPY example/ example/
# ENV ETH0_MAC_ADDRESS_PATH=/opt/example/eth0_mac_address.txt
# END DEBUGGING
# hadolint ignore=DL3008, DL4006
RUN export DISTRO=buster-testing && \
echo "deb http://apt.radxa.com/$DISTRO/ ${DISTRO%-*} main" | tee -a /etc/apt/sources.list.d/apt-radxa-com.list && \
wget -nv -O - apt.radxa.com/$DISTRO/public.key | apt-key add - && \
apt-get update && \
apt-get install --no-install-recommends -y libmraa && \
apt-get autoremove -y && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# This is the libmraa install location, because we are using venv
# it must be added to path explicitly
ENV PYTHONPATH="$PYTHONPATH:/usr/local/lib/python3.7/dist-packages"

# Run start-gateway-config script
ENTRYPOINT ["/opt/start-gateway-config.sh"]

6 changes: 3 additions & 3 deletions gatewayconfig/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from gatewayconfig.gatewayconfig_app import GatewayconfigApp
from gatewayconfig.logger import get_logger

logger = get_logger(__name__)
LOGGER = get_logger(__name__)
VARIANT = os.getenv('VARIANT')
# SENTRY_CONFIG currently being used in production
SENTRY_DSN = os.getenv('SENTRY_CONFIG') # https://docs.sentry.io/product/sentry-basics/dsn-explainer/
Expand Down Expand Up @@ -31,7 +31,7 @@ def main():


def validate_env():
logger.debug("Starting with the following ENV:\n\
LOGGER.debug("Starting with the following ENV:\n\
SENTRY_DSN=%s\n\
BALENA_APP_NAME=%s\n\
BALENA_DEVICE_UUID=%s\n\
Expand Down Expand Up @@ -69,7 +69,7 @@ def start():
try:
config_app.start()
except Exception:
logger.exception('__main__ failed for unknown reason')
LOGGER.exception('__main__ failed for unknown reason')
config_app.stop()


Expand Down
60 changes: 39 additions & 21 deletions gatewayconfig/gatewayconfig_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import threading
from gpiozero import Button, LED

from hm_pyhelper.hardware_definitions import variant_definitions, is_raspberry_pi
from hm_pyhelper.hardware_definitions import is_rockpi, is_raspberry_pi, \
variant_definitions

from gatewayconfig.logger import get_logger
from gatewayconfig.processors.bluetooth_services_processor import BluetoothServicesProcessor
Expand All @@ -12,11 +13,13 @@
from gatewayconfig.processors.bluetooth_advertisement_processor import BluetoothAdvertisementProcessor
from gatewayconfig.gatewayconfig_shared_state import GatewayconfigSharedState
from gatewayconfig.file_loader import read_eth0_mac_address, read_wlan0_mac_address
from gatewayconfig.gpio.mraa_button import MraaButton
from gatewayconfig.gpio.mraa_led import MraaLED
import gatewayconfig.nmcli_custom as nmcli_custom


USER_BUTTON_HOLD_SECONDS = 2
logger = get_logger(__name__)
LOGGER = get_logger(__name__)


class GatewayconfigApp:
Expand All @@ -30,11 +33,11 @@ def __init__(self, sentry_dsn, balena_app_name, balena_device_uuid, variant, eth
self.init_sentry(sentry_dsn, balena_app_name, balena_device_uuid, variant)
self.shared_state = GatewayconfigSharedState()
self.init_nmcli()
self.init_gpio()
self.init_gpio(variant)

eth0_mac_address = read_eth0_mac_address(eth0_mac_address_filepath)
wlan0_mac_address = read_wlan0_mac_address(wlan0_mac_address_filepath)
logger.debug("Read eth0 mac address %s and wlan0 %s" % (eth0_mac_address, wlan0_mac_address))
LOGGER.debug("Read eth0 mac address %s and wlan0 %s" % (eth0_mac_address, wlan0_mac_address))
self.shared_state.load_public_key()

self.bluetooth_services_processor = BluetoothServicesProcessor(
Expand All @@ -55,23 +58,22 @@ def __init__(self, sentry_dsn, balena_app_name, balena_device_uuid, variant, eth
)

def start(self):
logger.debug("Starting ConfigApp")
LOGGER.debug("Starting ConfigApp")
try:
self.start_threads()

except KeyboardInterrupt:
logger.debug("KEYBOAD INTERRUPTION")
LOGGER.debug("KEYBOAD INTERRUPTION")
self.stop()

except Exception:
logger.exception('GatewayConfigApp failed for unknown reason')
LOGGER.exception('GatewayConfigApp failed for unknown reason')
self.stop()

def stop(self):
logger.debug("Stopping ConfigApp")
if is_raspberry_pi():
self.user_button.close()
self.status_led.close()
LOGGER.debug("Stopping ConfigApp")
self.user_button.close()
self.status_led.close()
# Quits the cputemp application
self.bluetooth_services_processor.quit()

Expand All @@ -83,15 +85,25 @@ def init_sentry(self, sentry_dsn, balena_app_name, balena_device_uuid, variant):
def init_nmcli(self):
nmcli_custom.disable_use_sudo()

def init_gpio(self):
def init_gpio(self, variant):
"""
This code was originally written for Raspberry Pi but ROCK Pi does not
support gpiozero. Custom GPIO implementations for ROCK Pi are used based
on the detected hardware.
"""
if is_raspberry_pi():
self.user_button = Button(self.get_button_pin(), hold_time=USER_BUTTON_HOLD_SECONDS)
self.user_button.when_held = self.start_bluetooth_advertisement
self.status_led = LED(self.get_status_led_pin())
self.user_button = Button(self.get_button_gpio(), hold_time=USER_BUTTON_HOLD_SECONDS)
self.status_led = LED(self.get_status_led_gpio())
elif is_rockpi():
self.user_button = MraaButton(self.get_button_pin(), hold_seconds=USER_BUTTON_HOLD_SECONDS)
self.user_button.start()
self.status_led = MraaLED(self.get_status_led_pin())
else:
logger.warn("LEDs and buttons are disabled. GPIO not yet supported on this device.")
self.user_button = None
self.status_led = None
LOGGER.warn("LEDs and buttons are disabled. "
"GPIO not yet supported on this device: %s"
% variant)

self.user_button.when_held = self.start_bluetooth_advertisement

# Use daemon threads so that everything exists cleanly when the program stops
def start_threads(self):
Expand All @@ -117,11 +129,17 @@ def start_threads(self):
self.bluetooth_advertisement_thread.start()

def start_bluetooth_advertisement(self):
logger.debug("Starting bluetooth advertisement")
LOGGER.debug("Starting bluetooth advertisement")
self.shared_state.should_advertise_bluetooth = True

def get_button_pin(self):
def get_button_gpio(self):
return self.variant_details['BUTTON']

def get_status_led_pin(self):
def get_status_led_gpio(self):
return self.variant_details['STATUS']

def get_button_pin(self):
return self.variant_details['GPIO_PIN_BUTTON']

def get_status_led_pin(self):
return self.variant_details['GPIO_PIN_LED']
91 changes: 91 additions & 0 deletions gatewayconfig/gpio/mraa_button.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import time
import threading

from hm_pyhelper.logger import get_logger
from gatewayconfig.gpio.mraa_gpio import init_mraa_pin


LOGGER = get_logger(__name__)


# How often to check for button presses
MONITOR_CHECK_INTERVAL_SECONDS = 0.05


class MraaButton(threading.Thread):
"""
Unlike MraaLED, the button could not be easily monkey patched.
Instead, basic button logic is implemented in its entirety.
MraaButton will trigger #when_held if a button is pressed for
hold_seconds. #when_held will only fire once if the button
remains held down. If a button is pressed down for shorter,
than hold_seconds, #when_held is not called.
"""

def __init__(self, pin_number, hold_seconds):
"""
pin_number refers to the pin number, not GPIO number:
https://wiki.radxa.com/Rockpi4/hardware/gpio#GPIO_number
"""

super(MraaButton, self).__init__()
LOGGER.info("Initializing mraa button on pin %s"
% pin_number)
self.daemon = True
self.cancelled = False
self.mraa_pin = init_mraa_pin(pin_number, False)
self.hold_seconds = hold_seconds
self.when_held = None
self.reset_pressed_state()

def run(self):
while not self.cancelled:
if not self.is_pressed():
self.reset_pressed_state()
else:
self.process_press()

time.sleep(MONITOR_CHECK_INTERVAL_SECONDS)

def process_press(self):
"""
If this is the first time this press has been detected,
record the time for future reference. If the press has
already been detected, attempt to trigger #when_held.
"""

if self.last_pressed_at is None:
LOGGER.debug("Button pressed for first time")
self.last_pressed_at = time.time()
else:
self.trigger_when_held_after_hold_seconds()

def trigger_when_held_after_hold_seconds(self):
"""
If #when_held has not been invoked due to this press,
and it is defined, invoke it.
"""

if self.when_held and not self.is_press_already_registered:
if self.have_hold_seconds_elapsed():
LOGGER.debug("Button pressed down for "
"`hold_seconds` (%s secs)"
% self.hold_seconds)
self.is_press_already_registered = True
self.when_held()

def have_hold_seconds_elapsed(self):
elapsed_seconds = time.time() - self.last_pressed_at
return elapsed_seconds >= self.hold_seconds

def is_pressed(self):
return self.mraa_pin.read() == 0

def reset_pressed_state(self):
self.last_pressed_at = None
self.is_press_already_registered = False

def close(self):
self.cancelled = True
self.mraa_pin.write(0)
57 changes: 57 additions & 0 deletions gatewayconfig/gpio/mraa_gpio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import subprocess # nosec
from hm_pyhelper.logger import get_logger

LOGGER = get_logger(__name__)

try:
import mraa
except Exception:
LOGGER.warn("mraa not loaded. If you see this message outside of unit "
"tests, this may cause a problem.")


def init_mraa_pin(pin_number, is_input):
"""
Instantiate a new mraa.Gpio instance on `pin`.
Pull the pin high before returning.
Defining this method outside MraaButton
so that it is easier to mock from tests.
"""

mraa_pin = mraa.Gpio(pin_number)
if is_input:
mraa_dir = mraa.DIR_IN
else:
mraa_dir = mraa.DIR_OUT

mraa_pin.dir(mraa_dir)
# pull up before usage
mraa_gpio_write(pin_number, 1)
return mraa_pin


def mraa_gpio_write(pin_number, val):
"""
Some aspect of initialization prevents mraa.Gpio.write
from having any effect. It is necessary to pull-up the
GPIO before using it but this is not possible using the
mraa Python library. In fact similar behavior is
observed interacting with the filesystem directly.
```
# echo 154 > /sys/class/gpio/export
# echo 1 > /sys/class/gpio/gpio154/value
bash: echo: write error: Operation not permitted
```
Using `mraa-gpio set PIN 1` solves the issue. After
invoking that command, the GPIO is then writable
normally.
This logic cannot be easily added to
start-gateway-config.sh because it would need to
include logic that replicates pyhelper methods
like is_raspberry_pi and hardware_definitions.
"""

subprocess.check_call(['mraa-gpio', 'set', str(pin_number), str(val)]) # nosec NOSONAR
25 changes: 25 additions & 0 deletions gatewayconfig/gpio/mraa_led.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from gpiozero import LED
from gpiozero.pins.mock import MockFactory

from hm_pyhelper.logger import get_logger
from gatewayconfig.gpio.mraa_gpio import init_mraa_pin

LOGGER = get_logger(__name__)


class MraaLED(LED):
""""
Extend gpiozero.LED and override the write method to interact with libmraa.
"""

def __init__(self, pin_number=None, *, active_high=True, initial_value=False):
super().__init__(pin_number, active_high=active_high,
initial_value=initial_value, pin_factory=MockFactory())

LOGGER.info("Initializing mraa LED on pin %s"
% pin_number)

self.mraa_pin = init_mraa_pin(pin_number, True)

def _write(self, value):
self.mraa_pin.write(value)
9 changes: 4 additions & 5 deletions gatewayconfig/processors/led_processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from time import sleep
from gpiozero import LED

from hm_pyhelper.hardware_definitions import is_raspberry_pi
from hm_pyhelper.hardware_definitions import is_rockpi, is_raspberry_pi
from gatewayconfig.gatewayconfig_shared_state import GatewayconfigSharedState
from gatewayconfig.logger import get_logger

Expand All @@ -17,13 +16,13 @@ def __init__(self, status_led: LED, shared_state: GatewayconfigSharedState):
def run(self):
LOGGER.debug("LED LEDProcessor")

if is_raspberry_pi():
if is_raspberry_pi() or is_rockpi():
while True:
# Blink fast if diagnostics are not OK
if(self.shared_state.are_diagnostics_ok is False):
if(not self.shared_state.are_diagnostics_ok):
self.status_led.blink(0.1, 0.1, 10, False)
# Blink slow if advertising bluetooth
elif(self.shared_state.is_advertising_bluetooth is True):
elif(self.shared_state.is_advertising_bluetooth):
self.status_led.blink(1, 1, 1, False)
# Solid if diagnostics are OK and not advertising
else:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ colorzero==2.0
dbus-python==1.2.16
gpiozero==1.6.2
h3==3.7.2
hm-pyhelper==0.11.11
hm-pyhelper==0.11.14
nmcli==0.5.0
protobuf==3.15.6
pycairo==1.20.1
Expand Down
Loading

0 comments on commit cf305ef

Please sign in to comment.