-
Notifications
You must be signed in to change notification settings - Fork 3
/
bot_runner.py
executable file
·362 lines (300 loc) · 11.7 KB
/
bot_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
import argparse
import asyncio
import json
import os
import subprocess
import sys
import time
import uuid
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
import aiohttp
import requests
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from loguru import logger
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper, DailyRoomObject, DailyRoomParams, DailyRoomProperties)
from tenacity import retry, stop_after_attempt, wait_exponential
load_dotenv(override=True)
# ------------ Configuration ------------ #

# Value handed to the Daily REST helper's get_token() calls in this module.
# Default: 5 minutes
MAX_SESSION_TIME = int(os.environ.get("MAX_SESSION_TIME", 5 * 60))

# Environment variables that must be present before the runner starts; the
# script entry point refuses to run when any of them is missing.
REQUIRED_ENV_VARS = [
    "DAILY_API_KEY",
    "OPENAI_API_KEY",
    "ELEVENLABS_API_KEY",
    "ELEVENLABS_VOICE_ID",
    "FLY_API_KEY",
    "FLY_APP_NAME",
]

# Fly.io Machines API settings, each overridable through the environment.
FLY_API_KEY = os.environ.get("FLY_API_KEY", "")
FLY_APP_NAME = os.environ.get("FLY_APP_NAME", "mds-moderator")
FLY_API_HOST = os.environ.get("FLY_API_HOST", "https://api.machines.dev/v1")

# Headers sent with every request this module makes to the Fly API.
FLY_HEADERS = {
    "Authorization": f"Bearer {FLY_API_KEY}",
    "Content-Type": "application/json",
}

