diff --git a/cyusb.py b/cyusb.py index 702a6d8..8c03839 100755 --- a/cyusb.py +++ b/cyusb.py @@ -1,18 +1,27 @@ #!/usr/bin/env python3 # -*- coding: utf-8-unix -*- """ -A port of CyUSB Serial Library in pure python. +A port of Cypress USB Serial Library (libcyusbserial) in pure python. + +This code is still in alpha stage. Many protocols and data format +details are discovered, but information still needs to be cleaned +out and API/code/tools need further refactoring. + """ +import sys +import os import usb1 -import struct +import logging -from usb1 import USBContext -from usb1.libusb1 import * +from struct import pack, unpack +from usb1 import USBContext, USBInterfaceSetting from enum import Enum, IntEnum -from contextlib import contextmanager +from argparse import ArgumentParser from IPython import embed +log = logging.getLogger(__name__) + VID = 0x04b4 PID = 0x0004 @@ -20,14 +29,13 @@ EP_INTR = 3 EP_OUT = 0x00 -EP_IN = 0x80 +EP_IN = 0x80 -CY_VENDOR_REQUEST_DEVICE_TO_HOST = 0xC0 -CY_VENDOR_REQUEST_HOST_TO_DEVICE = 0x40 CY_VENDOR_REQUEST = 0x40 +CY_CLASS_INTERFACE_REQUEST = 0x21 -CY_CLASS_INTERFACE_REQUEST_DEVICE_TO_HOST = 0xA1 -CY_CLASS_INTERFACE_REQUEST_HOST_TO_DEVICE = 0x21 +# used to set which SCB to configure +CY_SCB_INDEX_POS = 15 class CY_TYPE(IntEnum): DISABLED = 0 @@ -78,8 +86,6 @@ class CY_VENDOR_CMDS(IntEnum): CY_DEVICE_RESET_CMD = 0xE3 -CY_SCB_INDEX_POS = 15 - # I2C related macros class CY_I2C(IntEnum): CONFIG_LENGTH = 16 @@ -109,8 +115,8 @@ class CY_SPI(IntEnum): BUS_ERROR = (1 << 1) # Vendor UART related macros -CY_SET_LINE_CONTROL_STATE_CMD = 0x22 class CY_UART(IntEnum): + SET_LINE_CONTROL_STATE_CMD = 0x22 SET_FLOW_CONTROL_CMD = 0x60 SEND_BREAK_CMD = 0x23 CONFIG_LEN = 16 @@ -151,68 +157,59 @@ class CY_PHDC(IntEnum): CY_PHDC_CLR_FEATURE_WVALUE = 0x1 CY_PHDC_SET_FEATURE_WVALUE = 0x0101 +def checksum(buff): + """Return checksum of 512-byte config bytes""" + return 0xFFFFFFFF & sum(unpack("<125I", buff[12:])) + +def get_checksum(buff): + """Extract checksum value in 512-byte config bytes""" + return unpack(" index: + raise Exception("Not enough interfaces (SCBs) found") # setup parameters - self.if_num, uc, ui, us = found + us, ui, uc, ud = found[index] + self.us_num = us.getAlternateSetting() + self.if_num = us.getNumber() self.uc_num = uc.getConfigurationValue() - self.us_num = us.getNumber() self.timeout = timeout # scan EPs @@ -230,8 +227,7 @@ def open(ud, cy_type, timeout=1000): # open USBDeviceHandle self.dev = ud.open() - # detach kernel driver to gain access (need root access) - #if dev.kernelDriverActive(if_num): dev.detachKernelDriver(if_num) + # detach kernel driver to gain access self.dev.setAutoDetachKernelDriver(True) # @@ -242,7 +238,6 @@ def open(ud, cy_type, timeout=1000): self.dev.setConfiguration(self.uc_num) if self.us_num > 0: self.dev.setInterfaceAltSetting(self.if_num, self.us_num) - return self def close(self): if self.dev: @@ -258,48 +253,44 @@ def __exit__(self, err_type, err_value, tb): self.close() ###################################################################### + # WARNING: Many APIs are not yet complete and/or tested. + ###################################################################### def CyGetSpiConfig(self): - dev = self.dev - scbIndex = 1 if self.if_num > 0 else 0 - bmRequestType = CY_VENDOR_REQUEST + bmRequestType = CY_VENDOR_REQUEST | EP_OUT bmRequest = CY_VENDOR_CMDS.CY_SPI_GET_CONFIG_CMD wValue = (scbIndex << CY_SCB_INDEX_POS) wIndex = 0 wLength = CY_SPI.CONFIG_LEN - ret = dev.controlRead(bmRequestType, bmRequest, - wValue, wIndex, wLength, self.timeout) + ret = self.dev.controlRead(bmRequestType, bmRequest, + wValue, wIndex, wLength, self.timeout) return ret def CySetSpiConfig(self, config): - dev = self.dev - scbIndex = 1 if self.if_num > 0 else 0 - bmRequestType = CY_VENDOR_REQUEST + bmRequestType = CY_VENDOR_REQUEST | EP_OUT bmRequest = CY_VENDOR_CMDS.CY_SPI_SET_CONFIG_CMD wValue = (scbIndex << CY_SCB_INDEX_POS) wIndex = 0 wLength = CY_SPI.CONFIG_LEN wBuffer = bytearray() - ret = dev.controlWrite(bmRequestType, bmRequest, - wValue, wIndex, wBuffer, self.timeout) + ret = self.dev.controlWrite(bmRequestType, bmRequest, + wValue, wIndex, wBuffer, self.timeout) return ret def CySpiReset(self): - dev = self.dev - scbIndex = 1 if self.if_num > 0 else 0 - bmRequestType = CY_VENDOR_REQUEST + bmRequestType = CY_VENDOR_REQUEST | EP_OUT bmRequest = CY_VENDOR_CMDS.CY_SPI_RESET_CMD wValue = (scbIndex << CY_SCB_INDEX_POS) wIndex = 0 wLength = 0 - ret = dev.controlRead(bmRequestType, bmRequest, - wValue, wIndex, wLength, self.timeout) + ret = self.dev.controlRead(bmRequestType, bmRequest, + wValue, wIndex, wLength, self.timeout) return ret def CySpiRead(self, size): @@ -499,7 +490,7 @@ def CyUartSetBreak(self, ms): def CyUartSetRts(self): bmRequestType = CY_VENDOR_REQUEST | EP_OUT - bmRequest = CY_VENDOR_CMDS.CY_SET_LINE_CONTROL_STATE_CMD + bmRequest = CY_UART.SET_LINE_CONTROL_STATE_CMD wValue = (1<<1) | self.dtrValue wIndex = self.if_num wBuffer = bytearray(0) @@ -511,7 +502,7 @@ def CyUartSetRts(self): def CyUartClearRts(self): bmRequestType = CY_VENDOR_REQUEST | EP_OUT - bmRequest = CY_VENDOR_CMDS.CY_SET_LINE_CONTROL_STATE_CMD + bmRequest = CY_UART.SET_LINE_CONTROL_STATE_CMD wValue = self.dtrValue wIndex = self.if_num wBuffer = bytearray(0) @@ -523,7 +514,7 @@ def CyUartClearRts(self): def CyUartSetDtr(self): bmRequestType = CY_VENDOR_REQUEST | EP_OUT - bmRequest = CY_VENDOR_CMDS.CY_SET_LINE_CONTROL_STATE_CMD + bmRequest = CY_UART.SET_LINE_CONTROL_STATE_CMD wValue = (self.rtsValue << 1) | 1 wIndex = self.if_num wBuffer = bytearray(0) @@ -535,7 +526,7 @@ def CyUartSetDtr(self): def CyUartClearDtr(self): bmRequestType = CY_VENDOR_REQUEST | EP_OUT - bmRequest = CY_VENDOR_CMDS.CY_SET_LINE_CONTROL_STATE_CMD + bmRequest = CY_UART.SET_LINE_CONTROL_STATE_CMD wValue = (self.rtsValue << 1) wIndex = self.if_num wBuffer = bytearray(0) @@ -605,7 +596,7 @@ def CyPhdcClrFeature(self): def CyPhdcSetFeature(self): bmRequestType = CY_VENDOR_REQUEST | EP_OUT - bmRequest = CY_VENDOR_CMDS.CY_PHDC_SET_FEATURE + bmRequest = CY_PHDC.SET_FEATURE wValue = CY_PHDC_SET_FEATURE_WVALUE wIndex = self.if_num wBuffer = bytearray(0) @@ -616,7 +607,7 @@ def CyPhdcSetFeature(self): def CyPhdcGetStatus(self): bmRequestType = CY_VENDOR_REQUEST | EP_IN - bmRequest = CY_VENDOR_CMDS.CY_PHDC_GET_DATA_STATUS + bmRequest = CY_PHDC.GET_DATA_STATUS wValue = 0 wIndex = self.if_num wLength = CY_PHDC_GET_STATUS_LEN @@ -702,6 +693,10 @@ def CyGetSignature(self): wValue, wIndex, wLength, self.timeout) return ret + ###################################################################### + # Non-Cypress APIs still under experimental stage + ###################################################################### + def ping(self): """Send whatever USCU sends on startup""" bmRequestType = CY_VENDOR_REQUEST | EP_OUT @@ -782,30 +777,113 @@ def write_config(self, buff): wIndex = 0 if len(buff) != 512: - print("FIXME: need to have valid buffer") - sys.exit(0) + raise Exception("Config memory MUST be 512byte in size") + if get_checksum(buff) != checksum(buff): + raise Exception("Invalid config memory checksum") + wBuffer = buff ret = self.dev.controlWrite(bmRequestType, bmRequest, wValue, wIndex, wBuffer, self.timeout) return ret - @staticmethod - def checksum(buff): - """Return Cypress-style checksum of 512-byte config bytes""" - return 0xFFFFFFFF & sum(struct.unpack("<125I", buff[12:])) +###################################################################### + +def format_usage(): + p = os.path.basename(sys.argv[0]) + return """ +{p} - Reprogram Cypress USB-to-Serial chip (CY7C65211, etc) +Usage: {p} [options] (save|load|mode) args... +Options: + -V, --vid vid: VID of device to connect (0x04b4) + -P, --pid pid: PID of device to connect (0x0004) + -n, --nth N : Select Nth device (0) + -s, --scb N : Select Nth SCB block (0) +Example: + $ {p} save save.bin + $ {p} load save.bin + $ {p} mode SPI + $ {p} mode I2C + $ {p} mode UART +NOTE: +- Detail of configuration memory is still under investigation. +- Interface is likely to change after further discovery. +""".lstrip().format(**locals()) + +def usage(): + sys.stderr.write(format_usage()) + sys.exit(0) + +def to_int(v): + return int(v, 0) + +def do_save(ctx, file): + dev = ctx.dev + dev.connect() + buf = dev.read_config() + dev.disconnect() + + if get_checksum(buf) != checksum(buf): + raise Exception("Invalid checksum") + + return open(file, "wb").write(buf) + +def do_load(ctx, file): + dev = ctx.dev + buf = open(file, "rb").read(512) # cut 513th byte from USB capture + + if get_checksum(buf) != checksum(buf): + raise Exception("Invalid checksum") + + dev.connect() + ret = dev.write_config(buf) + dev.disconnect() + return ret + +def do_mode(ctx, mode): + dev = ctx.dev + raise Exception("Not yet implemented") + +def main(ctx): + opt = ctx.opt -def main(): with USBContext() as context: + found = list(find_device(context, opt.vid, opt.pid)) + + if len(found) - 1 < opt.nth: + raise Exception("No USB device found") + # ux == USB Device/Configuration/Interface/Setting/Endpoint - ud = list(find_device(context, VID, PID))[0] + ud = found[opt.nth] uc = ud[0] ui = uc[0] us = ui[0] ue = us[0] - with CyUSB.open(ud, CY_TYPE.I2C) as cyd: - embed() - -if __name__ == '__main__': - main() + with CyUSB(ud, CY_TYPE.MFG, index=opt.scb) as dev: + #embed() + ctx.dev = dev + cmd = opt.args[0] + if cmd == "save": do_save(ctx, *opt.args[1:]) + if cmd == "load": do_load(ctx, *opt.args[1:]) + if cmd == "mode": do_mode(ctx, *opt.args[1:]) + +if __name__ == '__main__' and '__file__' in globals(): + ap = ArgumentParser() + ap.format_help = ap.format_usage = format_usage + ap.add_argument('-D', '--debug', default="WARN") + ap.add_argument('-V', '--vid', type=to_int, default=VID) + ap.add_argument('-P', '--pid', type=to_int, default=PID) + ap.add_argument('-n', '--nth', type=int, default=0) + ap.add_argument('-s', '--scb', type=int, default=0) + ap.add_argument('args', nargs='*') + + opt = ap.parse_args() + + if not opt.args: + usage() + logging.basicConfig(level=eval('logging.' + opt.debug)) + + ctx = lambda:0 + ctx.opt = opt + main(ctx)