Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query manager #72

Merged
merged 15 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ repos:
entry: coverage report
types: [python]
pass_filenames: false
args: [--fail-under=9] # increase this over time
args: [--fail-under=60] # increase this over time, goal=80
- id: pyre-check
name: pyre-check
entry: pyre check
Expand Down
38 changes: 25 additions & 13 deletions server/app/alert.tsx
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import Link from "next/link";
import { CheckCircleIcon, XMarkIcon } from "@heroicons/react/20/solid";
import {
CheckCircleIcon,
ExclamationTriangleIcon,
} from "@heroicons/react/20/solid";

export default function QueryStartedAlert({ queryId }: { queryId: string }) {
export function QueryStartedAlert({ queryId }: { queryId: string }) {
return (
<div className="rounded-md bg-green-50 p-4">
<div className="-mt-16 mb-4 rounded-md bg-green-50 p-4">
<div className="flex">
<div className="shrink-0">
<CheckCircleIcon
Expand All @@ -19,16 +22,25 @@ export default function QueryStartedAlert({ queryId }: { queryId: string }) {
</Link>.{" "}
</p>
</div>
<div className="ml-auto pl-3">
<div className="-m-1.5">
<button
type="button"
className="inline-flex rounded-md bg-green-50 p-1.5 text-green-500 hover:bg-green-100 focus:outline-none focus:ring-2 focus:ring-green-600 focus:ring-offset-2 focus:ring-offset-green-50"
>
<span className="sr-only">Dismiss</span>
<XMarkIcon className="size-5" aria-hidden="true" />
</button>
</div>
</div>
</div>
);
}

export function QueryFailedToStartAlert({ queryId }: { queryId: string }) {
return (
<div className="-mt-16 mb-4 rounded-md bg-red-50 p-4">
<div className="flex">
<div className="shrink-0">
<ExclamationTriangleIcon
className="size-5 text-red-400"
aria-hidden="true"
/>
</div>
<div className="ml-3">
<p className="text-sm font-medium text-red-800">
Failed to start a query: {queryId}.
</p>
</div>
</div>
</div>
Expand Down
19 changes: 16 additions & 3 deletions server/app/query/create/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from "@heroicons/react/20/solid";
import { useRouter } from "next/navigation";
import { ExclamationCircleIcon } from "@heroicons/react/20/solid";
import QueryStartedAlert from "@/app/alert";
import { QueryStartedAlert, QueryFailedToStartAlert } from "@/app/alert";
import {
DemoLoggerRemoteServers,
IPARemoteServers,
Expand All @@ -23,6 +23,9 @@ type QueryType = Database["public"]["Enums"]["query_type"];

export default function Page() {
const [queryId, setQueryId] = useState<string | null>(null);
const [querySubmitSuccess, setQuerySubmitSuccess] = useState<boolean | null>(
null,
);
const router = useRouter();

const handleFormSubmit = async (
Expand All @@ -43,9 +46,14 @@ export default function Page() {
method: "POST",
body: params,
});
const _data = await response.json();
if (!response.ok) {
setQuerySubmitSuccess(false);
const error_message = await response.text();
throw new Error(error_message);
}
}

setQuerySubmitSuccess(true);
await new Promise((f) => setTimeout(f, 1000));

// Redirect to /query/view/<newQueryId>
Expand All @@ -67,7 +75,12 @@ export default function Page() {

return (
<>
{queryId && <QueryStartedAlert queryId={queryId} />}
{queryId && querySubmitSuccess === true && (
<QueryStartedAlert queryId={queryId} />
)}
{queryId && querySubmitSuccess === false && (
<QueryFailedToStartAlert queryId={queryId} />
)}
<div className="md:flex md:items-start md:justify-between">
<DemoLogsForm handleDemoLogsFormSubmit={handleDemoLogsFormSubmit} />
<IPAForm handleIPAFormSubmit={handleIPAFormSubmit} />
Expand Down
2 changes: 1 addition & 1 deletion server/app/query/layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export default function RootLayout({
children: React.ReactNode;
}) {
return (
<div className="mx-auto min-h-full max-w-7xl bg-slate-100 py-10 sm:px-6 lg:px-8 dark:bg-slate-900">
<div className="mx-auto min-h-full max-w-7xl bg-slate-100 py-20 sm:px-6 lg:px-8 dark:bg-slate-900">
{children}
</div>
);
Expand Down
22 changes: 0 additions & 22 deletions sidecar/app/logger.py

This file was deleted.

2 changes: 2 additions & 0 deletions sidecar/app/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .query.base import QueryManager
from .routes import start, stop, websockets

app = FastAPI()
app.state.QUERY_MANAGER = QueryManager(max_parallel_queries=1)
app.include_router(websockets.router)
app.include_router(start.router)
app.include_router(stop.router)
Expand Down
125 changes: 79 additions & 46 deletions sidecar/app/query/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,54 +8,69 @@
import loguru

from ..helpers import Role
from ..logger import logger
from ..settings import settings
from ..settings import get_settings
from .status import Status, StatusHistory
from .step import Step

# Dictionary to store queries
queries: dict[str, "Query"] = {}


class QueryExistsError(Exception):
pass


def status_file_path(query_id: str) -> Path:
settings = get_settings()
return settings.status_dir_path / Path(query_id)
akoshelev marked this conversation as resolved.
Show resolved Hide resolved


def log_file_path(query_id: str) -> Path:
settings = get_settings()
return settings.log_dir_path / Path(query_id)


@dataclass
class Query:
"""
Query is the base class, used to implement a list of steps to be run by this server.
The server has a role, obtained from get_settings().

Steps implement a `build_from_query` method,
which allows them to utilize data stored on the query.
"""

# pylint: disable=too-many-instance-attributes
query_id: str
current_step: Optional[Step] = field(init=False, default=None, repr=True)
logger: loguru.Logger = field(init=False, repr=False)
_logger_id: int = field(init=False, repr=False)
logger: loguru.Logger = field(init=False, repr=False, compare=False)
Copy link
Collaborator

@cberkhoff cberkhoff Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use @dataclass if we need to add all these exceptions for each of the fields?
Why not just a class?

Copy link
Member Author

@eriktaubeneck eriktaubeneck Jul 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dataclass is just a wrapper that does some codegen to build standard methods like __init__ and __compare__. These are easy ways to define what goes in that codegen. It looks verbose, but it's much cleaner than defining all those manually.

_logger_id: int = field(init=False, repr=False, compare=False)
role: Role = field(init=False, repr=True)
_status_history: StatusHistory = field(init=False, repr=True)
step_classes: ClassVar[list[type[Step]]] = []

def __post_init__(self):
self.logger = logger.bind(task=self.query_id)
status_dir = settings.root_path / Path("status")
status_dir.mkdir(exist_ok=True)
status_file_path = status_dir / Path(f"{self.query_id}")
self._status_history = StatusHistory(file_path=status_file_path, logger=logger)

self._log_dir.mkdir(exist_ok=True)
self._logger_id = logger.add(
settings = get_settings()

self.logger = settings.logger.bind(task=self.query_id)
self.role = settings.role

self._status_history = StatusHistory(
file_path=self.status_file_path, logger=self.logger
)

self._logger_id = self.logger.add(
self.log_file_path,
serialize=True,
filter=lambda record: record["extra"].get("task") == self.query_id,
enqueue=True,
)
self.logger.debug(f"adding new Query {self}.")
if queries.get(self.query_id) is not None:
raise QueryExistsError(f"{self.query_id} already exists")
queries[self.query_id] = self

@property
def _log_dir(self) -> Path:
return settings.root_path / Path("logs")
def status_file_path(self) -> Path:
return status_file_path(self.query_id)

@property
def role(self) -> Role:
return settings.role
def log_file_path(self) -> Path:
return log_file_path(self.query_id)

@property
def started(self) -> bool:
Expand All @@ -65,23 +80,6 @@ def started(self) -> bool:
def finished(self) -> bool:
return self.status >= Status.COMPLETE

@classmethod
def get_from_query_id(cls, query_id) -> Optional["Query"]:
query = queries.get(query_id)
if query:
return query
try:
query = cls(query_id)
except QueryExistsError as e:
# avoid race condition on queries
query = queries.get(query_id)
if query:
return query
raise e
if query.status == Status.UNKNOWN:
return None
return query

@property
def status(self) -> Status:
return self._status_history.current_status
Expand All @@ -99,10 +97,6 @@ def status_event_json(self):
def running(self):
return self.started and not self.finished

@property
def log_file_path(self) -> Path:
return self._log_dir / Path(f"{self.query_id}.log")

@property
def steps(self) -> Iterable[Step]:
for step_class in self.step_classes:
Expand Down Expand Up @@ -154,11 +148,9 @@ def crash(self):
def _cleanup(self):
self.current_step = None
try:
logger.remove(self._logger_id)
self.logger.remove(self._logger_id)
except ValueError:
pass
if queries.get(self.query_id) is not None:
del queries[self.query_id]

@property
def cpu_usage_percent(self) -> float:
Expand All @@ -174,3 +166,44 @@ def memory_rss_usage(self) -> int:


QueryTypeT = TypeVar("QueryTypeT", bound=Query)


class MaxQueriesRunningError(Exception):
pass


@dataclass
class QueryManager:
eriktaubeneck marked this conversation as resolved.
Show resolved Hide resolved
"""
The QueryManager allows for a fixed number of queries to run at once,
and stores those queries in a dictionary.

Accessing running queries allows the finish and kill methods to be called
from another caller (typically a route handler in the HTTP layer.
"""

max_parallel_queries: int = field(init=True, repr=False, default=1)
running_queries: dict[str, Query] = field(
init=False, repr=True, default_factory=dict
)

def get_from_query_id(self, cls, query_id: str) -> Optional[Query]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have self and cls in the same function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cls is poorly named, let me fix that. it's meant to be the class of the Query that we are getting.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which are the different classes of Query?

if query_id in self.running_queries:
return self.running_queries[query_id]
if status_file_path(query_id).exists():
return cls(query_id)
return None

def run_query(self, query: Query):
if not self.capacity_available:
raise MaxQueriesRunningError(
f"Only {self.max_parallel_queries} allowed. Currently running {self}"
)

self.running_queries[query.query_id] = query
query.start()
del self.running_queries[query.query_id]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is query.start() a blocking call? otherwise I am not following this.

If it is a blocking call, should we use try here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's blocking. It's defined here, and it's already wrapped in a try/except that handles crashes (and killing the other helpers when 1 crashes.)


@property
def capacity_available(self):
return len(self.running_queries) < self.max_parallel_queries
5 changes: 4 additions & 1 deletion sidecar/app/query/ipa.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from ..helpers import Role
from ..local_paths import Paths
from ..settings import settings
from ..settings import get_settings
from .base import Query
from .command import FileOutputCommand, LoggerOutputCommand
from .step import CommandStep, LoggerOutputCommandStep, Status, Step
Expand All @@ -30,6 +30,7 @@ class IPAQuery(Query):

def send_kill_signals(self):
self.logger.info("sending kill signals")
settings = get_settings()
for helper in settings.helpers.values():
if helper.role == self.role:
continue
Expand Down Expand Up @@ -241,6 +242,7 @@ def build_from_query(cls, query: IPAQuery):
)

def run(self):
settings = get_settings()
sidecar_urls = [
helper.sidecar_url
for helper in settings.helpers.values()
Expand Down Expand Up @@ -339,6 +341,7 @@ class IPACoordinatorQuery(IPAQuery):

def send_terminate_signals(self):
self.logger.info("sending terminate signals")
settings = get_settings()
for helper in settings.helpers.values():
if helper.role == self.role:
continue
Expand Down
6 changes: 4 additions & 2 deletions sidecar/app/query/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass, field
from enum import IntEnum, auto
from pathlib import Path
from typing import NamedTuple
from typing import NamedTuple, Optional

import loguru

Expand Down Expand Up @@ -51,7 +51,9 @@ def locking_status(self):
"""Cannot add to history after this or higher status is reached"""
return Status.COMPLETE

def add(self, status: Status, timestamp: float = time.time()):
def add(self, status: Status, timestamp: Optional[float] = None):
if timestamp is None:
timestamp = time.time()
assert status > self.current_status
assert self.current_status < self.locking_status
self._status_history.append(
Expand Down
Loading
Loading