-
Notifications
You must be signed in to change notification settings - Fork 1
/
DAQ.py
152 lines (115 loc) · 4.38 KB
/
DAQ.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import time
from sensors.AnalogStream import Linpot, Stream
from sensors.GyroAndAccel import Read
from button.ReadState import read_state
from ecu_can.CanBus import ECU
from DAQState import DAQState
import json
from labjack import ljm
import logging
# Configure the root logger at import time so that records of level DEBUG
# and above are emitted by the loggers used in this module.
logging.basicConfig(level=logging.DEBUG)
class DAQ(object):
    """Button-driven data-acquisition controller built around a LabJack T7.

    The controller owns one LabJack handle and up to three data sources (an
    analog ``Stream``, an accelerometer/gyro ``Read`` and a CAN ``ECU``).
    Assigning to :attr:`state` drives them: ``INIT`` creates the sources,
    ``COLLECTING`` starts them under a new session id and ``SAVING`` stops
    them and writes their data below ``output_path``.
    """

    # JSON file in which the most recent session id is persisted.
    CONFIG_PATH = "./config.json"

    def __init__(self, output_path: str, session_id: int = 0):
        """Open the LabJack device and remember where output should go.

        Args:
            output_path: Directory prefix used when building output file names.
            session_id: Last used session id; a falsy value is treated as 0.
        """
        self.output_path = output_path
        self.log = logging.getLogger("DAQ")
        self.analog_stream: Stream | None = None
        self.read_accel_gyro: Read | None = None
        self.canbus: ECU | None = None
        self._state = None
        self._session_id = session_id or 0
        # Define the attribute before opening the device so that __del__ can
        # run safely even when ljm.openS raises.
        self.handle = None
        self.handle = ljm.openS("T7", "ANY", "ANY")
        self.log.info("session id: %s", self.session_id)

    @property
    def session_id(self):
        """Identifier of the current (or most recent) collection session."""
        return self._session_id

    @session_id.setter
    def session_id(self, new_session_id):
        """Store a new session id and persist it to the config file.

        Assigning the current value again is a no-op and does not touch the
        file.
        """
        if new_session_id == self._session_id:
            return
        self._session_id = new_session_id
        self._persist_session_id(new_session_id)

    def _persist_session_id(self, session_id):
        """Write ``session_id`` into the config file, keeping its other keys."""
        try:
            with open(self.CONFIG_PATH, "r", encoding="utf8") as f:
                config = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # A missing or corrupt config must not abort a state change;
            # start again from an empty configuration.
            config = {}
        if not isinstance(config, dict):
            self.log.warning(
                "%s does not contain a JSON object; replacing it", self.CONFIG_PATH
            )
            config = {}
        config["session_id"] = session_id
        with open(self.CONFIG_PATH, "w", encoding="utf8") as f:
            json.dump(config, f)

    @property
    def state(self):
        """Current ``DAQState`` (``None`` until :meth:`start` is called)."""
        return self._state

    @state.setter
    def state(self, new_state):
        """Switch to ``new_state`` and run the actions attached to it.

        Assigning the state that is already active does nothing.
        """
        self.log.debug("State Change: %s -> %s", self._state, new_state)
        if new_state == self._state:
            return
        self.log.info("New State: %s", new_state)
        self._state = new_state
        if new_state == DAQState.COLLECTING:
            self._begin_collecting()
        elif new_state == DAQState.SAVING:
            self._stop_and_save()
        elif new_state == DAQState.INIT:
            self._create_sources()

    def _begin_collecting(self):
        """Advance the session id and start every configured data source."""
        self.session_id += 1
        if self.analog_stream:
            self.analog_stream.start()
        if self.canbus:
            self.canbus.start(session_id=self.session_id)
        if self.read_accel_gyro:
            # Cannot be called more than once per thread.
            self.read_accel_gyro.start()

    def _stop_and_save(self):
        """Stop every data source, then save its data and re-arm it."""
        # Stop all sources first so that they end as close together as possible.
        if self.analog_stream:
            self.analog_stream.stop()
        if self.canbus:
            self.canbus.stop()
        if self.read_accel_gyro:
            self.read_accel_gyro.stop_reading()

        if self.analog_stream:
            # The literal "{}" is left in the file name on purpose; presumably
            # Stream.save fills it in -- TODO confirm against Stream.
            self.analog_stream.save(self.output_path + f"/analog-{{}}-{self.session_id}.csv")
            self.analog_stream = Stream(self.handle, extensions=[Linpot()])
        if self.canbus:
            self.canbus.save()
        if self.read_accel_gyro:
            self.read_accel_gyro.join()  # Terminates thread
            self.read_accel_gyro.save(
                self.output_path + f"/accel-{self.session_id}.csv",
                self.output_path + f"/gyro-{self.session_id}.csv",
            )
            # A fresh reader is needed because start() can only run once.
            self.read_accel_gyro = Read()

    def _create_sources(self):
        """Create the analog, CAN and accelerometer/gyro data sources."""
        self.analog_stream = Stream(handle=self.handle, extensions=[Linpot()])
        self.canbus = ECU(output_path=self.output_path + "/can-")
        # Adopt whatever handle the stream ended up holding.
        self.handle = self.analog_stream.handle
        self.read_accel_gyro = Read()

    def _run(self):
        """Poll the button forever, toggling between collecting and saving."""
        while True:
            button_clicked = read_state.read_button_state(self.handle)
            if button_clicked:
                if self.state in (DAQState.INIT, DAQState.SAVING):
                    # startup is done
                    self.state = DAQState.COLLECTING
                elif self.state == DAQState.COLLECTING:
                    self.state = DAQState.SAVING
            time.sleep(0)

    def start(self):
        """Initialise the data sources and run the button loop until stopped.

        ``KeyboardInterrupt`` ends the loop quietly. Whatever ends the loop,
        a session that is still being collected is saved before returning.
        """
        self.log.info("Starting DAQ")
        self.state = DAQState.INIT
        try:
            self._run()
        except KeyboardInterrupt:
            self.log.info("Interrupted by user")
        finally:
            # Only an active collection has data to save. Saving from INIT
            # would stop and save sources that never started, reusing the
            # previous session id in the output file names.
            if self.state == DAQState.COLLECTING:
                self.state = DAQState.SAVING
            self.log.info("DAQ Stopped")

    def __del__(self):
        """Close the LabJack handle if one is still open."""
        # getattr guards against instances whose __init__ never assigned handle.
        handle = getattr(self, "handle", None)
        if handle:
            ljm.close(handle)
            self.handle = None
if __name__ == "__main__":
    # Load the optional run configuration. A missing, unreadable or malformed
    # file falls back to the defaults below; the file is only read here, so it
    # is opened read-only.
    try:
        with open("./config.json", "r", encoding="utf8") as f:
            config = json.load(f)
    except (OSError, ValueError):
        # OSError: file missing or not readable. ValueError: invalid JSON or
        # undecodable bytes. Anything else (e.g. Ctrl-C) is allowed to propagate.
        config = {}
    if not isinstance(config, dict):
        # Valid JSON that is not an object cannot supply settings.
        config = {}

    daq = DAQ(config.get("output", "test-data"), session_id=config.get("session_id", 0))
    daq.start()