Skip to content

Commit

Permalink
Added support for FreeBSD
Browse files Browse the repository at this point in the history
As developed and designed for use on RBPi, all read and write ops are
implemented using the I2CRDWR syscall, it beeing the only one implemented
in the bcm2835_bsc driver.
  • Loading branch information
0xc0decafe committed May 31, 2022
1 parent 9f9af5a commit c65d97c
Showing 1 changed file with 138 additions and 65 deletions.
203 changes: 138 additions & 65 deletions smbus2/smbus2.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import os
import sys
from platform import system
from fcntl import ioctl
from ctypes import c_uint32, c_uint8, c_uint16, c_char, POINTER, Structure, Array, Union, create_string_buffer, string_at

Expand Down Expand Up @@ -49,6 +50,9 @@
I2C_SMBUS_I2C_BLOCK_DATA = 8
I2C_SMBUS_BLOCK_MAX = 32

#FreeBSD RDWR syscall
I2CRDWR = 0x80106906

# To determine what functionality is present (uapi/linux/i2c.h)
try:
from enum import IntFlag
Expand Down Expand Up @@ -200,6 +204,8 @@ def read(address, length):
:rtype: :py:class:`i2c_msg`
"""
arr = create_string_buffer(length)
if system() == 'FreeBSD':
address = address << 1 | I2C_M_RD
return i2c_msg(
addr=address, flags=I2C_M_RD, len=length,
buf=arr)
Expand All @@ -225,6 +231,8 @@ def write(address, buf):
if type(buf) is not str:
buf = ''.join([chr(x) for x in buf])
arr = create_string_buffer(buf, len(buf))
if system() == 'FreeBSD':
address = address << 1
return i2c_msg(
addr=address, flags=0, len=len(arr),
buf=arr)
Expand Down Expand Up @@ -301,7 +309,10 @@ def open(self, bus):
:raise TypeError: if type(bus) is not in (int, str)
"""
if isinstance(bus, int):
filepath = "/dev/i2c-{}".format(bus)
if system() == 'FreeBSD':
filepath = "/dev/iic{}".format(bus)
else:
filepath = "/dev/i2c-{}".format(bus)
elif isinstance(bus, str):
filepath = bus
else:
Expand Down Expand Up @@ -346,6 +357,8 @@ def _set_address(self, address, force=None):
:param force:
:type force: Boolean
"""
if system() == 'FreeBSD':
return
force = force if force is not None else self.force
if self.address != address or self._force_last != force:
if force is True:
Expand All @@ -361,6 +374,8 @@ def _get_funcs(self):
:rtype: int
"""
if system() == 'FreeBSD':
return 1
f = c_uint32()
ioctl(self.fd, I2C_FUNCS, f)
return f.value
Expand All @@ -373,6 +388,8 @@ def write_quick(self, i2c_addr, force=None):
:param force:
:type force: Boolean
"""
if system() == 'FreeBSD':
return
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=0, size=I2C_SMBUS_QUICK)
Expand All @@ -389,12 +406,17 @@ def read_byte(self, i2c_addr, force=None):
:type force: Boolean
:return: Read byte value
"""
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=0, size=I2C_SMBUS_BYTE
)
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.byte
if system() == 'FreeBSD':
msg = i2c_msg.read(i2c_addr, 1)
self.i2c_rdwr(msg)
return msg.buf.contents
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=0, size=I2C_SMBUS_BYTE
)
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.byte

def write_byte(self, i2c_addr, value, force=None):
"""
Expand All @@ -407,11 +429,15 @@ def write_byte(self, i2c_addr, value, force=None):
:param force:
:type force: Boolean
"""
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=value, size=I2C_SMBUS_BYTE
)
ioctl(self.fd, I2C_SMBUS, msg)
if system() == 'FreeBSD':
msg = i2c_msg.write(i2c_addr, value)
self.i2c_rdwr(msg)
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=value, size=I2C_SMBUS_BYTE
)
ioctl(self.fd, I2C_SMBUS, msg)

def read_byte_data(self, i2c_addr, register, force=None):
"""
Expand All @@ -426,12 +452,18 @@ def read_byte_data(self, i2c_addr, register, force=None):
:return: Read byte value
:rtype: int
"""
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_BYTE_DATA
)
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.byte
if system() == 'FreeBSD':
msg = i2c_msg.write(i2c_addr, [register])
msg2 = i2c_msg.read(i2c_addr, 1)
self.i2c_rdwr(msg, msg2)
return msg2.buf.contents
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_BYTE_DATA
)
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.byte

def write_byte_data(self, i2c_addr, register, value, force=None):
"""
Expand All @@ -447,12 +479,16 @@ def write_byte_data(self, i2c_addr, register, value, force=None):
:type force: Boolean
:rtype: None
"""
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_BYTE_DATA
)
msg.data.contents.byte = value
ioctl(self.fd, I2C_SMBUS, msg)
if system() == 'FreeBSD':
msg = i2c_msg.write(i2c_addr, [register] + data)
self.i2c_rdwr(msg)
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_BYTE_DATA
)
msg.data.contents.byte = value
ioctl(self.fd, I2C_SMBUS, msg)

def read_word_data(self, i2c_addr, register, force=None):
"""
Expand All @@ -467,12 +503,18 @@ def read_word_data(self, i2c_addr, register, force=None):
:return: 2-byte word
:rtype: int
"""
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_WORD_DATA
)
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.word
if system() == 'FreeBSD':
msg = i2c_msg.write(i2c_addr, [register])
msg2 = i2c_msg.read(i2c_addr, 2)
self.i2c_rdwr(msg, msg2)
return msg2.buf.contents
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_WORD_DATA
)
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.word

