Skip to content

Commit

Permalink
Add workflow notifier python docker version
Browse files Browse the repository at this point in the history
  • Loading branch information
olaals committed Dec 3, 2024
1 parent bf0009a commit f5062d1
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 0 deletions.
26 changes: 26 additions & 0 deletions workflow-notifier/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
FROM python:3.12-slim


WORKDIR /app

RUN groupadd -r appuser && useradd -r -g appuser appuser

COPY ./requirements.txt /app/requirements.txt

RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
libpq-dev \
&& pip install --upgrade pip \
&& pip install -r requirements.txt \
&& apt-get remove -y gcc libpq-dev \
&& apt-get autoremove -y \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

COPY ./main.py /app/main.py

RUN chown -R appuser:appuser /app

USER appuser

CMD ["python", "main.py"]
120 changes: 120 additions & 0 deletions workflow-notifier/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import os
import typer
import requests
from msal import ConfidentialClientApplication

app = typer.Typer()


def get_env_or_fail(var_name: str) -> str:
value = os.getenv(var_name)
if not value:
typer.echo(
f"Error: Environment variable '{var_name}' is not set or is empty.",
err=True,
)
raise typer.Exit(1)
return value


IDA_SERVER_URL = get_env_or_fail("IDA_SERVER_URL")
TENANT_ID = get_env_or_fail("TENANT_ID")
NOTIFIER_CLIENT_ID = get_env_or_fail("NOTIFIER_CLIENT_ID")
NOTIFIER_CLIENT_SECRET = get_env_or_fail("NOTIFIER_CLIENT_SECRET")
IDA_SCOPE = get_env_or_fail("IDA_APP_REG_SCOPE")

SCOPES = [IDA_SCOPE]
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"

# Should preferably only be set to true in local environment.
# Validating the HTTPS certificate can prevent man-in-the-middle attacks.
SKIP_VALIDATE_HTTPS_CERT_IDA = (
os.getenv("SKIP_VALIDATE_HTTPS_CERT_IDA", "").lower() == "true"
)
VALIDATE_HTTPS_CERT_IDA = not SKIP_VALIDATE_HTTPS_CERT_IDA

if SKIP_VALIDATE_HTTPS_CERT_IDA:
typer.echo(
"Warning: Skipping HTTPS certificate validation for IDA server.", err=False
)


def get_access_token() -> str:
"""
Acquire an access token using MSAL.
"""
if not NOTIFIER_CLIENT_ID or not NOTIFIER_CLIENT_SECRET:
typer.echo(
"Error: NOTIFIER_CLIENT_ID or NOTIFIER_CLIENT_SECRET is not set.", err=True
)
raise typer.Exit(1)

app = ConfidentialClientApplication(
client_id=NOTIFIER_CLIENT_ID,
client_credential=NOTIFIER_CLIENT_SECRET,
authority=AUTHORITY,
)
result = app.acquire_token_for_client(scopes=SCOPES)

if result is None:
typer.echo("Error acquiring token: MSAL returned None.", err=True)
raise typer.Exit(1)

if isinstance(result, dict) and "access_token" in result:
return result["access_token"]

error_message = (
f"Error acquiring token: {result.get('error', 'Unknown')} - "
f"{result.get('error_description', 'No description provided')}"
if isinstance(result, dict)
else "Error acquiring token: Unexpected result type."
)
typer.echo(error_message, err=True)
raise typer.Exit(1)


def send_authenticated_put_request(url: str, payload: dict) -> requests.Response:
"""
Send an authenticated PUT request with the access token.
"""
access_token = get_access_token()
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.put(
url, json=payload, headers=headers, verify=VALIDATE_HTTPS_CERT_IDA
)
response.raise_for_status()
return response


@app.command()
def notify_start(inspection_id: str, workflow_name: str):
"""
Notify the server about workflow start.
"""
url = f"{IDA_SERVER_URL}/Workflows/notify-workflow-started"
payload = {"InspectionId": inspection_id, "WorkflowName": workflow_name}
try:
response = send_authenticated_put_request(url, payload)
typer.echo(f"Workflow started successfully: {response.json()}")
except requests.exceptions.RequestException as e:
typer.echo(f"Error notifying workflow start: {e}", err=True)
raise typer.Exit(1)


@app.command()
def notify_exit(inspection_id: str, workflow_status: str):
"""
Notify the server about workflow exit.
"""
url = f"{IDA_SERVER_URL}/Workflows/notify-workflow-exited"
payload = {"InspectionId": inspection_id, "WorkflowStatus": workflow_status}
try:
response = send_authenticated_put_request(url, payload)
typer.echo(f"Workflow exited successfully: {response.json()}")
except requests.exceptions.RequestException as e:
typer.echo(f"Error notifying workflow exit: {e}", err=True)
raise typer.Exit(1)


if __name__ == "__main__":
app()
2 changes: 2 additions & 0 deletions workflow-notifier/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
typer
requests

0 comments on commit f5062d1

Please sign in to comment.