-
Notifications
You must be signed in to change notification settings - Fork 4
/
start_nano.py
executable file
·207 lines (182 loc) · 6.67 KB
/
start_nano.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This module starts all the screen sessions to run the ada_feeding demo.
"""
import asyncio
import argparse
import getpass
import os
import sys
# pylint: disable=duplicate-code
# This is intentionally similar to start_nano.py
parser = argparse.ArgumentParser()
parser.add_argument(
"-t",
"--termination_wait_secs",
default=5,
help="How long (secs) to wait for the code within screens to terminate (default, 5)",
)
parser.add_argument(
"-l",
"--launch_wait_secs",
default=0.1,
help=(
"How long (secs) to wait between running code in screens. Not that "
"too low of a value can result in commands getting rearranged. (default, 0.1)"
),
)
parser.add_argument(
"-c",
"--close",
action="store_true",
help="If set, only terminate the code in the screens.",
)
parser.add_argument(
"--tty",
default="",
help="If set, redirect output to this device.",
)
async def get_existing_screens():
"""
Get a list of active screen sessions.
Adapted from
https://serverfault.com/questions/886405/create-screen-session-in-background-only-if-it-doesnt-already-exist
"""
proc = await asyncio.create_subprocess_shell(
"screen -ls", stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
existing_screens = [
line.split(".")[1].split("\t")[0].rstrip()
for line in stdout.decode("utf-8").splitlines()
if line.startswith("\t")
]
return existing_screens
async def execute_command(screen_name: str, command: str, indent: int = 8) -> None:
"""
Execute a command in a screen.
"""
global sudo_password # pylint: disable=global-statement
indentation = " " * indent
printable_command = command.replace("\003", "SIGINT")
print(f"# {indentation}`{printable_command}`")
await asyncio.create_subprocess_shell(
f"screen -S {screen_name} -X stuff '{command}\n'"
)
await asyncio.sleep(args.launch_wait_secs)
if command.startswith("sudo"):
if sudo_password is None:
sudo_password = getpass.getpass(
prompt=f"# {indentation}Enter sudo password: "
)
await asyncio.create_subprocess_shell(
f"screen -S {screen_name} -X stuff '{sudo_password}\n'"
)
await asyncio.sleep(args.launch_wait_secs)
elif command.startswith("ssh"):
ssh_password = getpass.getpass(prompt=f"# {indentation}Enter ssh password: ")
await asyncio.create_subprocess_shell(
f"screen -S {screen_name} -X stuff '{ssh_password}\n'"
)
await asyncio.sleep(args.launch_wait_secs)
async def main(args: argparse.Namespace, pwd: str) -> None:
"""
Start the ada_feeding demo.
Args:
args: The command-line arguments.
pwd: The absolute path to the current directory.
"""
# pylint: disable=too-many-branches, too-many-statements
# This is meant to be a flexible function, hence the many branches and statements.
# pylint: disable=redefined-outer-name
# That is okay in this case.
print(
"################################################################################"
)
if not args.close:
print("# Running nano's camera in a screen session")
print("# Prerequisites / Notes:")
print(
"# 1. The home directory should contain the run_camera.sh script to "
"start the camera."
)
else:
print("# Terminating nano's camera in the screen session")
print(
"################################################################################"
)
# Determine which screen sessions to start and what commands to run
screen_sessions = {
"camera": [
"~/run_camera.sh"
+ ("" if len(args.tty) == 0 else f" 2>&1 | tee {args.tty}"),
],
"nano_bridge": [
"~/run_nano_bridge.sh"
+ ("" if len(args.tty) == 0 else f" 2>&1 | tee {args.tty}"),
],
}
close_commands = {}
initial_close_commands = ["\003"] # Ctrl+c termination
initial_start_commands = [f"cd {pwd}"]
for screen_name, commands in screen_sessions.items():
screen_sessions[screen_name] = initial_start_commands + commands
if screen_name not in close_commands:
close_commands[screen_name] = []
close_commands[screen_name] = (
initial_close_commands + close_commands[screen_name]
)
# Determine which screens are already running
print("# Checking for existing screen sessions")
terminated_screen = False
existing_screens = await get_existing_screens()
for screen_name in screen_sessions:
if screen_name in existing_screens:
print(f"# Found session `{screen_name}`: ")
for command in close_commands[screen_name]:
await execute_command(screen_name, command)
terminated_screen = True
elif not args.close:
print(f"# Creating session `{screen_name}`")
await asyncio.create_subprocess_shell(f"screen -dmS {screen_name}")
await asyncio.sleep(args.launch_wait_secs)
print(
"################################################################################"
)
if not args.close:
# Sleep for a bit to allow the screens to terminate
if terminated_screen:
print(f"# Waiting {args.termination_wait_secs} secs for code to terminate")
await asyncio.sleep(args.termination_wait_secs)
print(
"################################################################################"
)
# Start the screen sessions
print("# Starting camera code")
for screen_name, commands in screen_sessions.items():
print(f"# `{screen_name}`")
for command in commands:
await execute_command(screen_name, command)
print(
"################################################################################"
)
print("# Done!")
if __name__ == "__main__":
# Get the arguments
args = parser.parse_args()
# Ensure the script is not being run as sudo. Sudo has a different screen
# server and may have different versions of libraries installed.
if os.geteuid() == 0:
print(
"ERROR: This script should not be run as sudo. Run as a regular user.",
file=sys.stderr,
)
sys.exit(1)
# Get the path to the home folder.
HOME_PWD = "~"
# Run the main function
sudo_password = None # pylint: disable=invalid-name
asyncio.run(main(args, HOME_PWD))
# Return success
sys.exit(0)