From 7490840ca3c450e3956c6f3d45b0fb9778d683db Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Wed, 17 Jul 2024 15:48:45 -0700 Subject: [PATCH] Query manager (#72) * initial query test, mocking env variables initial query test, mocking env variables * add more query tests * add pylint hint * remove tmp file, accidental addition * add QueryRunner, fail if a query is running * add QueryManager, refactor query loading and running. add tests for /start routes * fix bugs; add alert if query can't start * Update alert.tsx Co-authored-by: Alex Koshelev * Update sidecar/app/query/base.py Co-authored-by: Alex Koshelev * move logger onto settings, only built once * wrapper function for query not found * add comments to Query and QueryManager * fix typing issue with logger and pydantic * small refactor to add configure logger method on settings * create check_capacity helper, add details to message when at capacity --------- Co-authored-by: Alex Koshelev --- .pre-commit-config.yaml | 2 +- server/app/alert.tsx | 38 +++--- server/app/query/create/page.tsx | 19 ++- server/app/query/layout.tsx | 2 +- sidecar/app/logger.py | 22 ---- sidecar/app/main.py | 2 + sidecar/app/query/base.py | 125 ++++++++++++-------- sidecar/app/query/ipa.py | 5 +- sidecar/app/query/status.py | 6 +- sidecar/app/routes/http_helpers.py | 30 +++++ sidecar/app/routes/start.py | 72 +++++++++--- sidecar/app/routes/stop.py | 22 ++-- sidecar/app/routes/websockets.py | 61 +++++----- sidecar/app/settings.py | 54 ++++++++- sidecar/tests/app/query/test_base.py | 121 +++++++++++++++++++ sidecar/tests/app/routes/test_start.py | 156 +++++++++++++++++++++++++ 16 files changed, 585 insertions(+), 152 deletions(-) delete mode 100644 sidecar/app/logger.py create mode 100644 sidecar/app/routes/http_helpers.py create mode 100644 sidecar/tests/app/query/test_base.py create mode 100644 sidecar/tests/app/routes/test_start.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 368c543..56db7e4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -32,7 +32,7 @@ repos: entry: coverage report types: [python] pass_filenames: false - args: [--fail-under=9] # increase this over time + args: [--fail-under=60] # increase this over time, goal=80 - id: pyre-check name: pyre-check entry: pyre check diff --git a/server/app/alert.tsx b/server/app/alert.tsx index 37434a5..5c67347 100644 --- a/server/app/alert.tsx +++ b/server/app/alert.tsx @@ -1,9 +1,12 @@ import Link from "next/link"; -import { CheckCircleIcon, XMarkIcon } from "@heroicons/react/20/solid"; +import { + CheckCircleIcon, + ExclamationTriangleIcon, +} from "@heroicons/react/20/solid"; -export default function QueryStartedAlert({ queryId }: { queryId: string }) { +export function QueryStartedAlert({ queryId }: { queryId: string }) { return ( -
+
.{" "}

-
-
- -
+
+
+ ); +} + +export function QueryFailedToStartAlert({ queryId }: { queryId: string }) { + return ( +
+
+
+
+
+

+ Failed to start a query: {queryId}. +

diff --git a/server/app/query/create/page.tsx b/server/app/query/create/page.tsx index a51ccf6..1e5581e 100644 --- a/server/app/query/create/page.tsx +++ b/server/app/query/create/page.tsx @@ -9,7 +9,7 @@ import { } from "@heroicons/react/20/solid"; import { useRouter } from "next/navigation"; import { ExclamationCircleIcon } from "@heroicons/react/20/solid"; -import QueryStartedAlert from "@/app/alert"; +import { QueryStartedAlert, QueryFailedToStartAlert } from "@/app/alert"; import { DemoLoggerRemoteServers, IPARemoteServers, @@ -23,6 +23,9 @@ type QueryType = Database["public"]["Enums"]["query_type"]; export default function Page() { const [queryId, setQueryId] = useState(null); + const [querySubmitSuccess, setQuerySubmitSuccess] = useState( + null, + ); const router = useRouter(); const handleFormSubmit = async ( @@ -43,9 +46,14 @@ export default function Page() { method: "POST", body: params, }); - const _data = await response.json(); + if (!response.ok) { + setQuerySubmitSuccess(false); + const error_message = await response.text(); + throw new Error(error_message); + } } + setQuerySubmitSuccess(true); await new Promise((f) => setTimeout(f, 1000)); // Redirect to /query/view/ @@ -67,7 +75,12 @@ export default function Page() { return ( <> - {queryId && } + {queryId && querySubmitSuccess === true && ( + + )} + {queryId && querySubmitSuccess === false && ( + + )}
diff --git a/server/app/query/layout.tsx b/server/app/query/layout.tsx index dbea5ae..c5cb9f8 100644 --- a/server/app/query/layout.tsx +++ b/server/app/query/layout.tsx @@ -7,7 +7,7 @@ export default function RootLayout({ children: React.ReactNode; }) { return ( -
+
{children}
); diff --git a/sidecar/app/logger.py b/sidecar/app/logger.py deleted file mode 100644 index 295d1cc..0000000 --- a/sidecar/app/logger.py +++ /dev/null @@ -1,22 +0,0 @@ -import sys - -from loguru import logger - -from .helpers import Role -from .settings import settings - -logger.remove() -max_role_str_len = max(len(role.name) for role in Role) -role_str = f"{settings.role.name.replace('_', ' ').title():>{max_role_str_len}}" - -LOGGER_FORMAT = ( - "{time:YYYY-MM-DD HH:mm:ss} | " - "{level: <8} | " - "{extra[role]} - {message}" -) -logger.configure(extra={"role": role_str}) -logger.add( - sys.stderr, - level="INFO", - format=LOGGER_FORMAT, -) diff --git a/sidecar/app/main.py b/sidecar/app/main.py index c160239..579ac1d 100644 --- a/sidecar/app/main.py +++ b/sidecar/app/main.py @@ -1,9 +1,11 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from .query.base import QueryManager from .routes import start, stop, websockets app = FastAPI() +app.state.QUERY_MANAGER = QueryManager(max_parallel_queries=1) app.include_router(websockets.router) app.include_router(start.router) app.include_router(stop.router) diff --git a/sidecar/app/query/base.py b/sidecar/app/query/base.py index 20c3c2d..57caf17 100644 --- a/sidecar/app/query/base.py +++ b/sidecar/app/query/base.py @@ -8,54 +8,69 @@ import loguru from ..helpers import Role -from ..logger import logger -from ..settings import settings +from ..settings import get_settings from .status import Status, StatusHistory from .step import Step -# Dictionary to store queries -queries: dict[str, "Query"] = {} - class QueryExistsError(Exception): pass +def status_file_path(query_id: str) -> Path: + settings = get_settings() + return settings.status_dir_path / Path(query_id) + + +def log_file_path(query_id: str) -> Path: + settings = get_settings() + return settings.log_dir_path / Path(query_id) + + @dataclass class Query: + """ + Query is the base class, used to implement a list of steps to be run by this server. + The server has a role, obtained from get_settings(). + + Steps implement a `build_from_query` method, + which allows them to utilize data stored on the query. + """ + + # pylint: disable=too-many-instance-attributes query_id: str current_step: Optional[Step] = field(init=False, default=None, repr=True) - logger: loguru.Logger = field(init=False, repr=False) - _logger_id: int = field(init=False, repr=False) + logger: loguru.Logger = field(init=False, repr=False, compare=False) + _logger_id: int = field(init=False, repr=False, compare=False) + role: Role = field(init=False, repr=True) _status_history: StatusHistory = field(init=False, repr=True) step_classes: ClassVar[list[type[Step]]] = [] def __post_init__(self): - self.logger = logger.bind(task=self.query_id) - status_dir = settings.root_path / Path("status") - status_dir.mkdir(exist_ok=True) - status_file_path = status_dir / Path(f"{self.query_id}") - self._status_history = StatusHistory(file_path=status_file_path, logger=logger) - - self._log_dir.mkdir(exist_ok=True) - self._logger_id = logger.add( + settings = get_settings() + + self.logger = settings.logger.bind(task=self.query_id) + self.role = settings.role + + self._status_history = StatusHistory( + file_path=self.status_file_path, logger=self.logger + ) + + self._logger_id = self.logger.add( self.log_file_path, serialize=True, filter=lambda record: record["extra"].get("task") == self.query_id, enqueue=True, ) self.logger.debug(f"adding new Query {self}.") - if queries.get(self.query_id) is not None: - raise QueryExistsError(f"{self.query_id} already exists") - queries[self.query_id] = self @property - def _log_dir(self) -> Path: - return settings.root_path / Path("logs") + def status_file_path(self) -> Path: + return status_file_path(self.query_id) @property - def role(self) -> Role: - return settings.role + def log_file_path(self) -> Path: + return log_file_path(self.query_id) @property def started(self) -> bool: @@ -65,23 +80,6 @@ def started(self) -> bool: def finished(self) -> bool: return self.status >= Status.COMPLETE - @classmethod - def get_from_query_id(cls, query_id) -> Optional["Query"]: - query = queries.get(query_id) - if query: - return query - try: - query = cls(query_id) - except QueryExistsError as e: - # avoid race condition on queries - query = queries.get(query_id) - if query: - return query - raise e - if query.status == Status.UNKNOWN: - return None - return query - @property def status(self) -> Status: return self._status_history.current_status @@ -99,10 +97,6 @@ def status_event_json(self): def running(self): return self.started and not self.finished - @property - def log_file_path(self) -> Path: - return self._log_dir / Path(f"{self.query_id}.log") - @property def steps(self) -> Iterable[Step]: for step_class in self.step_classes: @@ -154,11 +148,9 @@ def crash(self): def _cleanup(self): self.current_step = None try: - logger.remove(self._logger_id) + self.logger.remove(self._logger_id) except ValueError: pass - if queries.get(self.query_id) is not None: - del queries[self.query_id] @property def cpu_usage_percent(self) -> float: @@ -174,3 +166,44 @@ def memory_rss_usage(self) -> int: QueryTypeT = TypeVar("QueryTypeT", bound=Query) + + +class MaxQueriesRunningError(Exception): + pass + + +@dataclass +class QueryManager: + """ + The QueryManager allows for a fixed number of queries to run at once, + and stores those queries in a dictionary. + + Accessing running queries allows the finish and kill methods to be called + from another caller (typically a route handler in the HTTP layer. + """ + + max_parallel_queries: int = field(init=True, repr=False, default=1) + running_queries: dict[str, Query] = field( + init=False, repr=True, default_factory=dict + ) + + def get_from_query_id(self, cls, query_id: str) -> Optional[Query]: + if query_id in self.running_queries: + return self.running_queries[query_id] + if status_file_path(query_id).exists(): + return cls(query_id) + return None + + def run_query(self, query: Query): + if not self.capacity_available: + raise MaxQueriesRunningError( + f"Only {self.max_parallel_queries} allowed. Currently running {self}" + ) + + self.running_queries[query.query_id] = query + query.start() + del self.running_queries[query.query_id] + + @property + def capacity_available(self): + return len(self.running_queries) < self.max_parallel_queries diff --git a/sidecar/app/query/ipa.py b/sidecar/app/query/ipa.py index 9e730e2..eb980a5 100644 --- a/sidecar/app/query/ipa.py +++ b/sidecar/app/query/ipa.py @@ -12,7 +12,7 @@ from ..helpers import Role from ..local_paths import Paths -from ..settings import settings +from ..settings import get_settings from .base import Query from .command import FileOutputCommand, LoggerOutputCommand from .step import CommandStep, LoggerOutputCommandStep, Status, Step @@ -30,6 +30,7 @@ class IPAQuery(Query): def send_kill_signals(self): self.logger.info("sending kill signals") + settings = get_settings() for helper in settings.helpers.values(): if helper.role == self.role: continue @@ -241,6 +242,7 @@ def build_from_query(cls, query: IPAQuery): ) def run(self): + settings = get_settings() sidecar_urls = [ helper.sidecar_url for helper in settings.helpers.values() @@ -339,6 +341,7 @@ class IPACoordinatorQuery(IPAQuery): def send_terminate_signals(self): self.logger.info("sending terminate signals") + settings = get_settings() for helper in settings.helpers.values(): if helper.role == self.role: continue diff --git a/sidecar/app/query/status.py b/sidecar/app/query/status.py index ebb112b..14014fc 100644 --- a/sidecar/app/query/status.py +++ b/sidecar/app/query/status.py @@ -4,7 +4,7 @@ from dataclasses import dataclass, field from enum import IntEnum, auto from pathlib import Path -from typing import NamedTuple +from typing import NamedTuple, Optional import loguru @@ -51,7 +51,9 @@ def locking_status(self): """Cannot add to history after this or higher status is reached""" return Status.COMPLETE - def add(self, status: Status, timestamp: float = time.time()): + def add(self, status: Status, timestamp: Optional[float] = None): + if timestamp is None: + timestamp = time.time() assert status > self.current_status assert self.current_status < self.locking_status self._status_history.append( diff --git a/sidecar/app/routes/http_helpers.py b/sidecar/app/routes/http_helpers.py new file mode 100644 index 0000000..bdfe1e4 --- /dev/null +++ b/sidecar/app/routes/http_helpers.py @@ -0,0 +1,30 @@ +from typing import Type + +from fastapi import HTTPException + +from ..query.base import Query, QueryManager + + +def get_query_from_query_id( + query_manager: QueryManager, + query_cls: Type[Query], + query_id: str, +) -> Query: + query = query_manager.get_from_query_id(query_cls, query_id) + if query is None: + raise HTTPException(status_code=404, detail=f"Query<{query_id}> not found") + return query + + +def check_capacity( + query_manager: QueryManager, +) -> None: + if not query_manager.capacity_available: + raise HTTPException( + status_code=503, + detail=( + f"Capacity unavailable. Currently running " + f"{len(query_manager.running_queries)} of " + f"{query_manager.max_parallel_queries} queries." + ), + ) diff --git a/sidecar/app/routes/start.py b/sidecar/app/routes/start.py index 168e630..25e8784 100644 --- a/sidecar/app/routes/start.py +++ b/sidecar/app/routes/start.py @@ -3,15 +3,15 @@ from pathlib import Path from typing import Annotated -from fastapi import APIRouter, BackgroundTasks, Form, HTTPException +from fastapi import APIRouter, BackgroundTasks, Form, Request, status from fastapi.responses import StreamingResponse from ..local_paths import Paths from ..query.base import Query from ..query.demo_logger import DemoLoggerQuery from ..query.ipa import GateType, IPACoordinatorQuery, IPAHelperQuery -from ..query.status import Status -from ..settings import settings +from ..settings import get_settings +from .http_helpers import check_capacity, get_query_from_query_id router = APIRouter( prefix="/start", @@ -21,20 +21,43 @@ ) -@router.post("/demo-logger/{query_id}") +class IncorrectRoleError(Exception): + pass + + +@router.get("/capacity_available") +def capacity_available( + request: Request, +): + query_manager = request.app.state.QUERY_MANAGER + return {"capacity_available": query_manager.capacity_available} + + +@router.get("/running_queries") +def running_queries( + request: Request, +): + query_manager = request.app.state.QUERY_MANAGER + return {"running_queries": list(query_manager.running_queries.keys())} + + +@router.post("/demo-logger/{query_id}", status_code=status.HTTP_201_CREATED) def demo_logger( query_id: str, num_lines: Annotated[int, Form()], total_runtime: Annotated[int, Form()], background_tasks: BackgroundTasks, + request: Request, ): + query_manager = request.app.state.QUERY_MANAGER + check_capacity(query_manager) + query = DemoLoggerQuery( query_id=query_id, num_lines=num_lines, total_runtime=total_runtime, ) - background_tasks.add_task(query.start) - + background_tasks.add_task(query_manager.run_query, query) return {"message": "Process started successfully", "query_id": query_id} @@ -47,11 +70,18 @@ def start_ipa_helper( multi_threading: Annotated[bool, Form()], disable_metrics: Annotated[bool, Form()], background_tasks: BackgroundTasks, + request: Request, ): # pylint: disable=too-many-arguments + query_manager = request.app.state.QUERY_MANAGER + check_capacity(query_manager) + + settings = get_settings() role = settings.role if not role or role == role.COORDINATOR: - raise Exception("Cannot start helper without helper role.") + raise IncorrectRoleError( + f"Cannot start helper without helper role. Currently running {role=}." + ) compiled_id = ( f"{commit_hash}_{gate_type}" @@ -75,28 +105,26 @@ def start_ipa_helper( disable_metrics=disable_metrics, port=settings.helper_port, ) - background_tasks.add_task(query.start) - + background_tasks.add_task(query_manager.run_query, query) return {"message": "Process started successfully", "query_id": query_id} @router.get("/ipa-helper/{query_id}/status") def get_ipa_helper_status( query_id: str, + request: Request, ): - query = Query.get_from_query_id(query_id) - if query is None: - return {"status": Status.NOT_FOUND.name} + query = get_query_from_query_id(request.app.state.QUERY_MANAGER, Query, query_id) return {"status": query.status.name} @router.get("/{query_id}/log-file") def get_ipa_helper_log_file( query_id: str, + request: Request, ): - query = Query.get_from_query_id(query_id) - if query is None: - return HTTPException(status_code=404, detail="Query not found") + query = get_query_from_query_id(request.app.state.QUERY_MANAGER, Query, query_id) + settings = get_settings() def iterfile(): with open(query.log_file_path, "rb") as f: @@ -123,7 +151,7 @@ def iterfile(): @router.post("/ipa-query/{query_id}") -def start_ipa_test_query( +def start_ipa_query( query_id: str, commit_hash: Annotated[str, Form()], size: Annotated[int, Form()], @@ -131,11 +159,19 @@ def start_ipa_test_query( max_trigger_value: Annotated[int, Form()], per_user_credit_cap: Annotated[int, Form()], background_tasks: BackgroundTasks, + request: Request, ): # pylint: disable=too-many-arguments + query_manager = request.app.state.QUERY_MANAGER + check_capacity(query_manager) + + settings = get_settings() role = settings.role if role != role.COORDINATOR: - raise Exception(f"Sidecar {role}: Cannot start query without coordinator role.") + raise IncorrectRoleError( + f"Attempting to start query with {role=}: " + "Cannot start query without coordinator role." + ) paths = Paths( repo_path=settings.root_path / Path("ipa"), @@ -153,6 +189,6 @@ def start_ipa_test_query( max_trigger_value=max_trigger_value, per_user_credit_cap=per_user_credit_cap, ) - background_tasks.add_task(query.start) + background_tasks.add_task(query_manager.run_query, query) return {"message": "Process started successfully", "query_id": query_id} diff --git a/sidecar/app/routes/stop.py b/sidecar/app/routes/stop.py index c9f2ebb..11ebc1c 100644 --- a/sidecar/app/routes/stop.py +++ b/sidecar/app/routes/stop.py @@ -1,8 +1,8 @@ -from fastapi import APIRouter +from fastapi import APIRouter, Request -from ..logger import logger from ..query.base import Query from ..query.status import Status +from .http_helpers import get_query_from_query_id router = APIRouter( prefix="/stop", @@ -15,13 +15,13 @@ @router.post("/finish/{query_id}") def finish( query_id: str, + request: Request, ): - query = Query.get_from_query_id(query_id) - if query is None: - return {"message": "Query not found", "query_id": query_id} - logger.info(f"{query=}") + query = get_query_from_query_id(request.app.state.QUERY_MANAGER, Query, query_id) + + query.logger.info(f"{query=}") if query.status < Status.COMPLETE: - logger.info("calling query finish") + query.logger.info("calling query finish") query.finish() return {"message": "Query stopped successfully", "query_id": query_id} return {"message": "Query already complete", "query_id": query_id} @@ -30,11 +30,11 @@ def finish( @router.post("/kill/{query_id}") def kill( query_id: str, + request: Request, ): - logger.info(f"kill called for {query_id=}") - query = Query.get_from_query_id(query_id) - if query is None: - return {"message": "Query not found", "query_id": query_id} + query = get_query_from_query_id(request.app.state.QUERY_MANAGER, Query, query_id) + + query.logger.info(f"kill called for {query_id=}") if query.status < Status.COMPLETE: query.kill() return {"message": "Query killed", "query_id": query_id} diff --git a/sidecar/app/routes/websockets.py b/sidecar/app/routes/websockets.py index 6f4f2aa..d846841 100644 --- a/sidecar/app/routes/websockets.py +++ b/sidecar/app/routes/websockets.py @@ -2,12 +2,11 @@ import time from contextlib import asynccontextmanager -from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect from websockets import ConnectionClosedError, ConnectionClosedOK -from ..logger import logger from ..query.base import Query -from ..query.status import Status +from .http_helpers import get_query_from_query_id router = APIRouter( prefix="/ws", @@ -29,40 +28,40 @@ async def use_websocket(websocket): @router.websocket("/status/{query_id}") -async def status_websocket(websocket: WebSocket, query_id: str): - query = Query.get_from_query_id(query_id) - async with use_websocket(websocket) as websocket: - if query is None: - logger.warning(f"{query_id=} Status: {Status.NOT_FOUND.name}") - await websocket.send_json( - {"status": Status.NOT_FOUND.name, "start_time": time.time()} - ) - else: - while query.running: - logger.debug(f"{query_id=} Status: {query.status.name}") - await websocket.send_json(query.status_event_json) - await asyncio.sleep(1) +async def status_websocket( + websocket: WebSocket, + query_id: str, +): + query_manager = websocket.app.state.QUERY_MANAGER + query = query_manager.get_from_query_id(Query, query_id) + if query is None: + raise HTTPException(status_code=404, detail="Query not found") - logger.debug(f"{query_id=} Status: {query.status.name}") + async with use_websocket(websocket) as websocket: + while query.running: + query.logger.debug(f"{query_id=} Status: {query.status.name}") await websocket.send_json(query.status_event_json) + await asyncio.sleep(1) + + query.logger.debug(f"{query_id=} Status: {query.status.name}") + await websocket.send_json(query.status_event_json) @router.websocket("/logs/{query_id}") -async def logs_websocket(websocket: WebSocket, query_id: str): - query = Query.get_from_query_id(query_id) +async def logs_websocket( + websocket: WebSocket, + query_id: str, +): + query = get_query_from_query_id(websocket.app.state.QUERY_MANAGER, Query, query_id) async with use_websocket(websocket) as websocket: - if query is None: - logger.warning(f"{query_id=} does not exist.") - return - with open(query.log_file_path, "r", encoding="utf8") as log_file: if query.finished: - logger.info(f"{query_id=} complete. sending all logs.") + query.logger.info(f"{query_id=} complete. sending all logs.") for line in log_file: await websocket.send_text(line) else: - logger.info(f"{query_id=} running. tailing log file.") + query.logger.info(f"{query_id=} running. tailing log file.") while query.running: line = log_file.readline() if not line: @@ -74,11 +73,15 @@ async def logs_websocket(websocket: WebSocket, query_id: str): @router.websocket("/stats/{query_id}") -async def stats_websocket(websocket: WebSocket, query_id: str): - query = Query.get_from_query_id(query_id) +async def stats_websocket( + websocket: WebSocket, + query_id: str, +): + query = get_query_from_query_id(websocket.app.state.QUERY_MANAGER, Query, query_id) + async with use_websocket(websocket) as websocket: - if query is None or query.finished: - logger.warning(f"{query_id=} is not running.") + if query.finished: + query.logger.warning(f"{query_id=} is finished.") return while query.running: await websocket.send_json( diff --git a/sidecar/app/settings.py b/sidecar/app/settings.py index fc56eba..912e896 100644 --- a/sidecar/app/settings.py +++ b/sidecar/app/settings.py @@ -1,11 +1,19 @@ +from __future__ import annotations + +import sys +from functools import lru_cache from pathlib import Path -from typing import Annotated, Any +from typing import TYPE_CHECKING, Annotated, Any +from loguru import logger from pydantic.functional_validators import BeforeValidator from pydantic_settings import BaseSettings from .helpers import Helper, Role, load_helpers_from_network_config +if TYPE_CHECKING: + from loguru import Logger + def gen_path(v: Any) -> Path: return Path(v) @@ -18,18 +26,54 @@ class Settings(BaseSettings): role: Role helper_port: int _helpers: dict[Role, Helper] + _logger: "Logger" # underscore prevents Pydantic from attempting to load this def model_post_init(self, __context) -> None: self._helpers = load_helpers_from_network_config(self.network_config_path) + self._logger = logger + self._configure_logger() + + def _configure_logger(self): + self._logger.remove() + max_role_str_len = max(len(role.name) for role in Role) + role_str = f"{self.role.name.replace('_', ' ').title():>{max_role_str_len}}" + + logger_format = ( + "{time:YYYY-MM-DD HH:mm:ss} | " + "{level: <8} | " + "{extra[role]} - {message}" + ) + self._logger.configure(extra={"role": role_str}) + self._logger.add( + sys.stderr, + level="INFO", + format=logger_format, + ) + + @property + def logger(self) -> "Logger": + return self._logger @property - def helper(self): + def helper(self) -> Helper: return self._helpers[self.role] @property - def helpers(self): + def helpers(self) -> dict[Role, Helper]: return self._helpers + @property + def status_dir_path(self) -> Path: + return self.root_path / Path("status") + + @property + def log_dir_path(self) -> Path: + return self.root_path / Path("logs") + -# pyre-ignore: https://pyre-check.org/docs/errors/#dataclass-like-classes -settings = Settings() +@lru_cache +def get_settings(): + settings = Settings() + settings.status_dir_path.mkdir(exist_ok=True) + settings.log_dir_path.mkdir(exist_ok=True) + return settings diff --git a/sidecar/tests/app/query/test_base.py b/sidecar/tests/app/query/test_base.py new file mode 100644 index 0000000..1c26bdd --- /dev/null +++ b/sidecar/tests/app/query/test_base.py @@ -0,0 +1,121 @@ +import os +from pathlib import Path +from unittest import mock +from uuid import uuid4 + +import pytest + +from sidecar.app.query.base import MaxQueriesRunningError, Query, QueryManager +from sidecar.app.query.status import Status + + +@pytest.fixture(autouse=True) +def mock_settings_env_vars(tmp_path): + env_vars = { + "ROLE": "0", + "ROOT_PATH": str(tmp_path), + "CONFIG_PATH": str(Path("local_dev/config")), + "NETWORK_CONFIG_PATH": str(Path("local_dev/config") / Path("network.toml")), + "HELPER_PORT": str(17440), + } + with mock.patch.dict(os.environ, env_vars): + yield + + +def test_query_files(): + query = Query(str(uuid4())) + assert not query.status_file_path.exists() + assert query.log_file_path.exists() + query.status = Status.STARTING + assert query.status_file_path.exists() + + +def test_query_started(): + for status in Status: + query = Query(str(uuid4())) + query.status = status + if status < Status.STARTING: + assert not query.started + else: + assert query.started + + +def test_query_finished(): + for status in Status: + query = Query(str(uuid4())) + query.status = status + if status < Status.COMPLETE: + assert not query.finished + else: + assert query.finished + + +@mock.patch("time.time", return_value=3.14) +def test_query_status_event_json(mock_time): + query = Query(str(uuid4())) + query.status = Status.STARTING + # in Query.status setter, we do two checks, which generate new UNKNOWN events + # within StatusHistory.add, we do two checks which also generate new UNKNOWN events + # finally, it's called for the actual new status setting + assert mock_time.call_count == 5 + assert query.status_event_json == {"status": "STARTING", "start_time": 3.14} + + +def test_query_running(): + for status in Status: + query = Query(str(uuid4())) + query.status = status + if Status.STARTING <= status < Status.COMPLETE: + assert query.running + else: + assert not query.running + + +def test_query_manager(): + query_manager = QueryManager(max_parallel_queries=1) + query = Query(str(uuid4())) + assert query_manager.get_from_query_id(Query, query.query_id) is None + query.status = Status.STARTING + assert query_manager.get_from_query_id(Query, query.query_id) == query + + +def test_query_manager_capacity_available(): + query_manager = QueryManager(max_parallel_queries=1) + assert query_manager.capacity_available + query = Query(str(uuid4())) + query_manager.running_queries[query.query_id] = query + assert not query_manager.capacity_available + del query_manager.running_queries[query.query_id] + assert query_manager.capacity_available + + +def test_query_manger_run_query(): + query_manager = QueryManager(max_parallel_queries=1) + query = Query(str(uuid4())) + + def fake_start(): + assert query.query_id in query_manager.running_queries + + with mock.patch( + "sidecar.app.query.base.Query.start", side_effect=fake_start + ) as mock_start: + query_manager.run_query(query) + mock_start.assert_called_once() + + assert query.query_id not in query_manager.running_queries + + +def test_query_manger_run_query_at_capacity(): + query_manager = QueryManager(max_parallel_queries=1) + query = Query(str(uuid4())) + query2 = Query(str(uuid4())) + + def fake_start(): + with pytest.raises(MaxQueriesRunningError): + query_manager.run_query(query2) + + with mock.patch( + "sidecar.app.query.base.Query.start", side_effect=fake_start + ) as mock_start: + query_manager.run_query(query) + mock_start.assert_called_once() diff --git a/sidecar/tests/app/routes/test_start.py b/sidecar/tests/app/routes/test_start.py new file mode 100644 index 0000000..8f00236 --- /dev/null +++ b/sidecar/tests/app/routes/test_start.py @@ -0,0 +1,156 @@ +from unittest import mock +from uuid import uuid4 + +import pytest +from fastapi.testclient import TestClient + +from sidecar.app.helpers import Role +from sidecar.app.main import app +from sidecar.app.query.status import Status +from sidecar.app.routes.start import IncorrectRoleError, Query +from sidecar.app.settings import get_settings + +client = TestClient(app) + + +@pytest.fixture(name="mock_role") +def _mock_role(): + def __mock_role(role: Role): + settings = get_settings() + settings.role = role + return settings + + return __mock_role + + +@pytest.fixture(name="running_query") +def _running_query(): + query = Query(str(uuid4())) + query_manager = app.state.QUERY_MANAGER + query_manager.running_queries[query.query_id] = query + query.status = Status.STARTING + yield query + del query_manager.running_queries[query.query_id] + + +def test_capacity_available(): + response = client.get("/start/capacity_available") + assert response.status_code == 200 + assert response.json() == {"capacity_available": True} + + +def test_not_capacity_available(running_query): + assert running_query.query_id in app.state.QUERY_MANAGER.running_queries + response = client.get("/start/capacity_available") + assert response.status_code == 200 + assert response.json() == {"capacity_available": False} + + +def test_running_queries(running_query): + response = client.get("/start/running_queries") + assert response.status_code == 200 + assert response.json() == {"running_queries": [running_query.query_id]} + + +def test_start_ipa_helper(mock_role): + settings = mock_role(Role.HELPER_1) + with mock.patch("sidecar.app.routes.start.get_settings", return_value=settings): + with mock.patch( + "sidecar.app.query.base.QueryManager.run_query" + ) as mock_query_manager: + query_id = str(uuid4()) + response = client.post( + f"/start/ipa-helper/{query_id}", + data={ + "commit_hash": "abcd1234", + "gate_type": "compact", + "stall_detection": True, + "multi_threading": True, + "disable_metrics": True, + }, + ) + assert response.status_code == 200 + mock_query_manager.assert_called_once() + + +def test_start_ipa_helper_as_coordinator(mock_role): + settings = mock_role(Role.COORDINATOR) + with pytest.raises(IncorrectRoleError): + with mock.patch("sidecar.app.routes.start.get_settings", return_value=settings): + with mock.patch("sidecar.app.query.base.QueryManager.run_query"): + query_id = str(uuid4()) + client.post( + f"/start/ipa-helper/{query_id}", + data={ + "commit_hash": "abcd1234", + "gate_type": "compact", + "stall_detection": True, + "multi_threading": True, + "disable_metrics": True, + }, + ) + + +def test_start_ipa_query(mock_role): + settings = mock_role(Role.COORDINATOR) + with mock.patch("sidecar.app.routes.start.get_settings", return_value=settings): + with mock.patch( + "sidecar.app.query.base.QueryManager.run_query" + ) as mock_query_manager: + query_id = str(uuid4()) + response = client.post( + f"/start/ipa-query/{query_id}", + data={ + "commit_hash": "abcd1234", + "size": 10, + "max_breakdown_key": 16, + "max_trigger_value": 10, + "per_user_credit_cap": 5, + }, + ) + assert response.status_code == 200 + mock_query_manager.assert_called_once() + + +def test_start_ipa_query_as_helper(mock_role): + settings = mock_role(Role.HELPER_1) + with pytest.raises(IncorrectRoleError): + with mock.patch("sidecar.app.routes.start.get_settings", return_value=settings): + with mock.patch("sidecar.app.query.base.QueryManager.run_query"): + query_id = str(uuid4()) + client.post( + f"/start/ipa-query/{query_id}", + data={ + "commit_hash": "abcd1234", + "size": 10, + "max_breakdown_key": 16, + "max_trigger_value": 10, + "per_user_credit_cap": 5, + }, + ) + + +def test_get_ipa_helper_status_not_found(): + query_id = str(uuid4()) + response = client.get(f"/start/ipa-helper/{query_id}/status") + assert response.status_code == 404 + + +def test_get_ipa_helper_status(running_query): + response = client.get(f"/start/ipa-helper/{running_query.query_id}/status") + assert response.status_code == 200 + assert response.json() == {"status": str(Status.STARTING.name)} + + +def test_get_ipa_helper_log_file_not_found(): + query_id = str(uuid4()) + response = client.get(f"/start/{query_id}/log-file") + assert response.status_code == 404 + + +def test_get_ipa_helper_log_file(running_query): + test_file_content = "log 1\nlog 2\nlog 3\n" + running_query.log_file_path.write_text(test_file_content, encoding="utf-8") + response = client.get(f"/start/{running_query.query_id}/log-file") + assert response.status_code == 200 + assert response.text.startswith(test_file_content)