From 95d3778b1252111802e4593c5230aaf4dcd90c74 Mon Sep 17 00:00:00 2001 From: atlas 0f d00m Date: Fri, 3 Sep 2021 21:44:21 -0400 Subject: [PATCH 1/2] testcancat beginnings --- cancat/__init__.py | 10 +++- cancat/test/__init__.py | 100 +++++++++++++++++++++++++++++++++++++ cancat/vstruct/bitfield.py | 2 +- 3 files changed, 109 insertions(+), 3 deletions(-) diff --git a/cancat/__init__.py b/cancat/__init__.py index d1bdda4..52c7f8b 100644 --- a/cancat/__init__.py +++ b/cancat/__init__.py @@ -268,8 +268,14 @@ def _reconnect(self, port=None, baud=None): if self._io != None: self._io.close() - self._io = serial.Serial(port=self.port, baudrate=self._baud, dsrdtr=True) - self._io.setDTR(True) + # SHIM to allow us to easily specify a Fake CanCat for testing + if self.port == 'FakeCanCat': + import cancat.tests as testcat + self._io = testcat.FakeCanCat() + + else: + self._io = serial.Serial(port=self.port, baudrate=self._baud, dsrdtr=True) + self._io.setDTR(True) # clear all locks and free anything waiting for them if self._in_lock != None: diff --git a/cancat/test/__init__.py b/cancat/test/__init__.py index e69de29..1e558d5 100644 --- a/cancat/test/__init__.py +++ b/cancat/test/__init__.py @@ -0,0 +1,100 @@ + +import usb +import time +import queue +import logging +import unittest +import threading +import traceback + +from rflib.const import * +from rflib.bits import ord23 + +logging.basicConfig(level=logging.INFO, format='%(asctime)s:%(levelname)s:%(name)s: %(message)s') +logger = logging.getLogger(__name__) + +EP0BUFSIZE = 512 + + +class fakeMemory: + def __init__(self, size=64*1024): + self.memory = [0 for x in range(size)] + self.mmio = {} + + def readMemory(self, addr, size): + logger.debug("fm.readMemory(0x%x, 0x%x)", addr, size) + chunk = b''.join([b'%c' % x for x in self.memory[addr:addr+size]]) + if len(chunk) < size: + chunk += b"@" * (size-len(chunk)) + return chunk + + def writeMemory(self, addr, data): + logger.debug("fm.writeMemory(0x%x, %r)", addr, data) + #if type(data) == str: + # raise(Exception("Cannot write 'str' to fakeMemory! Must use 'bytes'")) + + for x in range(len(data)): + tgt = addr+x + val = data[x] + + handler = self.mmio.get(tgt) + if handler is not None: + val = handler(tgt, data[x]) + + # if we didn't return None from the handler, write it anyway + if val is not None: + self.memory[tgt] = val + + ''' + def mmio_RFST(self, tgt, dbyte): + logger.info('mmio_RFST(0x%x, %r)', tgt, dbyte) + print("RFST==%x (%x)" % (self.readMemory(X_RFST, 1), ord(dbyte))) + + + # configure MARCSTATE + val = ord(dbyte) + if val in (2, 3): + val = dbyte+10 + + else: + val = MARC_STATE_RX + + self.writeMemory(MARCSTATE, b'%c'%(val)) + + # still set RFST + return dbyte + + def mmio_MARCSTATE(self, tgt, dbyte): + rfst = self.readMemory(X_RFST, 1) + logger.info('mmio_MARCSTATE(0x%x, %r) rfst=%r', tgt, dbyte, rfst) + return MARC_STATE_RX + ''' + +# TODO: first, implement PING capability + +class FakeCanCat: + ''' + This class emulates a real CanCat (the physical device). + ''' + def __init__(self): + self._recvbuf = queue.Queue() + self._outbuf = queue.Queue() # this is what's handed to the CanCat receiver thread + self.memory = fakeMemory() + + self.start_ts = time.time() + + self.memory.writeMemory(0xdf00, FAKE_MEM_DF00) + self.memory.writeMemory(0xdf46, b'\xf0\x0d') + for intreg, intval in list(FAKE_INTERRUPT_REGISTERS.items()): + logger.info('setting interrupt register: %r = %r', intreg, intval) + self.memory.writeMemory(eval(intreg), intval) + + def clock(self): + return time.time() - self.start_ts + + def CanCat_send(data, cmd): + packet = '@%c%c%s' % (len(data)+1, cmd, data) + self._outbuf += packet + + #### FAKE SERIAL DEVICE (interface to Python) + diff --git a/cancat/vstruct/bitfield.py b/cancat/vstruct/bitfield.py index 1f6608d..673ab55 100644 --- a/cancat/vstruct/bitfield.py +++ b/cancat/vstruct/bitfield.py @@ -1,4 +1,4 @@ -import envi.bits as e_bits +import cancat.envi.bits as e_bits from vstruct import VStruct from vstruct.primitives import * from binascii import unhexlify From 01b4174d9db4cdf8c325a59330d8d09f8cd571aa Mon Sep 17 00:00:00 2001 From: atlas 0f d00m Date: Sat, 4 Sep 2021 02:21:27 -0400 Subject: [PATCH 2/2] FakeCanCat works! requirements.txt update (although i'm having issues with termcolor) --- CanCat.py | 2 +- cancat/__init__.py | 7 +-- cancat/test/__init__.py | 100 ++++++++++++++++++++++++++++++++++++---- requirements.txt | 4 +- 4 files changed, 96 insertions(+), 17 deletions(-) diff --git a/CanCat.py b/CanCat.py index e84690b..a306a32 100755 --- a/CanCat.py +++ b/CanCat.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python3 from __future__ import print_function import sys diff --git a/cancat/__init__.py b/cancat/__init__.py index 1e8ff20..0abf2e2 100644 --- a/cancat/__init__.py +++ b/cancat/__init__.py @@ -276,7 +276,7 @@ def _reconnect(self, port=None, baud=None): # SHIM to allow us to easily specify a Fake CanCat for testing if self.port == 'FakeCanCat': - import cancat.tests as testcat + import cancat.test as testcat self._io = testcat.FakeCanCat() else: @@ -534,8 +534,9 @@ def _send(self, cmd, message): finally: self._out_lock.release() # FIXME: wait for response? - except: - print("Could not acquire lock. Are you trying interactive commands without an active connection?") + except Exception as e: + print("Exception: %r" % e) + #print("Could not acquire lock. Are you trying interactive commands without an active connection?") def CANrecv(self, count=1): ''' diff --git a/cancat/test/__init__.py b/cancat/test/__init__.py index 1e558d5..e4f26f9 100644 --- a/cancat/test/__init__.py +++ b/cancat/test/__init__.py @@ -70,31 +70,111 @@ def mmio_MARCSTATE(self, tgt, dbyte): return MARC_STATE_RX ''' +## Serial Commands ## +CMD_LOG = 0x2f +CMD_LOG_HEX = 0x2e + +CMD_CAN_RECV = 0x30 +CMD_PING_RESPONSE = 0x31 +CMD_CHANGE_BAUD_RESULT = 0x32 +CMD_CAN_BAUD_RESULT = 0x33 +CMD_CAN_SEND_RESULT = 0x34 +CMD_ISO_RECV = 0x35 +CMD_SET_FILT_MASK = 0x36 +CMD_CAN_MODE_RESULT = 0x37 +CMD_CAN_SEND_ISOTP_RESULT = 0x38 +CMD_CAN_RECV_ISOTP_RESULT = 0x39 +CMD_CAN_SENDRECV_ISOTP_RESULT = 0x3A +CMD_PRINT_CAN_REGS = 0x3C + +CMD_PING = 0x41 +CMD_CHANGE_BAUD = 0x42 +CMD_CAN_BAUD = 0x43 +CMD_CAN_SEND = 0x44 +CMD_CAN_MODE = 0x45 +CMD_CAN_MODE_SNIFF_CAN0 = 0x00 +CMD_CAN_MODE_SNIFF_CAN1 = 0x01 +CMD_CAN_MODE_CITM = 0x02 +CMD_CAN_SEND_ISOTP = 0x46 +CMD_CAN_RECV_ISOTP = 0x47 +CMD_CAN_SENDRECV_ISOTP = 0x48 + # TODO: first, implement PING capability + + class FakeCanCat: ''' This class emulates a real CanCat (the physical device). + We're going to try making this single-threaded. We may need to handle in/out in separate threads, but fingers crossed. ''' def __init__(self): - self._recvbuf = queue.Queue() - self._outbuf = queue.Queue() # this is what's handed to the CanCat receiver thread + self._inbuf = b'' + self._outbuf = b'' + + self._inq = queue.Queue() + self._outq = queue.Queue() # this is what's handed to the CanCat receiver thread self.memory = fakeMemory() self.start_ts = time.time() - self.memory.writeMemory(0xdf00, FAKE_MEM_DF00) - self.memory.writeMemory(0xdf46, b'\xf0\x0d') - for intreg, intval in list(FAKE_INTERRUPT_REGISTERS.items()): - logger.info('setting interrupt register: %r = %r', intreg, intval) - self.memory.writeMemory(eval(intreg), intval) + #self.memory.writeMemory(0xdf00, FAKE_MEM_DF00) + #self.memory.writeMemory(0xdf46, b'\xf0\x0d') + #for intreg, intval in list(FAKE_INTERRUPT_REGISTERS.items()): + # logger.info('setting interrupt register: %r = %r', intreg, intval) + # self.memory.writeMemory(eval(intreg), intval) + + # do we want to add in a few CAN messages to be received? def clock(self): return time.time() - self.start_ts - def CanCat_send(data, cmd): - packet = '@%c%c%s' % (len(data)+1, cmd, data) - self._outbuf += packet + def CanCat_send(self, data, cmd): + print(b'===FakeCanCat_send: cmd:%x data: %r' % (cmd, data)) + packet = b'@%c%c%s' % (len(data)+1, cmd, data) + self._inq.put(packet) + + def log(self, msg): + self.CanCat_send(b"FakeCanCat: " + msg, CMD_LOG) + def logHex(self, num): + self.CanCat_send(struct.pack(b"