Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task events to the scheduler #2043

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9b73610
Add task events table
jpbruinsslot Nov 14, 2023
24f68f3
Extend database queries and models
jpbruinsslot Nov 15, 2023
a2041eb
Add event store
jpbruinsslot Nov 20, 2023
7514691
Remove test files
jpbruinsslot Nov 20, 2023
91042df
Fix serialization between sqlalchemy and pydantic models
jpbruinsslot Nov 20, 2023
014a80c
Add events endpoint
jpbruinsslot Nov 21, 2023
4aadee8
Implement event endpoints and add tests
jpbruinsslot Nov 22, 2023
bd2a146
Formatting
jpbruinsslot Nov 23, 2023
86167c6
Ignore A002
jpbruinsslot Nov 23, 2023
3214048
Merge branch 'main' into feature/mula/task-events
jpbruinsslot Nov 23, 2023
80663cb
Merge branch 'main' into feature/mula/task-events
jpbruinsslot Nov 23, 2023
2f36177
Remove additional events
jpbruinsslot Nov 23, 2023
36f8246
Differentiate between list of task and individual tasks
jpbruinsslot Nov 27, 2023
b0044cc
Remove postgres trigger
jpbruinsslot Dec 5, 2023
d1572d2
Remove trigger and add tests
jpbruinsslot Dec 6, 2023
cf5fd6b
Update tests
jpbruinsslot Dec 6, 2023
7f3015e
Merge branch 'main' into feature/mula/task-events
jpbruinsslot Dec 6, 2023
ef2c691
Merge branch 'main' into feature/mula/task-events
jpbruinsslot Dec 7, 2023
8ad6a5f
Merge branch 'main' into feature/mula/task-events
jpbruinsslot Dec 7, 2023
eb8b97f
Update and fix tests
jpbruinsslot Dec 7, 2023
97404b5
Formatting
jpbruinsslot Dec 7, 2023
b96f51b
Merge branch 'main' into feature/mula/task-events
underdarknl Dec 11, 2023
d489b48
Combine exceptions
jpbruinsslot Dec 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mula/.ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ services:

ci_postgres:
image: postgres:15
command: ["postgres", "-c", "log_statement=all", "-c", "log_destination=stderr"]
healthcheck:
test: ["CMD", "gosu", "postgres", "pg_isready"]
interval: 3s
Expand Down
40 changes: 40 additions & 0 deletions mula/scheduler/alembic/versions/0008_create_events_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Create events table

Revision ID: 0008
Revises: 0007
Create Date: 2023-11-14 15:00:00.000000

"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

import scheduler

# revision identifiers, used by Alembic.
revision = "0008"
down_revision = "0007"
branch_labels = None
depends_on = None


