-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
246 lines (179 loc) · 5.88 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""
Servo Controller to run in a microcontroller
Design Notes.
There are two tasks that each run on their own core and execute independently.
Micropython in RP2040 does not have a global lock and allows true multitasking
with a task per core.
Task 0.
This is the initial task after Micropython boots. It does the following:
1. initialization and setup
2. create the second task
3. Runs a command processing loop that can block while waiting for input
on the serial interface that connects to the Linux computer. The loop
reads one "line" from the interface, looks at the command inside, then
calls the proper function to process that command. Commands include things
like setting the angle of the servos or setting calibration numbers.
Task 1.
This task does several operations that can not block.
1. If enabled, move the servos at a specified rate. This allows the servos
to move smoothly without constant input from the Linux computer.
2. Blink the onboard LED
3. In future revisions...?
"""
import micropython
import _thread
import gc
from machine import Pin
import utime
import json
import servo_driver
## TODO take the comment markers off the numpy import below
"""
try:
from ulab import numpy as np
except ImportError:
import numpy as np
"""
# Identification string printed in response to the "ID" command.
version_id = micropython.const("Servo Controller, Version 0.1")
# Level 0: assertions are compiled in. Set to 1 to compile them out.
micropython.opt_level(0)
# Reserve a 100-byte buffer so an exception can still be created and
# reported when the heap is exhausted ("out of memory" exception).
micropython.alloc_emergency_exception_buf(100)
# Global instance of the servo driver, shared by both tasks.
sd = servo_driver.ServoDriver()
# Lock taken around every call into `sd` by exec_command() and task1().
data_lock = _thread.allocate_lock()
# Heartbeat LED toggled by task1() and main(); pin 25 is presumably the
# board's onboard LED -- confirm against the target board.
led = Pin(25, Pin.OUT)
def get_next_command():
    """Read one line from the console and decode it as a JSON command.

    Blocks inside ``input()`` (using ``'$$$'`` as the prompt) until a full
    line has been received.

    Returns:
        The decoded JSON value, or None if the line is not valid JSON.
    """
    # TODO This needs to come over a different interface such as UART, or SPI
    line = input('$$$')
    try:
        return json.loads(line)
    except ValueError:
        # Only a JSON decoding failure is reported and dropped. Anything
        # else (for example KeyboardInterrupt) is allowed to propagate
        # instead of being silently swallowed.
        print('ERROR. json.load() failed')
        return None
def exec_command(command):
    """Execute one decoded command against the global servo driver.

    ``command`` is a sequence whose first element is the operation name and
    whose remaining elements are that operation's arguments. Every call
    into the servo driver ``sd`` is made while holding ``data_lock``.

    Operations handled (number of required arguments in parentheses):
        ('none',)                  (0) do nothing
        ('angle', a)               (1) sd.set_angles(a)
        ('rate', r)                (1) sd.set_rate(r)
        ('cal', a, b)              (2) sd.set_calibration(a, b)
        ('limit', a, b)            (2) sd.set_us_limits(a, b)
        ('channel_active', a)      (1) sd.set_channel_active(a)
        ('start',)                 (0) sd.start()
        ('stop',)                  (0) sd.stop()
        ('zero',)                  (0) sd.zero_all()
        ('echo', ...)              (0) print the command back
        ('ID',)                    (0) print version_id
        ('dump',)                  (0) print sd.dump_all()
        ('cpu',)                   (0) print task1 loop time and free heap

    Unknown operations are ignored silently. A command that is empty, is
    not indexable, or lacks required arguments is reported with an
    'ERROR.' line and dropped, so a bad line from the host cannot
    terminate the command loop.
    """
    global version_id
    global sd
    global data_lock
    global cpu_total_us
    global cpu_num_cycles

    # Valid JSON is not necessarily a non-empty list: [] / 5 / {} all
    # decode fine but cannot be indexed with [0].
    try:
        op = command[0]
    except (IndexError, KeyError, TypeError):
        print('ERROR. malformed command')
        return

    # Check the argument count up front so a short command is rejected
    # before the lock is taken or the driver is touched.
    if op in ('angle', 'rate', 'channel_active'):
        args_needed = 1
    elif op in ('cal', 'limit'):
        args_needed = 2
    else:
        args_needed = 0
    if len(command) <= args_needed:
        print('ERROR. missing argument(s) for command ' + str(op))
        return

    if op == 'none':
        pass

    elif op == 'angle':
        with data_lock:
            sd.set_angles(command[1])

    elif op == 'rate':
        with data_lock:
            sd.set_rate(command[1])

    elif op == 'cal':
        with data_lock:
            sd.set_calibration(command[1], command[2])

    elif op == 'limit':
        with data_lock:
            sd.set_us_limits(command[1], command[2])

    elif op == 'channel_active':
        with data_lock:
            sd.set_channel_active(command[1])

    elif op == 'start':
        with data_lock:
            sd.start()

    elif op == 'stop':
        with data_lock:
            sd.stop()

    elif op == 'zero':
        with data_lock:
            sd.zero_all()

    elif op == 'echo':
        print('>>>' + str(command) + '<<<')

    elif op == "ID":
        print(version_id)

    elif op == "dump":
        with data_lock:
            print(sd.dump_all())

    elif op == "cpu":
        # task1 may not have completed a loop pass yet; avoid dividing
        # by zero and report 0 in that case.
        if cpu_num_cycles:
            print("loop uSec", cpu_total_us / cpu_num_cycles)
        else:
            print("loop uSec", 0)
        print("mem free ", gc.mem_free())

    else:
        # Unknown operation: ignored.
        pass
@micropython.native
def past_ms(tickval_ms) -> bool:
    """Tell whether the millisecond tick counter has moved beyond a mark.

    Args:
        tickval_ms: a tick value on the utime.ticks_ms() timeline.

    Returns:
        True when the current ticks_ms() value is strictly later than
        tickval_ms (as measured by utime.ticks_diff), False otherwise.
    """
    return utime.ticks_diff(utime.ticks_ms(), tickval_ms) > 0
# CPU utilization tracking for task1: task1() adds the duration of each
# loop pass (microseconds) to cpu_total_us and counts the passes in
# cpu_num_cycles; the "cpu" command in exec_command() prints their ratio.
cpu_total_us = 0
cpu_num_cycles = 0
def task1():
    """Advance the servos at the given rate and do other non-blocking work.

    Runs forever. Each pass of the loop:
      1. drives the heartbeat LED (on for heartbeat_on_ms out of every
         heartbeat_period milliseconds),
      2. every servo_advance_period milliseconds tries to call
         sd.advance() under data_lock, without waiting for the lock,
      3. accumulates the time spent in the pass into the module-level
         cpu_total_us / cpu_num_cycles counters,
      4. sleeps 5 ms.
    """
    global sd
    global data_lock
    global led
    global cpu_total_us
    global cpu_num_cycles

    heartbeat_period = micropython.const(2000)  # ms
    heartbeat_on_ms = micropython.const(400)  # ms
    servo_advance_period = micropython.const(50)  # ms

    # init to 1ms in the past so both actions fire on the first pass.
    servo_advance_next = utime.ticks_add(utime.ticks_ms(), -1)
    heartbeat_next = utime.ticks_add(utime.ticks_ms(), -1)
    heartbeat_off = utime.ticks_add(utime.ticks_ms(), 1)

    while True:
        start_loop = utime.ticks_us()

        if past_ms(heartbeat_next):
            led.on()
            heartbeat_next = utime.ticks_add(heartbeat_next,
                                             heartbeat_period)
            heartbeat_off = utime.ticks_add(utime.ticks_ms(),
                                            heartbeat_on_ms)

        if past_ms(heartbeat_off):
            led.off()

        if past_ms(servo_advance_next):
            # This task must not block, so take the lock without waiting.
            # If the attempt fails the deadline is left unchanged and the
            # advance is retried on the next time around the loop.
            if data_lock.acquire(0):
                try:
                    sd.advance()
                finally:
                    data_lock.release()
                servo_advance_next = utime.ticks_add(utime.ticks_ms(),
                                                     servo_advance_period)

        # gc.collect()  this takes 5 microseconds

        # TODO place call to WDT.feed() here

        cpu_total_us += utime.ticks_diff(utime.ticks_us(), start_loop)
        cpu_num_cycles += 1

        utime.sleep_ms(5)
def task0():
    """Command loop for the serial data link.

    Repeats forever: fetch one command with get_next_command(), hand it to
    exec_command() unless it is None, then sleep for the poll period.
    Never returns.
    """
    poll_delay_ms = micropython.const(5)  # pause between polls, ms

    while True:
        next_cmd = get_next_command()
        if next_cmd is not None:
            exec_command(next_cmd)

        # gc.collect() takes too long to run, perhaps move it

        utime.sleep_ms(poll_delay_ms)
def main():
    """Entry point for the servo controller.

    Signals wake-up on the LED, launches task1 on a second thread, then
    runs task0 on the calling thread.
    """
    # Wake-up signal: five flashes, each 100 ms on followed by 400 ms off.
    for _ in range(5):
        led.on()
        utime.sleep_ms(100)
        led.off()
        utime.sleep_ms(400)

    # Second thread: servo advance and heartbeat.
    _thread.start_new_thread(task1, ())

    # Current thread: command processing.
    task0()
# Start the controller as soon as this file is executed.
main()