Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check to make sure helpers need to be killed before sending kill signal #89

Merged
merged 4 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ py-modules = ["sidecar",]
profile = "black"
py_version=39
skip_glob=["server"]
extra_standard_library = ["tomllib"]

eriktaubeneck marked this conversation as resolved.
Show resolved Hide resolved

[tool.pylint.format]
max-line-length = "88"
Expand Down
69 changes: 67 additions & 2 deletions sidecar/app/helpers.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import tomllib
from dataclasses import dataclass
from enum import IntEnum
from json import JSONDecodeError
from pathlib import Path
from urllib.parse import ParseResult, urlparse
from urllib.parse import ParseResult, urlparse, urlunparse

import tomllib
import httpx
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.x509 import load_pem_x509_certificate

from .query.step import Status


class Role(IntEnum):
COORDINATOR = 0
Expand All @@ -22,6 +26,67 @@ class Helper:
sidecar_url: ParseResult
public_key: EllipticCurvePublicKey

def query_status_url(self, query_id: str) -> str:
    """Build the HTTPS URL used to poll this helper for a query's status.

    Args:
        query_id: Identifier of the query whose status is requested.

    Returns:
        This helper's sidecar URL with the scheme forced to ``https`` and
        the path replaced by ``/start/{query_id}/status``.
    """
    status_endpoint = self.sidecar_url._replace(
        scheme="https",
        path=f"/start/{query_id}/status",
    )
    return str(urlunparse(status_endpoint))
eriktaubeneck marked this conversation as resolved.
Show resolved Hide resolved

def query_finish_url(self, query_id: str) -> str:
    """Build the HTTPS URL used to send this helper a finish signal.

    Args:
        query_id: Identifier of the query to finish.

    Returns:
        This helper's sidecar URL with the scheme forced to ``https`` and
        the path replaced by ``/stop/finish/{query_id}``.
    """
    finish_endpoint = self.sidecar_url._replace(
        scheme="https",
        path=f"/stop/finish/{query_id}",
    )
    return str(urlunparse(finish_endpoint))

def query_kill_url(self, query_id: str) -> str:
    """Build the HTTPS URL used to send this helper a kill signal.

    Args:
        query_id: Identifier of the query to kill.

    Returns:
        This helper's sidecar URL with the scheme forced to ``https`` and
        the path replaced by ``/stop/kill/{query_id}``.
    """
    kill_endpoint = self.sidecar_url._replace(
        scheme="https",
        path=f"/stop/kill/{query_id}",
    )
    return str(urlunparse(kill_endpoint))

def get_current_query_status(self, query_id: str) -> Status:
    """Fetch this helper's current status for a query.

    Any failure to obtain a usable answer is reported as ``Status.UNKNOWN``
    rather than raised, so callers can poll without their own error handling.

    Args:
        query_id: Identifier of the query to look up.

    Returns:
        The status parsed by ``Status.from_json`` from the response body, or
        ``Status.UNKNOWN`` when the request fails, the body is not valid
        JSON, or the decoded JSON is not an object.
    """
    try:
        r = httpx.get(self.query_status_url(query_id))
    except httpx.RequestError:
        return Status.UNKNOWN
    try:
        j = r.json()
    except JSONDecodeError:
        return Status.UNKNOWN
    # Valid JSON is not necessarily an object; a list/str/number body has no
    # ``.get`` and would otherwise raise AttributeError inside from_json.
    if not isinstance(j, dict):
        return Status.UNKNOWN

    return Status.from_json(j)

def kill_query(self, query_id: str) -> str:
    """Send this helper a kill signal for a query, if it still needs one.

    The helper's current status is checked first; no request is sent when
    that status compares ``>= Status.COMPLETE``.

    Args:
        query_id: Identifier of the query to kill.

    Returns:
        A human-readable message describing what happened: the signal was
        skipped, the signal was sent (including the helper's response text),
        or the request could not be delivered.
    """
    status = self.get_current_query_status(query_id)
    if status >= Status.COMPLETE:
        return (
            f"not sending kill signal. helper {self.role} "
            f"already has status {status}"
        )
    try:
        r = httpx.post(self.query_kill_url(query_id))
    except httpx.RequestError as e:
        # Mirror the status lookup: an unreachable helper is reported, not
        # raised, so the caller can keep signalling the remaining helpers.
        return (
            f"failed to send kill signal for query({query_id}) "
            f"to helper {self.role}: {e!r}"
        )
    return f"sent kill signal for query({query_id}) to helper {self.role}: {r.text}"

def finish_query(self, query_id: str) -> str:
    """Send this helper a finish signal for a query, if it still needs one.

    The helper's current status is checked first; no request is sent when
    that status compares ``>= Status.COMPLETE``.

    Args:
        query_id: Identifier of the query to finish.

    Returns:
        A human-readable message describing what happened: the signal was
        skipped, the signal was sent (including the helper's response text),
        or the request could not be delivered.
    """
    status = self.get_current_query_status(query_id)
    if status >= Status.COMPLETE:
        return (
            f"not sending finish signal. helper {self.role} "
            f"already has status {status}"
        )
    try:
        r = httpx.post(self.query_finish_url(query_id))
    except httpx.RequestError as e:
        # Mirror the status lookup: an unreachable helper is reported, not
        # raised, so the caller can keep signalling the remaining helpers.
        return (
            f"failed to send finish signal for query({query_id}) "
            f"to helper {self.role}: {e!r}"
        )
    return (
        f"sent finish signal for query({query_id}) to helper {self.role}: {r.text}"
    )


def load_helpers_from_network_config(network_config_path: Path) -> dict[Role, Helper]:
with network_config_path.open("rb") as f:
Expand Down
78 changes: 29 additions & 49 deletions sidecar/app/query/ipa.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
from enum import StrEnum
from pathlib import Path
from typing import ClassVar
from urllib.parse import urlunparse

