Skip to content

Commit

Permalink
Merge pull request #177 from ImperialCollegeLondon/ers_msg
Browse files Browse the repository at this point in the history
ERS Kafka message topic
  • Loading branch information
cc-a authored Oct 30, 2024
2 parents cb8dd07 + 51b56f1 commit 99163d2
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 24 deletions.
7 changes: 7 additions & 0 deletions drunc_ui/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@

KAFKA_ADDRESS = os.getenv("KAFKA_ADDRESS", "kafka:9092")

KAFKA_TOPIC_REGEX = {
# PROCMAN matches topics of the form "control.<session>.process_manager".
"PROCMAN": "^control\..+\.process_manager$",
# ERS matches only the topic "erskafka-reporting".
"ERS": "^erskafka-reporting$",
}

MESSAGE_EXPIRE_SECS = float(os.getenv("MESSAGE_EXPIRE_SECS", 1800))

django_stubs_ext.monkeypatch()
22 changes: 10 additions & 12 deletions main/management/commands/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,34 @@ def add_arguments(self, parser: ArgumentParser) -> None:
def handle(self, debug: bool = False, **kwargs: Any) -> None: # type: ignore[misc]
"""Command business logic."""
consumer = KafkaConsumer(bootstrap_servers=[settings.KAFKA_ADDRESS])
consumer.subscribe(pattern="control.*.process_manager")
consumer.subscribe(pattern=f"({'|'.join(settings.KAFKA_TOPIC_REGEX.values())})")
# TODO: determine why the below doesn't work
# consumer.subscribe(pattern="control.no_session.process_manager")

self.stdout.write("Listening for messages from Kafka.")
while True:
for messages in consumer.poll(timeout_ms=500).values():
message_timestamps = []
message_bodies = []
message_records = []

for message in messages:
if debug:
self.stdout.write(f"Message received: {message}")
self.stdout.flush()

# Convert Kafka timestamp (milliseconds) to datetime (seconds).
timestamp = datetime.fromtimestamp(message.timestamp / 1e3, tz=UTC)
message_timestamps.append(timestamp)
time = datetime.fromtimestamp(message.timestamp / 1e3, tz=UTC)

bm = BroadcastMessage()
bm.ParseFromString(message.value)
message_bodies.append(bm.data.value.decode("utf-8"))
body = bm.data.value.decode("utf-8")