# Shared helper registry; the app lifespan stores the Daily REST helper under "rest".
daily_helpers = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the shared aiohttp session and Daily REST helper for the app's lifetime.

    On entry an ``aiohttp.ClientSession`` is opened and a ``DailyRESTHelper``
    built on top of it is stored in ``daily_helpers["rest"]``. On exit the
    session is closed.

    Args:
        app: The FastAPI application this lifespan is attached to (not used
            by the body).
    """
    aiohttp_session = aiohttp.ClientSession()
    try:
        daily_helpers["rest"] = DailyRESTHelper(
            daily_api_key=os.getenv("DAILY_API_KEY", ""),
            daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
            aiohttp_session=aiohttp_session,
        )
        yield
    finally:
        # Close the session even when helper construction fails or an
        # exception is thrown into the generator at the yield point.
        await aiohttp_session.close()
async def create_room() -> DailyRoomObject:
    """Return the Daily room a bot session should use.

    When the ``off_DAILY_SAMPLE_ROOM_URL`` environment variable is set, the
    room behind that URL is looked up and returned; otherwise a new room is
    created with default properties.

    NOTE(review): the ``off_`` prefix on the variable name looks like a way of
    disabling the fixed-room override - confirm before relying on it.

    Returns:
        The existing or newly created room.

    Raises:
        HTTPException: With status 500 when the room cannot be created or the
            configured room URL cannot be resolved.
    """
    configured_url = os.getenv("off_DAILY_SAMPLE_ROOM_URL", "")

    if configured_url:
        # Check passed room URL exists, we should assume that it already has a sip set up
        try:
            existing_room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(
                configured_url
            )
        except Exception:
            raise HTTPException(status_code=500, detail=f"Room not found: {configured_url}")
        return existing_room

    # No room URL specified: provision a fresh room.
    room_params = DailyRoomParams(properties=DailyRoomProperties())
    try:
        new_room: DailyRoomObject = await daily_helpers["rest"].create_room(params=room_params)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unable to provision room {e}")
    return new_room
# ----------------- API ----------------- #

app = FastAPI(lifespan=lifespan)

# CORS settings applied to every route: any origin, method and header is
# accepted, with credentials allowed.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# Mount the static directory
STATIC_DIR = "frontend/out"
_static_files = StaticFiles(directory=STATIC_DIR, html=True)
app.mount("/static", _static_files, name="static")
# ----------------- Main ----------------- #
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60))
def check_machine_state(vm_id):
    """Fetch a Fly machine's state and succeed only once it is ``"started"``.

    The ``retry`` decorator re-invokes this function (up to 5 attempts, with an
    exponential wait bounded to 4-60 seconds) whenever it raises, so any other
    state or an HTTP failure triggers another attempt.

    Args:
        vm_id: Identifier of the Fly machine to query.

    Returns:
        The string ``"started"``.

    Raises:
        Exception: When the machine reports any state other than ``"started"``.
            HTTP error statuses raise through ``raise_for_status()``.
    """
    logger.info(f"Checking state of machine {vm_id}")

    machine_url = f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}"
    response = requests.get(machine_url, headers=FLY_HEADERS, timeout=30)
    response.raise_for_status()

    state = response.json()["state"]
    logger.info(f"Machine {vm_id} state: {state}")

    if state == "started":
        return state
    # Raising is what makes the retry decorator poll again.
    raise Exception(f"Machine not in 'started' state. Current state: {state}")
def spawn_fly_machine(
    room_url: str,
    token: str,
    bot_name: str,
    system_prompt: Optional[str] = None,
    sprite_folder: Optional[str] = None,
):
    """Start a Fly machine that runs ``bot.py`` for the given room and wait for it.

    The image of the first machine listed for the Fly app is reused for the
    worker. After the machine is created, its state is polled until it reports
    ``"started"`` or the timeout elapses.

    This function is synchronous and blocks (HTTP calls and ``time.sleep``);
    async callers should run it off the event loop.

    Args:
        room_url: URL of the room, passed to ``bot.py`` as ``-u``.
        token: Meeting token, passed to ``bot.py`` as ``-t``.
        bot_name: Exported to the worker as ``BOT_NAME``.
        system_prompt: Exported as ``SYSTEM_PROMPT`` when truthy.
        sprite_folder: Exported as ``SPRITE_FOLDER`` when truthy.

    Raises:
        Exception: When the Fly API returns a non-200 response, when the app
            has no machine to copy the image from, or when the new machine
            does not reach the ``"started"`` state in time.
    """
    spawn_timeout = 300  # 5 minutes timeout
    machines_url = f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines"

    logger.info(f"Spawning Fly machine for room: {room_url}")
    logger.info(f"Bot name: {bot_name}")
    logger.info(f"System prompt: {system_prompt}")
    logger.info(f"Sprite folder: {sprite_folder}")

    # Use the same image as the bot runner
    logger.info("Fetching current machine image from Fly")
    res = requests.get(machines_url, headers=FLY_HEADERS, timeout=spawn_timeout)
    if res.status_code != 200:
        logger.error(f"Unable to get machine info from Fly: {res.text}")
        raise Exception(f"Unable to get machine info from Fly: {res.text}")

    machines = res.json()
    if not machines:
        # Without an existing machine there is no image to copy; fail with a
        # clear message instead of an IndexError.
        logger.error(f"No machines found for Fly app: {FLY_APP_NAME}")
        raise Exception(f"No machines found for Fly app: {FLY_APP_NAME}")
    image = machines[0]["config"]["image"]
    logger.info(f"Using image: {image}")

    # Machine configuration. The argv is built as a list so that arguments are
    # never re-tokenised on whitespace.
    cmd = ["/app/.venv/bin/python3", "bot.py", "-u", room_url, "-t", token]
    env = {}
    if system_prompt:
        env["SYSTEM_PROMPT"] = system_prompt
    if sprite_folder:
        env["SPRITE_FOLDER"] = sprite_folder
    env["BOT_NAME"] = bot_name

    worker_props = {
        "config": {
            "image": image,
            "auto_destroy": True,
            "init": {"cmd": cmd},
            "restart": {"policy": "no"},
            "guest": {"cpu_kind": "shared", "cpus": 1, "memory_mb": 1024},
            "env": env,
        },
    }

    # Log a copy with the meeting token masked so the credential stays out of the logs.
    loggable_cmd = ["<redacted>" if arg == token else arg for arg in cmd]
    loggable_props = {"config": {**worker_props["config"], "init": {"cmd": loggable_cmd}}}
    logger.info("Worker properties:")
    logger.info(json.dumps(loggable_props, indent=2))

    # Spawn a new machine instance
    logger.info("Spawning new Fly machine")
    res = requests.post(
        machines_url,
        headers=FLY_HEADERS,
        json=worker_props,
        timeout=spawn_timeout,
    )
    if res.status_code != 200:
        logger.error(f"Problem starting a bot worker: {res.text}")
        raise Exception(f"Problem starting a bot worker: {res.text}")

    # Wait for the machine to enter the started state
    vm_id = res.json()["id"]
    logger.info(f"Machine spawned with ID: {vm_id}")
    logger.info("Waiting for machine to enter 'started' state")

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + spawn_timeout
    while time.monotonic() < deadline:
        try:
            state = check_machine_state(vm_id)
            if state == "started":
                logger.info(f"Machine successfully started and joined room: {room_url}")
                return
        except Exception as e:
            logger.warning(f"Error checking machine state: {e}")
        time.sleep(10)  # Wait 10 seconds before checking again

    logger.error(f"Bot was unable to enter started state within {spawn_timeout} seconds")
    raise Exception(f"Bot was unable to enter started state within {spawn_timeout} seconds")
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
    """Create a room, launch a bot into it and return join details for the user.

    The optional JSON body may carry ``system_prompt``, ``sprite_folder`` and
    ``name``; missing values fall back to the ``SYSTEM_PROMPT`` / ``BOT_NAME``
    environment variables. A body containing the key ``test`` is answered
    immediately with ``{"test": true}``. An unreadable body is tolerated and
    the defaults are used.

    When ``DUMMY_BOT`` is set, a fake room URL and token are returned without
    contacting any service. When ``RUN_AS_PROCESS`` is set, the bot is started
    as a local subprocess; otherwise a Fly machine is spawned.

    NOTE(review): both switches are plain truthiness checks on the raw
    environment string, so any non-empty value (even "0" or "false") enables
    them.

    Returns:
        JSON with ``room_url`` and a ``token`` for the user to join with.

    Raises:
        HTTPException: With status 500 when the room, the token, or the bot
            cannot be provisioned.
    """
    if os.getenv("DUMMY_BOT", False):
        # Simulate bot spawning without actually creating a room or spawning a bot
        dummy_room_url = f"https://example.daily.co/{uuid.uuid4()}"
        dummy_token = f"dummy_token_{uuid.uuid4()}"
        return JSONResponse(
            {
                "room_url": dummy_room_url,
                "token": dummy_token,
            }
        )

    try:
        data = await request.json()
        logger.info(f"Starting bot with request: {data}")
        # Is this a webhook creation request?
        if "test" in data:
            return JSONResponse({"test": True})
        system_prompt = data.get("system_prompt") or os.getenv("SYSTEM_PROMPT")
        sprite_folder = data.get("sprite_folder")
        bot_name = data.get("name") or os.getenv("BOT_NAME", "Chatbot")
    except Exception:
        # Best effort: a missing or malformed body simply means "use defaults".
        system_prompt = os.getenv("SYSTEM_PROMPT")
        sprite_folder = None
        bot_name = os.getenv("BOT_NAME", "Chatbot")

    room = await create_room()
    if not room:
        # Guard before touching room.url so a missing room yields a clean 500.
        raise HTTPException(status_code=500, detail="Failed to provision room")

    # Give the agent a token to join the session
    token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
    if not token:
        raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")

    # Launch a new fly.io machine, or run as a shell process (not recommended)
    run_as_process = os.getenv("RUN_AS_PROCESS", False)
    if run_as_process:
        print("Running as process")
        try:
            env = os.environ.copy()
            if system_prompt:
                env["SYSTEM_PROMPT"] = system_prompt
            if sprite_folder:
                # Mirror the VM path, which also forwards the sprite folder.
                env["SPRITE_FOLDER"] = sprite_folder
            env["BOT_NAME"] = bot_name

            # check if we run inside a docker container
            if os.path.exists("/app/.venv/bin/python"):
                launcher = ["/app/.venv/bin/python"]
            else:
                launcher = ["pipenv", "run", "python"]

            # An argv list without a shell keeps the URL and token from ever
            # being interpreted by a shell, and surfaces a missing launcher
            # as an immediate error.
            subprocess.Popen(
                [*launcher, "bot.py", "-u", room.url, "-t", token],
                bufsize=1,
                cwd=os.path.dirname(os.path.abspath(__file__)),
                env=env,
            )
        except Exception as e:
            raise HTTPException(
                status_code=500, detail=f"Failed to start subprocess: {e}"
            ) from e
    else:
        print("Running as VM")
        try:
            # spawn_fly_machine blocks on HTTP calls and sleeps; run it in the
            # default executor so the event loop keeps serving other requests.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                spawn_fly_machine,
                room.url,
                token,
                bot_name,
                system_prompt,
                sprite_folder,
            )
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to spawn VM: {e}") from e

    # Grab a token for the user to join with
    user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
    return JSONResponse(
        {
            "room_url": room.url,
            "token": user_token,
        }
    )
@app.get("/{path_name:path}", response_class=FileResponse)
async def catch_all(path_name: Optional[str] = ""):
    """Serve files from the static frontend directory for any unmatched GET path.

    An empty path returns ``index.html``. Otherwise the path is resolved
    against ``STATIC_DIR``; the file itself is served when it exists, else the
    same path with its suffix replaced by ``.html`` is tried.

    The resolved target must stay inside ``STATIC_DIR``: paths that escape it
    (``..`` segments, an absolute path such as a request for ``//etc/passwd``,
    or a symlink pointing outside) are answered with 404.

    Args:
        path_name: Request path with the leading slash removed.

    Raises:
        HTTPException: With status 404 when no matching file exists inside
            the static directory.
    """
    if path_name == "":
        return FileResponse(f"{STATIC_DIR}/index.html")

    try:
        static_root = Path(STATIC_DIR).resolve()
        file_path = (static_root / (path_name or "")).resolve()
        # relative_to() raises ValueError when file_path is outside static_root.
        file_path.relative_to(static_root)
    except (OSError, RuntimeError, ValueError):
        # Unresolvable paths (embedded NUL, symlink loops) and paths outside
        # the static root are reported the same way as missing files.
        raise HTTPException(status_code=404, detail="File not found") from None

    if file_path.is_file():
        return FileResponse(file_path)

    # The root itself has no sibling ".html" inside the static directory.
    if file_path != static_root:
        html_file_path = file_path.with_suffix(".html")
        if html_file_path.is_file():
            return FileResponse(html_file_path)

    raise HTTPException(status_code=404, detail="File not found")
async def deploy_bot(bot_name: str):
    """Create a room and deploy a bot into it on Fly, outside the web server.

    The Daily REST helper is normally created by the app lifespan. When this
    coroutine is run on its own (for example from the command line) that
    lifespan has not run, so it is entered here for the duration of the
    deployment and the helper is removed again afterwards.

    Args:
        bot_name: Name given to the deployed bot.

    Returns:
        True when the bot was deployed, False on any reported failure.
    """
    if "rest" in daily_helpers:
        # A helper is already available (the app is running); reuse it.
        return await _deploy_bot_to_new_room(bot_name)

    async with lifespan(app):
        try:
            return await _deploy_bot_to_new_room(bot_name)
        finally:
            # Do not leave behind a helper whose session is about to be closed.
            daily_helpers.pop("rest", None)


async def _deploy_bot_to_new_room(bot_name: str) -> bool:
    """Provision a room and token, then spawn the Fly machine for the bot."""
    # Create a new room
    try:
        room = await create_room()
    except HTTPException as e:
        print(f"Unable to provision room: {e.detail}")
        return False
    if not room:
        # Checked before room.url is used so a missing room cannot crash here.
        print("Unable to provision room")
        return False

    # Get a token for the bot
    token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
    if not token:
        print(f"Failed to get token for room: {room.url}")
        return False

    # Deploy the bot to Fly
    try:
        system_prompt = os.getenv("SYSTEM_PROMPT", "You're a friendly chatbot.")
        sprite_folder = os.getenv("SPRITE_FOLDER", "robot")
        spawn_fly_machine(room.url, token, bot_name, system_prompt, sprite_folder)
        print(f"Bot '{bot_name}' deployed successfully to room: {room.url}")
    except Exception as e:
        print(f"Failed to spawn VM: {e}")
        return False
    return True
if __name__ == "__main__":
    # Check environment variables: stop at the first required one that is absent.
    missing_var = next((name for name in REQUIRED_ENV_VARS if name not in os.environ), None)
    if missing_var is not None:
        raise Exception(f"Missing environment variable: {missing_var}.")

    # Command-line options; host, port and bot name default to the environment.
    parser = argparse.ArgumentParser(description="MDS Bot Runner")
    parser.add_argument(
        "--host",
        type=str,
        default=os.getenv("HOST", "0.0.0.0"),
        help="Host address",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=os.getenv("PORT", 7860),
        help="Port number",
    )
    parser.add_argument(
        "--reload",
        action="store_true",
        default=False,
        help="Reload code on change",
    )
    parser.add_argument(
        "--deploy-bot",
        action="store_true",
        default=False,
        help="Immediately deploy a bot to Fly",
    )
    parser.add_argument(
        "--bot-name",
        type=str,
        default=os.getenv("BOT_NAME", "Chatbot"),
        help="Name of the bot",
    )
    config = parser.parse_args()

    # One-shot mode: deploy a single bot and exit with 0 on success, 1 on failure.
    if config.deploy_bot:
        deployed = asyncio.run(deploy_bot(config.bot_name))
        sys.exit(0 if deployed else 1)

    # Server mode: run the API until interrupted.
    try:
        uvicorn.run(
            "bot_runner:app",
            host=config.host,
            port=config.port,
            reload=config.reload,
        )
    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")