def upgrade():
    """Create the ``events`` table and the lookup index on ``task_id``.

    The column definitions mirror the ``EventDB`` model in
    ``scheduler/models/events.py``.
    """
    op.create_table(
        "events",
        sa.Column("id", sa.Integer(), nullable=False, autoincrement=True),
        sa.Column("task_id", scheduler.utils.datastore.GUID(), nullable=True),
        sa.Column("type", sa.String(), nullable=True),
        sa.Column("context", sa.String(), nullable=True),
        sa.Column("event", sa.String(), nullable=True),
        sa.Column("timestamp", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
        # NOT NULL so the schema agrees with EventDB.data, which is declared
        # with nullable=False; Event.data is a required field as well.
        sa.Column("data", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
        sa.PrimaryKeyConstraint("id"),
    )

    # Events are looked up per task, hence the non-unique index on task_id.
    op.create_index(op.f("ix_events_task_id"), "events", ["task_id"], unique=False)


def downgrade():
    """Revert this revision by dropping the ``events`` table."""
    # Drop the events table
    op.drop_table("events")
4 changes: 4 additions & 0 deletions mula/scheduler/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from scheduler import storage
from scheduler.config import settings
from scheduler.connectors import services
from scheduler.models import TaskDB
from scheduler.utils import remove_trailing_slash


Expand Down Expand Up @@ -83,9 +84,12 @@ def __init__(self) -> None:
**{
storage.TaskStore.name: storage.TaskStore(dbconn),
storage.PriorityQueueStore.name: storage.PriorityQueueStore(dbconn),
storage.EventStore.name: storage.EventStore(dbconn),
}
)

TaskDB.set_event_store(self.datastores.event_store)

# Metrics collector registry
self.metrics_registry: CollectorRegistry = CollectorRegistry()

Expand Down
4 changes: 2 additions & 2 deletions mula/scheduler/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from .base import Base
from .boefje import Boefje, BoefjeMeta
from .events import RawData, RawDataReceivedEvent
from .events import Event, EventDB, RawData, RawDataReceivedEvent
from .health import ServiceHealth
from .normalizer import Normalizer
from .ooi import OOI, MutationOperationType, ScanProfile, ScanProfileMutation
from .organisation import Organisation
from .plugin import Plugin
from .queue import PrioritizedItem, PrioritizedItemDB, Queue
from .scheduler import Scheduler
from .tasks import BoefjeTask, NormalizerTask, Task, TaskDB, TaskStatus
from .tasks import BoefjeTask, NormalizerTask, Task, TaskDB, TaskList, TaskStatus
53 changes: 51 additions & 2 deletions mula/scheduler/models/events.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,60 @@
from datetime import datetime
import uuid
from datetime import datetime, timezone

from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.schema import Index
from sqlalchemy.sql import func

from scheduler.utils import GUID

from .base import Base
from .raw_data import RawData


class RawDataReceivedEvent(BaseModel):
created_at: datetime
organization: str
raw_data: RawData


class Event(BaseModel):
    """Pydantic model describing one event recorded for a task.

    The scheduler builds these with ``type="events.db"``, ``context="task"``,
    ``event`` set to ``"insert"`` or ``"update"``, and ``data`` set to the
    dumped task model.
    """

    model_config = ConfigDict(from_attributes=True)

    # Identifier of the task this event belongs to.
    task_id: uuid.UUID

    # Free-form classification strings; no validation beyond ``str``.
    type: str
    context: str
    event: str

    # Falls back to the current UTC time when the caller supplies no value.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    # Payload stored alongside the event.
    data: dict


class EventDB(Base):
__tablename__ = "events"

id = Column(Integer, primary_key=True)

task_id = Column(GUID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a foreign key to the task table.


type = Column(String)

context = Column(String)

event = Column(String)

timestamp = Column(DateTime(timezone=True), nullable=False, server_default=func.now())

data = Column(JSONB, nullable=False)

__table_args__ = (
Index(
"ix_events_task_id",
task_id,
),
)
58 changes: 58 additions & 0 deletions mula/scheduler/models/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import Column, DateTime, Enum, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.schema import Index
from sqlalchemy.sql import func
from sqlalchemy.sql.expression import text
Expand Down Expand Up @@ -44,6 +45,27 @@ class TaskStatus(str, enum.Enum):
CANCELLED = "cancelled"


class TaskList(BaseModel):
    """Task representation without the ``duration``/``queued``/``runtime`` fields.

    Carries the same fields as ``Task`` except for those three event-derived
    values.
    """

    model_config = ConfigDict(from_attributes=True)

    id: uuid.UUID

    scheduler_id: str

    type: str

    p_item: PrioritizedItem

    status: TaskStatus

    # Both timestamps default to the current UTC time when not supplied.
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    def __repr__(self):
        """Return a short debug representation labelled with this class's name."""
        # Previously labelled "Task(...)", which made TaskList instances
        # indistinguishable from Task instances in logs.
        return f"TaskList(id={self.id}, scheduler_id={self.scheduler_id}, type={self.type}, status={self.status})"


class Task(BaseModel):
model_config = ConfigDict(from_attributes=True)

Expand All @@ -57,13 +79,22 @@ class Task(BaseModel):

status: TaskStatus

# Event-derived values; model_dump_db() excludes them when dumping the task.
duration: Optional[float] = Field(None, alias="duration", readonly=True)

# Fixed typo: the keyword was spelled ``alieas``, so no alias was set and an
# unknown keyword argument was passed to Field.
queued: Optional[float] = Field(None, alias="queued", readonly=True)

runtime: Optional[float] = Field(None, alias="runtime", readonly=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type should be timedelta instead of float.


created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

def __repr__(self):
    """Return a short debug representation with id, scheduler, type and status."""
    return (
        f"Task(id={self.id}, scheduler_id={self.scheduler_id}, "
        f"type={self.type}, status={self.status})"
    )

def model_dump_db(self):
    """Dump the model while leaving out ``duration``, ``queued`` and ``runtime``."""
    excluded_fields = {"duration", "queued", "runtime"}
    return self.model_dump(exclude=excluded_fields)


class TaskDB(Base):
__tablename__ = "tasks"
Expand Down Expand Up @@ -103,6 +134,33 @@ class TaskDB(Base):
),
)

_event_store = None

@classmethod
def set_event_store(cls, event_store) -> None:
    """Register the event store used by the ``duration``/``queued``/``runtime`` properties.

    The store is kept on the class itself, so it is shared by every instance.

    Args:
        event_store: Object exposing ``get_task_duration``, ``get_task_queued``
            and ``get_task_runtime``.
    """
    cls._event_store = event_store

@hybrid_property
def duration(self) -> float:
    """Return the result of the event store's ``get_task_duration`` for this task.

    Raises:
        ValueError: If no event store has been registered via
            ``TaskDB.set_event_store``.
    """
    store = self._event_store
    if store is not None:
        return store.get_task_duration(self.id)

    raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")

@hybrid_property
def queued(self) -> float:
    """Return the result of the event store's ``get_task_queued`` for this task.

    Raises:
        ValueError: If no event store has been registered via
            ``TaskDB.set_event_store``.
    """
    store = self._event_store
    if store is not None:
        return store.get_task_queued(self.id)

    raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")

@hybrid_property
def runtime(self) -> float:
    """Return the result of the event store's ``get_task_runtime`` for this task.

    Raises:
        ValueError: If no event store has been registered via
            ``TaskDB.set_event_store``.
    """
    store = self._event_store
    if store is not None:
        return store.get_task_runtime(self.id)

    raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")


class NormalizerTask(BaseModel):
"""NormalizerTask represent data needed for a Normalizer to run."""
Expand Down
28 changes: 26 additions & 2 deletions mula/scheduler/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def post_push(self, p_item: models.PrioritizedItem) -> None:
Args:
p_item: The prioritized item from the priority queue.
"""
# Create task
#
# NOTE: we set the id of the task the same as the p_item, for easier
# lookup.
task = models.Task(
Expand All @@ -101,16 +103,28 @@ def post_push(self, p_item: models.PrioritizedItem) -> None:
modified_at=datetime.now(timezone.utc),
)

# Create event
event = models.Event(
task_id=task.id,
type="events.db",
context="task",
event="insert",
data=task.model_dump(),
)

task_db = self.ctx.datastores.task_store.get_task_by_id(str(p_item.id))
if task_db is not None:
event.event = "update"
self.ctx.datastores.task_store.update_task(task)
self.ctx.datastores.event_store.create_event(event)
return

self.ctx.datastores.task_store.create_task(task)
self.ctx.datastores.event_store.create_event(event)

def post_pop(self, p_item: models.PrioritizedItem) -> None:
"""When a boefje task is being removed from the queue. We
persist a task to the datastore with the status RUNNING
persist a task to the datastore with the status DISPATCHED.

Args:
p_item: The prioritized item from the priority queue.
Expand All @@ -127,10 +141,20 @@ def post_pop(self, p_item: models.PrioritizedItem) -> None:
)
return None

# Update task
task.status = models.TaskStatus.DISPATCHED
self.ctx.datastores.task_store.update_task(task)

return None
# Create event
event = models.Event(
task_id=task.id,
type="events.db",
context="task",
event="update",
data=task.model_dump(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will result in a very large events table: the task model is already fairly big, and it will be duplicated unnecessarily many times in the events table. I don't think this is a good idea with regard to performance and resource usage.

)

self.ctx.datastores.event_store.create_event(event)

def pop_item_from_queue(
self, filters: Optional[storage.filters.FilterRequest] = None
Expand Down
Loading