Skip to content

Commit

Permalink
FakeCanCat works!
Browse files Browse the repository at this point in the history
requirements.txt update (although i'm having issues with termcolor)
  • Loading branch information
atlas0fd00m committed Sep 4, 2021
1 parent 8fda33a commit 01b4174
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 17 deletions.
2 changes: 1 addition & 1 deletion CanCat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
from __future__ import print_function

import sys
Expand Down
7 changes: 4 additions & 3 deletions cancat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def _reconnect(self, port=None, baud=None):

# SHIM to allow us to easily specify a Fake CanCat for testing
if self.port == 'FakeCanCat':
import cancat.tests as testcat
import cancat.test as testcat
self._io = testcat.FakeCanCat()

else:
Expand Down Expand Up @@ -534,8 +534,9 @@ def _send(self, cmd, message):
finally:
self._out_lock.release()
# FIXME: wait for response?
except:
print("Could not acquire lock. Are you trying interactive commands without an active connection?")
except Exception as e:
print("Exception: %r" % e)
#print("Could not acquire lock. Are you trying interactive commands without an active connection?")

def CANrecv(self, count=1):
'''
Expand Down
100 changes: 90 additions & 10 deletions cancat/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,31 +70,111 @@ def mmio_MARCSTATE(self, tgt, dbyte):
return MARC_STATE_RX
'''

## Serial Commands ##
CMD_LOG = 0x2f
CMD_LOG_HEX = 0x2e

CMD_CAN_RECV = 0x30
CMD_PING_RESPONSE = 0x31
CMD_CHANGE_BAUD_RESULT = 0x32
CMD_CAN_BAUD_RESULT = 0x33
CMD_CAN_SEND_RESULT = 0x34
CMD_ISO_RECV = 0x35
CMD_SET_FILT_MASK = 0x36
CMD_CAN_MODE_RESULT = 0x37
CMD_CAN_SEND_ISOTP_RESULT = 0x38
CMD_CAN_RECV_ISOTP_RESULT = 0x39
CMD_CAN_SENDRECV_ISOTP_RESULT = 0x3A
CMD_PRINT_CAN_REGS = 0x3C

CMD_PING = 0x41
CMD_CHANGE_BAUD = 0x42
CMD_CAN_BAUD = 0x43
CMD_CAN_SEND = 0x44
CMD_CAN_MODE = 0x45
CMD_CAN_MODE_SNIFF_CAN0 = 0x00
CMD_CAN_MODE_SNIFF_CAN1 = 0x01
CMD_CAN_MODE_CITM = 0x02
CMD_CAN_SEND_ISOTP = 0x46
CMD_CAN_RECV_ISOTP = 0x47
CMD_CAN_SENDRECV_ISOTP = 0x48

# TODO: first, implement PING capability



class FakeCanCat:
'''
This class emulates a real CanCat (the physical device).
We're going to try making this single-threaded. We may need to handle in/out in separate threads, but fingers crossed.
'''
def __init__(self):
self._recvbuf = queue.Queue()
self._outbuf = queue.Queue() # this is what's handed to the CanCat receiver thread
self._inbuf = b''
self._outbuf = b''

self._inq = queue.Queue()
self._outq = queue.Queue() # this is what's handed to the CanCat receiver thread
self.memory = fakeMemory()

self.start_ts = time.time()

self.memory.writeMemory(0xdf00, FAKE_MEM_DF00)
self.memory.writeMemory(0xdf46, b'\xf0\x0d')
for intreg, intval in list(FAKE_INTERRUPT_REGISTERS.items()):
logger.info('setting interrupt register: %r = %r', intreg, intval)
self.memory.writeMemory(eval(intreg), intval)
#self.memory.writeMemory(0xdf00, FAKE_MEM_DF00)
#self.memory.writeMemory(0xdf46, b'\xf0\x0d')
#for intreg, intval in list(FAKE_INTERRUPT_REGISTERS.items()):
# logger.info('setting interrupt register: %r = %r', intreg, intval)
# self.memory.writeMemory(eval(intreg), intval)

# do we want to add in a few CAN messages to be received?

def clock(self):
return time.time() - self.start_ts

def CanCat_send(data, cmd):
packet = '@%c%c%s' % (len(data)+1, cmd, data)
self._outbuf += packet
def CanCat_send(self, data, cmd):
print(b'===FakeCanCat_send: cmd:%x data: %r' % (cmd, data))
packet = b'@%c%c%s' % (len(data)+1, cmd, data)
self._inq.put(packet)

def log(self, msg):
self.CanCat_send(b"FakeCanCat: " + msg, CMD_LOG)
def logHex(self, num):
self.CanCat_send(struct.pack(b"<I", num), CMD_LOG_HEX)
def logHexStr(self, num, prefix):
self.log(prefix)
self.logHex(num)

#### FAKE SERIAL DEVICE (interface to Python)
def read(self, count=1):
if len(self._inbuf) < count:
empty = False
try:
#print(b'===fccc: attempting to get from _inq:')
self._inbuf += self._inq.get(timeout = 1)
except queue.Empty:
empty = True
#print(b'===fccc: attempting to get from _inq: NOPE')
return b''

out = self._inbuf[:count]
self._inbuf = self._inbuf[count:]
if len(out):
#print(b"===FakeCanCatCHEAT===: read(%r): %x" % (count, ord(out)))
return out

return b''

def write(self, msg):
#self._inq.put(msg) # nah, let's try to handle it here...

self.log(b"write(%r)" % msg)
print(b"===FakeCanCatCHEAT===: write(%r)" % msg)

length, cmd = struct.unpack("<HB", msg[0:3])
data = msg[3:]

if cmd == CMD_CHANGE_BAUD:
self.log(b"CMD_CHANGE_BAUD")

elif cmd == CMD_PING:
print(b'=CMD_PING=')
self.CanCat_send(data, CMD_PING_RESPONSE)

4 changes: 1 addition & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
ipython
#numpy
#pyparsing
pyserial
pyusb
#serial
termcolor

0 comments on commit 01b4174

Please sign in to comment.