if message_bodies:
DruncMessage.objects.bulk_create(
[
DruncMessage(timestamp=t, message=msg)
for t, msg in zip(message_timestamps, message_bodies)
]
message_records.append(
DruncMessage(topic=message.topic, timestamp=time, message=body)
)

if message_records:
DruncMessage.objects.bulk_create(message_records)

# Remove expired messages from the database.
message_timeout = timedelta(seconds=settings.MESSAGE_EXPIRE_SECS)
expire_time = datetime.now(tz=UTC) - message_timeout
Expand Down
27 changes: 27 additions & 0 deletions main/management/commands/store_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Django management command to populate Kafka messages into application database."""

from argparse import ArgumentParser
from datetime import UTC, datetime
from typing import Any

from django.core.management.base import BaseCommand

from ...models import DruncMessage


class Command(BaseCommand):
"""Store Kafka messages in the database."""

help = __doc__

def add_arguments(self, parser: ArgumentParser) -> None:
"""Add commandline options."""
parser.add_argument("-t", "--topic", default="NO_TOPIC")
parser.add_argument("-m", "--message", default="NO_MESSAGE")

def handle(self, *args: Any, **kwargs: Any) -> None: # type: ignore[misc]
"""Command business logic."""
topic = kwargs["topic"]
message = kwargs["message"]
timestamp = datetime.now(tz=UTC)
DruncMessage.objects.create(topic=topic, timestamp=timestamp, message=message)
19 changes: 19 additions & 0 deletions main/migrations/0004_druncmessage_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 5.1.2 on 2024-10-22 16:26

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('main', '0003_druncmessage'),
]

operations = [
migrations.AddField(
model_name='druncmessage',
name='topic',
field=models.CharField(default='', max_length=255),
preserve_default=False,
),
]
1 change: 1 addition & 0 deletions main/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ class Meta:
class DruncMessage(models.Model):
"""Model for drunc broadcast messages."""

topic = models.CharField(max_length=255)
timestamp = models.DateTimeField()
message = models.TextField()
18 changes: 10 additions & 8 deletions process_manager/views/partials.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""View functions for partials."""

import django_tables2
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.http import HttpRequest, HttpResponse
from django.shortcuts import render
Expand Down Expand Up @@ -95,16 +96,17 @@ def process_table(request: HttpRequest) -> HttpResponse:

@login_required
def messages(request: HttpRequest) -> HttpResponse:
"""Renders Kafka messages from the database."""
messages = []

# Filter messages based on search parameter.
"""Search and render Kafka messages from the database."""
search = request.GET.get("search", "")
for msg in DruncMessage.objects.filter(message__icontains=search):
# Time is stored as UTC. localtime(t) converts this to our configured timezone.
timestamp = localtime(msg.timestamp).strftime("%Y-%m-%d %H:%M:%S")
records = DruncMessage.objects.filter(
topic__regex=settings.KAFKA_TOPIC_REGEX["PROCMAN"], message__icontains=search
)

messages.append(f"{timestamp}: {msg.message}")
# Time is stored as UTC. localtime(t) converts this to our configured timezone.
messages = [
f"{localtime(record.timestamp).strftime('%Y-%m-%d %H:%M:%S')}: {record.message}"
for record in records
]

return render(
request=request,
Expand Down
24 changes: 20 additions & 4 deletions tests/process_manager/views/test_partial_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,21 @@ class TestMessagesView(LoginRequiredTest):
"""Test the process_manager.views.messages view function."""

endpoint = reverse("process_manager:messages")
topic = "control.test.process_manager"

@pytest.fixture(autouse=True)
def kafka_topic_regex(self, settings):
"""Set Kafka topic regex patterns for tests."""
settings.KAFKA_TOPIC_REGEX["PROCMAN"] = "^control\..+\.process_manager$"

def test_get(self, auth_client):
"""Tests basic calls of view method."""
t1 = datetime.now(tz=UTC)
t2 = t1 + timedelta(minutes=10)
DruncMessage.objects.bulk_create(
[
DruncMessage(timestamp=t1, message="message 0"),
DruncMessage(timestamp=t2, message="message 1"),
DruncMessage(topic=self.topic, timestamp=t1, message="message 0"),
DruncMessage(topic=self.topic, timestamp=t2, message="message 1"),
]
)

Expand All @@ -89,8 +95,8 @@ def test_get_with_search(self, auth_client):
his_msg = "HIs meSsaGe"
DruncMessage.objects.bulk_create(
[
DruncMessage(timestamp=t, message=her_msg),
DruncMessage(timestamp=t, message=his_msg),
DruncMessage(topic=self.topic, timestamp=t, message=her_msg),
DruncMessage(topic=self.topic, timestamp=t, message=his_msg),
]
)

Expand All @@ -112,6 +118,16 @@ def test_get_with_search(self, auth_client):
assert response.status_code == HTTPStatus.OK
assert len(response.context["messages"]) == 0

def test_get_wrong_topic(self, auth_client):
"""Test view method plays nice with other Kafka topics."""
DruncMessage.objects.create(
topic="the.wrong.topic", timestamp=datetime.now(tz=UTC), message="message"
)

response = auth_client.get(self.endpoint)
assert response.status_code == HTTPStatus.OK
assert len(response.context["messages"]) == 0


process_1 = {
"uuid": "1",
Expand Down

0 comments on commit 99163d2

Please sign in to comment.