def write_word_data(self, i2c_addr, register, value, force=None):
"""
Expand All @@ -488,12 +530,16 @@ def write_word_data(self, i2c_addr, register, value, force=None):
:type force: Boolean
:rtype: None
"""
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_WORD_DATA
)
msg.data.contents.word = value
ioctl(self.fd, I2C_SMBUS, msg)
if system() == 'FreeBSD':
msg = i2c_msg.write(i2c_addr, [register] + data)
self.i2c_rdwr(msg)
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_WORD_DATA
)
msg.data.contents.word = value
ioctl(self.fd, I2C_SMBUS, msg)

def process_call(self, i2c_addr, register, value, force=None):
"""
Expand All @@ -509,6 +555,8 @@ def process_call(self, i2c_addr, register, value, force=None):
:type force: Boolean
:rtype: int
"""
if system() == 'FreeBSD':
raise NotImplementedError()
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_PROC_CALL
Expand All @@ -530,13 +578,19 @@ def read_block_data(self, i2c_addr, register, force=None):
:return: List of bytes
:rtype: list
"""
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_BLOCK_DATA
)
ioctl(self.fd, I2C_SMBUS, msg)
length = msg.data.contents.block[0]
return msg.data.contents.block[1:length + 1]
if system() == 'FreeBSD':
msg = i2c_msg.write(i2c_addr, [register])
msg2 = i2c_msg.read(i2c_addr, 32)
self.i2c_rdwr(msg, msg2)
return msg2.buf.contents
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_BLOCK_DATA
)
ioctl(self.fd, I2C_SMBUS, msg)
length = msg.data.contents.block[0]
return msg.data.contents.block[1:length + 1]

def write_block_data(self, i2c_addr, register, data, force=None):
"""
Expand All @@ -555,13 +609,17 @@ def write_block_data(self, i2c_addr, register, data, force=None):
length = len(data)
if length > I2C_SMBUS_BLOCK_MAX:
raise ValueError("Data length cannot exceed %d bytes" % I2C_SMBUS_BLOCK_MAX)
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_BLOCK_DATA
)
msg.data.contents.block[0] = length
msg.data.contents.block[1:length + 1] = data
ioctl(self.fd, I2C_SMBUS, msg)
if system() == 'FreeBSD':
msg = i2c_msg.write(i2c_addr, [register] + data)
self.i2c_rdwr(msg)
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_BLOCK_DATA
)
msg.data.contents.block[0] = length
msg.data.contents.block[1:length + 1] = data
ioctl(self.fd, I2C_SMBUS, msg)

def block_process_call(self, i2c_addr, register, data, force=None):
"""
Expand All @@ -579,6 +637,8 @@ def block_process_call(self, i2c_addr, register, data, force=None):
:return: List of bytes
:rtype: list
"""
if system() == 'FreeBSD':
raise NotImplementedError()
length = len(data)
if length > I2C_SMBUS_BLOCK_MAX:
raise ValueError("Data length cannot exceed %d bytes" % I2C_SMBUS_BLOCK_MAX)
Expand Down Expand Up @@ -609,13 +669,19 @@ def read_i2c_block_data(self, i2c_addr, register, length, force=None):
"""
if length > I2C_SMBUS_BLOCK_MAX:
raise ValueError("Desired block length over %d bytes" % I2C_SMBUS_BLOCK_MAX)
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_I2C_BLOCK_DATA
)
msg.data.contents.byte = length
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.block[1:length + 1]
if system() == 'FreeBSD':
msg = i2c_msg.write(i2c_addr, [register])
msg2 = i2c_msg.read(i2c_addr, length)
self.i2c_rdwr(msg, msg2)
return msg2.buf.contents
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_I2C_BLOCK_DATA
)
msg.data.contents.byte = length
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.block[1:length + 1]

def write_i2c_block_data(self, i2c_addr, register, data, force=None):
"""
Expand All @@ -634,13 +700,17 @@ def write_i2c_block_data(self, i2c_addr, register, data, force=None):
length = len(data)
if length > I2C_SMBUS_BLOCK_MAX:
raise ValueError("Data length cannot exceed %d bytes" % I2C_SMBUS_BLOCK_MAX)
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_I2C_BLOCK_DATA
)
msg.data.contents.block[0] = length
msg.data.contents.block[1:length + 1] = data
ioctl(self.fd, I2C_SMBUS, msg)
if system() == 'FreeBSD':
msg = i2c_msg.write(i2c_addr, [register] + data)
self.i2c_rdwr(msg)
else:
self._set_address(i2c_addr, force=force)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_I2C_BLOCK_DATA
)
msg.data.contents.block[0] = length
msg.data.contents.block[1:length + 1] = data
ioctl(self.fd, I2C_SMBUS, msg)

def i2c_rdwr(self, *i2c_msgs):
"""
Expand All @@ -655,4 +725,7 @@ def i2c_rdwr(self, *i2c_msgs):
:rtype: None
"""
ioctl_data = i2c_rdwr_ioctl_data.create(*i2c_msgs)
ioctl(self.fd, I2C_RDWR, ioctl_data)
if system() == 'FreeBSD':
ioctl(self.fd, I2CRDWR, ioctl_data)
else:
ioctl(self.fd, I2C_RDWR, ioctl_data)

0 comments on commit c65d97c

Please sign in to comment.