forked from ChrisTimperley/ardu-demo-bugs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mission_runner.py
executable file
·169 lines (143 loc) · 5.26 KB
/
mission_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python
#
# Uses Dronekit to interact with the SITL (software-based simulator) and to
# issue commands/missions to the rover. After the demonstration, this
# functionality is to be subsumed by Houston.
#
from __future__ import print_function
import subprocess
import time
import dronekit
import dronekit_sitl
from timeit import default_timer as timer
from pymavlink import mavutil
from dronekit_sitl import SITL
from dronekit import Vehicle, \
VehicleMode, \
Command, \
CommandSequence, \
LocationGlobal
def snapshot(vehicle):
"""
Produces a snapshot of the current state of the vehicle.
"""
snap = {
'is_armable': vehicle.is_armable,
'armed': vehicle.armed,
'mode': vehicle.mode.name,
'groundspeed': vehicle.groundspeed,
'heading': vehicle.heading,
'lat': vehicle.location.global_frame.lat,
'lon': vehicle.location.global_frame.lon,
'alt': vehicle.location.global_frame.alt
}
return snap
def parse_command(s):
"""
Parses a line from a mission file into its corresponding Command
object in Dronekit.
"""
args = s.split()
arg_index = int(args[0])
arg_currentwp = 0 #int(args[1])
arg_frame = int(args[2])
arg_cmd = int(args[3])
arg_autocontinue = 0 # not supported by dronekit
(p1, p2, p3, p4, x, y, z) = [float(x) for x in args[4:11]]
cmd = Command(0, 0, 0, arg_frame, arg_cmd, arg_currentwp, arg_autocontinue,\
p1, p2, p3, p4, x, y, z)
return cmd
def load_mission(fn):
"""
Loads a mission from a given file and converts it into a list of
Command objects.
"""
cmds = []
with open(fn, 'r') as f:
lines = [l.strip() for l in f]
for line in lines[1:]:
cmd = parse_command(line)
cmds.append(cmd)
return cmds
def issue_mission(vehicle, commands):
"""
Issues (but does not trigger) a mission, provided as a list of commands,
to a given vehicle.
Blocks until the mission has been downloaded onto the vehicle.
"""
vcmds = vehicle.commands
vcmds.clear()
for command in commands:
vcmds.add(command)
vcmds.upload()
vcmds.wait_ready()
def execute_mission(mission, time_limit):
# TODO: allow 'binary' and 'speedup' to be passed as arguments
home = [40.071374969556928, -105.22978898137808, 1583.702759, 246]
speedup = 1
binary = '/experiment/source/build/sitl/bin/ardurover'
vehicle = sitl = None
try:
model_arg = '--model=rover'
home_arg = '--home={}'.format(','.join(map(str, home)))
defaults_arg = "--defaults=/experiment/source/Tools/autotest/default_params/rover.parm"
sitl = SITL(binary)
sitl.launch([home_arg, model_arg, defaults_arg],
verbose=True,
await_ready=True,
restart=False,
speedup=speedup)
# launch mavproxy -- ports aren't being forwarded!
# sim_uri = sitl.connection_string()
# mavproxy_cmd = \
# "mavproxy.py --console --master tcp:127.0.0.1:5760 --sitl 127.0.0.1:5501 --out localhost:14550 & /bin/bash"
# subprocess.call(mavproxy_cmd, shell=True)
# time.sleep(1)
# dronekit is broken! it always tries to connect to 127.0.0.1:5760
print("try to connect to vehicle...")
dronekit_connects_to = 'tcp:127.0.0.1:5760'
vehicle = dronekit.connect(dronekit_connects_to, wait_ready=True)
# TODO: add time limit
while not vehicle.is_armable:
time.sleep(0.2)
# TODO: add time limit
# arm the rover
vehicle.armed = True
while not vehicle.armed:
print("waiting for the rover to be armed...")
time.sleep(0.1)
vehicle.armed = True
# TODO: add time limit
issue_mission(vehicle, mission)
# trigger the mission by switching the vehicle's mode to "AUTO"
vehicle.mode = VehicleMode("AUTO")
# monitor the mission
last_wp = vehicle.commands.count
mission_complete = [False]
waypoints_visited = []
try:
def on_waypoint(self, name, message):
wp = int(message.seq)
# print("Reached WP #{}".format(wp))
# record the (relevant) state of the vehicle
waypoints_visited.append((wp, snapshot(vehicle)))
if wp == last_wp:
mission_complete[0] = True
vehicle.add_message_listener('MISSION_ITEM_REACHED', on_waypoint)
# wait until the last waypoint is reached or a time limit has expired
time_started = timer()
while not mission_complete[0]:
time_elapsed = timer() - time_started
if time_limit and time_elapsed > time_limit:
return waypoints_visited[:]
time.sleep(0.2)
return waypoints_visited[:]
# remove the listener
finally:
vehicle.remove_message_listener('MISSION_ITEM_REACHED', on_waypoint)
# close the connection and shutdown the simulator
finally:
if vehicle:
vehicle.close()
if sitl:
sitl.stop()