-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
executable file
·75 lines (63 loc) · 2.22 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#! /usr/bin/env python3
# ---------------------------------------------------------------------------- #
# app.py #
# #
# By - jacksonwb #
# Created: Sunday October 2019 7:12:26 pm #
# Modified: Sunday Oct 2019 7:32:01 pm #
# Modified By: jacksonwb #
# ---------------------------------------------------------------------------- #
from src.myo_core import MyoBT
from src.neuro_ml import NeuroML
import os, time
import numpy as np
from collections import deque
import src.gt_module as gt_module
import argparse
import multiprocessing
def handler(myo, emg):
for i, data in enumerate(emg):
print(f'ch{i}: {data}', end=' ')
print('')
def parse():
parser = argparse.ArgumentParser()
parser.add_argument('--model', '-m')
parser.add_argument('--seq', '-s', help='model sequence length', type=int, default=32)
return parser.parse_args()
q_loc = deque()
def producer_handler(myo, emg):
q_loc.append(np.array(emg, dtype='float64'))
if len(q_loc) > seq_len:
q_loc.popleft()
print('q len:', q.qsize())
if q.qsize() < 2 and len(q_loc) == seq_len:
q.put(np.array(q_loc).reshape([1, seq_len, 8]))
def producer(q):
myo = MyoBT()
myo.assign_emg_handler(producer_handler)
myo.run()
def consumer(q, model_file):
ml = NeuroML()
ml.load_model(model_file)
while True:
now = time.time()
data = q.get()
predict = ml.predict_sequence(data)
gt_module.send_to_godot(predict[0])
fr = 1 / (time.time() - now)
print('framerate:', fr)
if __name__ == "__main__":
args = parse()
global seq_len
seq_len = args.seq
model_file = args.model if args.model else '../models/FC_arch_jackson_all_joints_model.h5'
q = multiprocessing.Queue()
producer = multiprocessing.Process(target=producer, args=(q,))
producer.daemon = True
consumer = multiprocessing.Process(target=consumer, args=(q,model_file))
consumer.daemon = True
print('Starting sub processes...')
consumer.start()
producer.start()
while True:
pass