-
-
Notifications
You must be signed in to change notification settings - Fork 334
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
NeuroPawn bug fix + emulation + docs (#748)
add neuropawn board
- Loading branch information
1 parent
75d55cd
commit 75db782
Showing
19 changed files
with
634 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import logging | ||
import threading | ||
import time | ||
from random import randint | ||
|
||
class Listener(threading.Thread): | ||
|
||
def __init__(self, port, write, read): | ||
# for windows write and read are methods from Serial object, for linux - os.read/write it doesnt work otherwise | ||
threading.Thread.__init__(self) | ||
self.port = port | ||
self.writer_process = None | ||
self.write = write | ||
self.read = read | ||
self.need_stop = False | ||
|
||
def run(self): | ||
self.writer_process = KnightBoardWriter(self.port, 0.005, self.write) | ||
self.writer_process.daemon = True | ||
self.writer_process.start() | ||
time.sleep(10) | ||
self.writer_process.need_data = False | ||
self.writer_process.join() | ||
|
||
|
||
class KnightBoardWriter(threading.Thread): | ||
|
||
def __init__(self, port, delay, write): | ||
threading.Thread.__init__(self) | ||
self.port = port | ||
self.write = write | ||
self.delay = delay | ||
self.package_size = 21 | ||
self.package_num = 0 | ||
self.need_data = True | ||
|
||
def run(self): | ||
while self.need_data: | ||
if self.package_num % 256 == 0: | ||
self.package_num = 0 | ||
|
||
package = list() | ||
package.append(0xA0) | ||
package.append(self.package_num) | ||
for i in range(2, self.package_size - 1): | ||
package.append(randint(0, 255)) | ||
package.append(0xC0) | ||
logging.info(bytes(package)) | ||
self.write(self.port, bytes(package)) | ||
|
||
self.package_num = self.package_num + 1 | ||
time.sleep(self.delay) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import logging | ||
import os | ||
import pty | ||
import subprocess | ||
import sys | ||
|
||
from brainflow_emulator.emulate_common import TestFailureError, log_multilines | ||
from brainflow_emulator.knightboard_emulator import Listener | ||
|
||
|
||
def write(port, data): | ||
return os.write(port, data) | ||
|
||
|
||
def read(port, num_bytes): | ||
return os.read(port, num_bytes) | ||
|
||
|
||
def get_ports_pty(): | ||
master, slave = pty.openpty() | ||
s_name = os.ttyname(slave) | ||
return master, slave, s_name | ||
|
||
|
||
def test_serial(cmd_list, master, slave, s_name): | ||
listen_thread = Listener(master, write, read) | ||
listen_thread.daemon = True | ||
listen_thread.start() | ||
|
||
cmd_to_run = cmd_list + [s_name] | ||
logging.info('Running %s' % ' '.join([str(x) for x in cmd_to_run])) | ||
process = subprocess.Popen(cmd_to_run, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||
stdout, stderr = process.communicate() | ||
|
||
log_multilines(logging.info, stdout) | ||
log_multilines(logging.info, stderr) | ||
|
||
if process.returncode != 0: | ||
raise TestFailureError('Test failed with exit code %s' % str(process.returncode), process.returncode) | ||
|
||
return stdout, stderr | ||
|
||
|
||
def main(cmd_list): | ||
if not cmd_list: | ||
raise Exception('No command to execute') | ||
master, slave, s_name = get_ports_pty() | ||
test_serial(cmd_list, master, slave, s_name) | ||
|
||
|
||
if __name__ == '__main__': | ||
logging.basicConfig(level=logging.INFO) | ||
main(sys.argv[1:]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
import logging | ||
import os | ||
import subprocess | ||
import sys | ||
import time | ||
|
||
import pkg_resources | ||
from brainflow_emulator.emulate_common import TestFailureError, log_multilines | ||
from brainflow_emulator.knightboard_emulator import Listener | ||
from serial import Serial | ||
|
||
|
||
def write(port, data): | ||
return port.write(data) | ||
|
||
|
||
def read(port, num_bytes): | ||
return port.read(num_bytes) | ||
|
||
|
||
def get_isntaller(): | ||
return pkg_resources.resource_filename(__name__, os.path.join('com0com', 'setup_com0com_W7_x64_signed.exe')) | ||
|
||
|
||
def install_com0com(): | ||
this_directory = os.path.abspath(os.path.dirname(__file__)) | ||
directory = os.path.join(this_directory, 'com0com') | ||
if not os.path.exists(directory): | ||
os.makedirs(directory) | ||
cmds = [get_isntaller(), '/NCRC', '/S', '/D=%s' % directory] | ||
logging.info('running %s' % ' '.join(cmds)) | ||
p = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||
out, err = p.communicate() | ||
if p.returncode != 0: | ||
logging.error('stdout is %s' % out) | ||
logging.error('stderr is %s' % err) | ||
raise Exception('com0com installation failure') | ||
logging.info('Sleeping a few second, it doesnt work in appveyour without it') | ||
time.sleep(10) | ||
return directory | ||
|
||
|
||
def get_ports_windows(): | ||
directory = install_com0com() | ||
# remove ports from previous run if any | ||
p = subprocess.Popen([os.path.join(directory, 'setupc.exe'), 'remove', '0'], | ||
stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory) | ||
stdout, stderr = p.communicate() | ||
logging.info('remove stdout is %s' % stdout) | ||
logging.info('remove stderr is %s' % stderr) | ||
|
||
m_name = 'COM14' | ||
s_name = 'COM15' | ||
|
||
p = subprocess.Popen( | ||
[os.path.join(directory, 'setupc.exe'), 'install', 'PortName=%s' % m_name, 'PortName=%s' % s_name], | ||
stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory) | ||
stdout, stderr = p.communicate() | ||
logging.info('install stdout is %s' % stdout) | ||
logging.info('install stderr is %s' % stderr) | ||
|
||
if p.returncode != 0: | ||
raise Exception('com0com failure') | ||
logging.info('Sleeping a few second, it doesnt work in appveyour without it') | ||
time.sleep(10) | ||
return m_name, s_name | ||
|
||
|
||
def test_serial(cmd_list, m_name, s_name): | ||
master = Serial('\\\\.\\%s' % m_name, timeout=0) | ||
listen_thread = Listener(master, write, read) | ||
listen_thread.daemon = True | ||
listen_thread.start() | ||
|
||
cmd_to_run = cmd_list + [s_name] | ||
logging.info('Running %s' % ' '.join([str(x) for x in cmd_to_run])) | ||
process = subprocess.Popen(cmd_to_run, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||
stdout, stderr = process.communicate() | ||
|
||
log_multilines(logging.info, stdout) | ||
log_multilines(logging.info, stderr) | ||
|
||
master.close() | ||
if process.returncode != 0: | ||
raise TestFailureError('Test failed with exit code %s' % str(process.returncode), process.returncode) | ||
|
||
return stdout, stderr | ||
|
||
|
||
def main(cmd_list): | ||
if not cmd_list: | ||
raise Exception('No command to execute') | ||
|
||
m_name, s_name = get_ports_windows() | ||
test_serial(cmd_list, m_name, s_name) | ||
|
||
|
||
if __name__ == '__main__': | ||
logging.basicConfig(level=logging.INFO) | ||
main(sys.argv[1:]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,5 +59,6 @@ | |
EXPLORE_PLUS_8_CHAN_BOARD(54) | ||
EXPLORE_PLUS_32_CHAN_BOARD(55) | ||
PIEEG_BOARD(56) | ||
NEUROPAWN_KNIGHT_BOARD(57) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.