import httpx
import loguru

from ..helpers import Role
from ..local_paths import Paths
from ..settings import get_settings
from .base import Query
Expand All @@ -31,17 +28,9 @@ class IPAQuery(Query):
def send_kill_signals(self):
self.logger.info("sending kill signals")
settings = get_settings()
for helper in settings.helpers.values():
if helper.role == self.role:
continue
finish_url = urlunparse(
helper.sidecar_url._replace(
scheme="https", path=f"/stop/kill/{self.query_id}"
),
)

r = httpx.post(finish_url)
self.logger.info(f"sent post request: {r.text}")
for helper in settings.other_helpers:
response = helper.kill_query(self.query_id)
self.logger.info(response)

def crash(self):
super().crash()
Expand Down Expand Up @@ -248,32 +237,31 @@ def build_from_query(cls, query: IPAQuery):

def run(self):
settings = get_settings()
sidecar_urls = [
helper.sidecar_url
for helper in settings.helpers.values()
if helper.role != Role.COORDINATOR
]
for sidecar_url in sidecar_urls:
url = urlunparse(
sidecar_url._replace(
scheme="https", path=f"/start/{self.query_id}/status"
),
)
for helper in settings.other_helpers:
max_unknonwn_status_wait_time = 100
current_unknown_status_wait_time = 0
loop_wait_time = 1
while True:
r = httpx.get(url).json()
status = r.get("status")
status = helper.get_current_query_status(self.query_id)
match status:
case Status.IN_PROGRESS.name:
case Status.IN_PROGRESS:
break
case Status.KILLED.name:
self.success = False
return
case Status.NOT_FOUND.name:
self.success = False
return
case Status.CRASHED.name:
case Status.KILLED | Status.NOT_FOUND | Status.CRASHED:
self.success = False
return
case Status.STARTING | Status.COMPILING | Status.WAITING_TO_START:
# keep waiting while it's in a startup state
continue
case Status.UNKNOWN | Status.NOT_FOUND:
# eventually fail if the status is unknown or not found
# for ~100 seconds
current_unknown_status_wait_time += loop_wait_time
if (
current_unknown_status_wait_time
>= max_unknonwn_status_wait_time
):
self.success = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that inform the UI that we failed to get a "good" status from the helper, and that this is the reason why we couldn't start the query?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a Step finishes with self.success = False, it causes the Query to crash. The UI will show the report collector as crashed, and in the logs we'll see that the IPACoordinatorWaitForHelpersStep failed.

return

time.sleep(1)
time.sleep(3) # allow enough time for the command to start
Expand Down Expand Up @@ -352,24 +340,16 @@ class IPACoordinatorQuery(IPAQuery):
IPACoordinatorStartStep,
]

def send_terminate_signals(self):
self.logger.info("sending terminate signals")
def send_finish_signals(self):
self.logger.info("sending finish signals")
settings = get_settings()
for helper in settings.helpers.values():
if helper.role == self.role:
continue
finish_url = urlunparse(
helper.sidecar_url._replace(
scheme="https", path=f"/stop/finish/{self.query_id}"
),
)

r = httpx.post(finish_url)
self.logger.info(f"sent post request: {finish_url}: {r.text}")
for helper in settings.other_helpers:
resp = helper.finish_query(self.query_id)
self.logger.info(resp)

def finish(self):
super().finish()
self.send_terminate_signals()
self.send_finish_signals()


@dataclass(kw_only=True)
Expand Down
8 changes: 8 additions & 0 deletions sidecar/app/query/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ class Status(IntEnum):
KILLED = auto()
CRASHED = auto()

@classmethod
def from_json(cls, response: dict[str, str]):
    """Map a decoded JSON response onto a member of this enum.

    Args:
        response: Decoded JSON object; its ``"status"`` entry, if present,
            is looked up as a member name.

    Returns:
        The member whose name equals ``response["status"]``, or
        ``cls.UNKNOWN`` when the key is missing or names no member.
    """
    member_name = response.get("status", "")
    # __members__ maps member names to members, so a plain .get with a
    # fallback replaces the lookup-and-catch-KeyError dance.
    return cls.__members__.get(member_name, cls.UNKNOWN)


StatusChangeEvent = NamedTuple(
"StatusChangeEvent", [("status", Status), ("timestamp", float)]
Expand Down
4 changes: 4 additions & 0 deletions sidecar/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def helper(self) -> Helper:
def helpers(self) -> dict[Role, Helper]:
return self._helpers

@property
def other_helpers(self) -> list[Helper]:
    """Return every configured helper whose role differs from ``self.role``."""
    return [
        peer
        for peer in self._helpers.values()
        if peer.role != self.role
    ]

@property
def status_dir_path(self) -> Path:
return self.root_path / Path("status")
Expand Down
14 changes: 14 additions & 0 deletions sidecar/tests/app/query/test_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,17 @@ def test_status_history_status_event_json(
"start_time": now,
"end_time": now2,
}


@pytest.mark.parametrize(
    ("json_input", "expected_status"),
    [
        ({"status": "STARTING"}, Status.STARTING),
        ({"status": "UNKNOWN"}, Status.UNKNOWN),
        ({"status": "not-a-status"}, Status.UNKNOWN),
        ({}, Status.UNKNOWN),  # Empty JSON
        ({"other_key": "value"}, Status.UNKNOWN),  # Missing "status" key
    ],
)
def test_status_from_json(json_input, expected_status):
    """Status.from_json yields the named member, else Status.UNKNOWN."""
    parsed = Status.from_json(json_input)
    assert parsed == expected_status
Loading