Add additional events support for tasks #2052

Closed
wants to merge 11 commits
40 changes: 40 additions & 0 deletions mula/scheduler/alembic/versions/0008_create_events_table.py
@@ -0,0 +1,40 @@
"""Create events table

Revision ID: 0008
Revises: 0007
Create Date: 2023-11-14 15:00:00.000000

"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

import scheduler

# revision identifiers, used by Alembic.
revision = "0008"
down_revision = "0007"
branch_labels = None
depends_on = None


def upgrade():
# Add events table
op.create_table(
"events",
sa.Column("id", sa.Integer(), nullable=False, autoincrement=True),
sa.Column("task_id", scheduler.utils.datastore.GUID(), nullable=True),
sa.Column("type", sa.String(), nullable=True),
sa.Column("context", sa.String(), nullable=True),
sa.Column("event", sa.String(), nullable=True),
sa.Column("timestamp", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
sa.Column("data", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.PrimaryKeyConstraint("id"),
)

op.create_index(op.f("ix_events_task_id"), "events", ["task_id"], unique=False)


def downgrade():
# Drop the events table
op.drop_table("events")
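
For orientation, the sketch below shows the shape of a single row in the new events table once the trigger from revision 0009 starts writing to it. The values are invented for illustration; only the column names and types come from this migration, while the 'insert'/'update' labels and the JSONB task snapshot come from the record_event() trigger.

# Illustrative only: example contents of one events row (values are made up).
example_event = {
    "id": 1,                                             # autoincrementing integer primary key
    "task_id": "00000000-0000-0000-0000-000000000000",   # hypothetical task GUID
    "type": "events.db",
    "context": "task",
    "event": "insert",                                   # or "update"
    "timestamp": "2023-11-14T15:00:00+00:00",            # server_default now(), timezone-aware
    "data": {"status": "queued"},                        # row_to_json(NEW) snapshot of the task row
}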
59 changes: 59 additions & 0 deletions mula/scheduler/alembic/versions/0009_add_task_trigger.py
@@ -0,0 +1,59 @@
"""Add tasks trigger

Revision ID: 0009
Revises: 0008
Create Date: 2023-11-14 15:00:00.000000

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "0009"
down_revision = "0008"
branch_labels = None
depends_on = None


def upgrade():
# Create the record_event function
op.execute(
sa.DDL(
"""
CREATE OR REPLACE FUNCTION record_event()
RETURNS TRIGGER AS
$$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO events (task_id, type, context, event, data)
VALUES (NEW.id, 'events.db', 'task', 'insert', row_to_json(NEW));
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO events (task_id, type, context, event, data)
VALUES (NEW.id, 'events.db', 'task', 'update', row_to_json(NEW));
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
)
)

# Create the triggers
op.execute(
sa.DDL(
"""
CREATE TRIGGER tasks_insert_update_trigger
AFTER INSERT OR UPDATE ON tasks
FOR EACH ROW
EXECUTE FUNCTION record_event();
"""
)
)


def downgrade():
# Drop the trigger first, since it depends on the record_event function
op.execute(sa.DDL("DROP TRIGGER IF EXISTS tasks_insert_update_trigger ON tasks"))

# Drop the record_event function
op.execute(sa.DDL("DROP FUNCTION IF EXISTS record_event()"))
4 changes: 4 additions & 0 deletions mula/scheduler/context/context.py
@@ -9,6 +9,7 @@
from scheduler import storage
from scheduler.config import settings
from scheduler.connectors import services
from scheduler.models import TaskDB
from scheduler.utils import remove_trailing_slash


@@ -83,9 +84,12 @@ def __init__(self) -> None:
**{
storage.TaskStore.name: storage.TaskStore(dbconn),
storage.PriorityQueueStore.name: storage.PriorityQueueStore(dbconn),
storage.EventStore.name: storage.EventStore(dbconn),
}
)

TaskDB.set_event_store(self.datastores.event_store)

# Metrics collector registry
self.metrics_registry: CollectorRegistry = CollectorRegistry()

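TaskDB.set_event_store wires the store in at class level, so every TaskDB instance shares the single EventStore created here. A stripped-down sketch of that pattern, with simplified names that are not the actual classes:

# Illustrative only: class-level injection of a store, analogous to the wiring above.
class StubEventStore:
    def get_task_duration(self, task_id) -> float:
        return 1.5  # stub value


class TaskRecord:
    _event_store = None  # shared by all instances, set once at application start-up

    def __init__(self, task_id):
        self.id = task_id

    @classmethod
    def set_event_store(cls, event_store) -> None:
        cls._event_store = event_store

    @property
    def duration(self) -> float:
        if self._event_store is None:
            raise ValueError("EventStore instance is not set.")
        return self._event_store.get_task_duration(self.id)


TaskRecord.set_event_store(StubEventStore())  # analogous to the AppContext wiring
print(TaskRecord("some-task-id").duration)    # 1.5
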
2 changes: 1 addition & 1 deletion mula/scheduler/models/__init__.py
@@ -1,6 +1,6 @@
from .base import Base
from .boefje import Boefje, BoefjeMeta
from .events import RawData, RawDataReceivedEvent
from .events import Event, EventDB, RawData, RawDataReceivedEvent
from .health import ServiceHealth
from .normalizer import Normalizer
from .ooi import OOI, MutationOperationType, ScanProfile, ScanProfileMutation
53 changes: 51 additions & 2 deletions mula/scheduler/models/events.py
@@ -1,11 +1,60 @@
from datetime import datetime
import uuid
from datetime import datetime, timezone

from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.schema import Index
from sqlalchemy.sql import func

from scheduler.utils import GUID

from .base import Base
from .raw_data import RawData


class RawDataReceivedEvent(BaseModel):
created_at: datetime
organization: str
raw_data: RawData


class Event(BaseModel):
model_config = ConfigDict(from_attributes=True)

task_id: uuid.UUID

type: str

context: str

event: str

timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

data: dict


class EventDB(Base):
__tablename__ = "events"

id = Column(Integer, primary_key=True)

task_id = Column(GUID)

type = Column(String)

context = Column(String)

event = Column(String)

timestamp = Column(DateTime(timezone=True), nullable=False, server_default=func.now())

data = Column(JSONB, nullable=False)

__table_args__ = (
Index(
"ix_events_task_id",
task_id,
),
)
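
Because Event sets model_config = ConfigDict(from_attributes=True), rows coming back from the ORM can be converted directly into the Pydantic model. A small sketch; the payload values are illustrative and the session in the commented part is assumed to exist:

# Illustrative only: building an Event by hand, and converting an EventDB row.
import uuid

from scheduler.models import Event  # exported via models/__init__.py in this PR

evt = Event(
    task_id=uuid.uuid4(),
    type="events.db",
    context="task",
    event="insert",
    data={"status": "queued"},  # illustrative payload
)  # timestamp falls back to its default_factory

# Converting an ORM row instead (from_attributes=True makes this a one-liner);
# `session` is assumed to be an open SQLAlchemy session:
#   row = session.query(EventDB).filter(EventDB.task_id == evt.task_id).first()
#   evt = Event.model_validate(row)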
149 changes: 127 additions & 22 deletions mula/scheduler/models/tasks.py
@@ -5,8 +5,9 @@

import mmh3
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import Column, DateTime, Enum, String
from sqlalchemy import DDL, Column, DateTime, Enum, String, event
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.schema import Index
from sqlalchemy.sql import func
from sqlalchemy.sql.expression import text
@@ -44,27 +45,6 @@ class TaskStatus(str, enum.Enum):
CANCELLED = "cancelled"


class Task(BaseModel):
model_config = ConfigDict(from_attributes=True)

id: uuid.UUID

scheduler_id: str

type: str

p_item: PrioritizedItem

status: TaskStatus

created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

def __repr__(self):
return f"Task(id={self.id}, scheduler_id={self.scheduler_id}, type={self.type}, status={self.status})"


class TaskDB(Base):
__tablename__ = "tasks"

@@ -103,6 +83,99 @@ class TaskDB(Base):
),
)

_event_store = None

@classmethod
def set_event_store(cls, event_store):
cls._event_store = event_store

@hybrid_property
def duration(self) -> float:
if self._event_store is None:
raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")

return self._event_store.get_task_duration(self.id)

@hybrid_property
def queued(self) -> float:
if self._event_store is None:
raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")

return self._event_store.get_task_queued(self.id)

@hybrid_property
def runtime(self) -> float:
if self._event_store is None:
raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")

return self._event_store.get_task_runtime(self.id)

@hybrid_property
def cpu(self) -> float:
if self._event_store is None:
raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")

return self._event_store.get_task_cpu(self.id)

@hybrid_property
def memory(self) -> float:
if self._event_store is None:
raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")

return self._event_store.get_task_memory(self.id)

@hybrid_property
def disk(self) -> float:
if self._event_store is None:
raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")

return self._event_store.get_task_disk(self.id)

@hybrid_property
def network(self) -> float:
if self._event_store is None:
raise ValueError("EventStore instance is not set. Use TaskDB.set_event_store to set it.")

return self._event_store.get_task_network(self.id)


class Task(BaseModel):
model_config = ConfigDict(from_attributes=True)

id: uuid.UUID

scheduler_id: str

type: str

p_item: PrioritizedItem

status: TaskStatus

duration: Optional[float] = Field(None, alias="duration", readonly=True)

queued: Optional[float] = Field(None, alias="queued", readonly=True)

runtime: Optional[float] = Field(None, alias="runtime", readonly=True)

cpu: Optional[float] = Field(None, alias="cpu", readonly=True)

memory: Optional[float] = Field(None, alias="memory", readonly=True)

disk: Optional[float] = Field(None, alias="disk", readonly=True)

network: Optional[float] = Field(None, alias="network", readonly=True)

created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

def __repr__(self):
return f"Task(id={self.id}, scheduler_id={self.scheduler_id}, type={self.type}, status={self.status})"

def model_dump_db(self):
return self.model_dump(exclude={"duration", "queued", "runtime", "cpu", "memory", "disk", "network"})


class NormalizerTask(BaseModel):
"""NormalizerTask represent data needed for a Normalizer to run."""
@@ -144,3 +217,35 @@ def hash(self) -> str:
return mmh3.hash_bytes(f"{self.input_ooi}-{self.boefje.id}-{self.organization}").hex()

return mmh3.hash_bytes(f"{self.boefje.id}-{self.organization}").hex()


func_record_event = DDL(
"""
CREATE OR REPLACE FUNCTION record_event()
RETURNS TRIGGER AS
$$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO events (task_id, type, context, event, data)
VALUES (NEW.id, 'events.db', 'task', 'insert', row_to_json(NEW));
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO events (task_id, type, context, event, data)
VALUES (NEW.id, 'events.db', 'task', 'update', row_to_json(NEW));
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
)

trigger_tasks_insert_update = DDL(
"""
CREATE TRIGGER tasks_insert_update_trigger
AFTER INSERT OR UPDATE ON tasks
FOR EACH ROW
EXECUTE FUNCTION record_event();
"""
)

event.listen(TaskDB.__table__, "after_create", func_record_event.execute_if(dialect="postgresql"))
event.listen(TaskDB.__table__, "after_create", trigger_tasks_insert_update.execute_if(dialect="postgresql"))