Skip to content

Commit

Permalink
feat: add UpdateListener class
Browse files Browse the repository at this point in the history
  • Loading branch information
raphael0202 committed Dec 11, 2024
1 parent 7e5a697 commit 7ce0f79
Showing 1 changed file with 79 additions and 0 deletions.
79 changes: 79 additions & 0 deletions openfoodfacts/redis.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from abc import ABC, abstractmethod
from typing import Any, Iterator, Optional, Union, cast

from pydantic import BaseModel, Json
Expand Down Expand Up @@ -180,3 +181,81 @@ def get_new_updates_multistream(
product_type=item["product_type"],
diffs=item.get("diffs"),
)


class UpdateListener(ABC):
    """A class representing a daemon that listens to updates from a Redis
    stream and processes them.

    The class is meant to be subclassed to implement the processing logic.
    Subclasses must implement the `process_redis_update` method.
    """

    def __init__(
        self, redis_client: Redis, redis_stream_name: str, redis_latest_id_key: str
    ):
        """Store the connection and stream settings used by the listener.

        :param redis_client: the Redis client used to read the stream and to
            read/write the latest processed ID
        :param redis_stream_name: the name of the Redis stream to read
            updates from
        :param redis_latest_id_key: the Redis key under which the ID of the
            latest processed update is stored
        """
        self.redis_client = redis_client
        self.redis_stream_name = redis_stream_name
        self.redis_latest_id_key = redis_latest_id_key

    def run(self) -> None:
        """Run the update import daemon.

        This daemon listens to the Redis stream containing information about
        product updates, and calls `process_redis_update` for each update
        yielded by `get_new_updates`.

        The ID stored under `redis_latest_id_key` (if any) is used as the
        starting point. After each update, that key is overwritten with the
        update ID, whether or not processing succeeded: an exception raised
        by `process_redis_update` is logged and the update is not retried.
        """
        logger.info("Starting update listener daemon")

        logger.info("Redis client: %s", self.redis_client)
        logger.info("Pinging client...")
        self.redis_client.ping()
        logger.info("Connection successful")

        latest_id = self.redis_client.get(self.redis_latest_id_key)

        # A Redis client created without `decode_responses=True` returns
        # bytes; normalize to str so that the ID can be split below.
        if isinstance(latest_id, bytes):
            latest_id = latest_id.decode("utf-8")

        if latest_id:
            # The part of the ID before the dash is interpreted as a
            # timestamp in milliseconds.
            logger.info(
                "Latest ID processed: %s (datetime: %s)",
                latest_id,
                datetime.datetime.fromtimestamp(int(latest_id.split("-")[0]) / 1000),
            )
        else:
            logger.info("No latest ID found")

        for redis_update in get_new_updates(
            self.redis_client, stream_name=self.redis_stream_name, min_id=latest_id
        ):
            try:
                self.process_redis_update(redis_update)
            except Exception as e:
                # Best effort: log the failure and keep listening.
                logger.exception(e)
            # Persist the ID even on failure so that a failing update does
            # not block the stream.
            self.redis_client.set(self.redis_latest_id_key, redis_update.id)

    def process_updates_since(
        self, since: datetime.datetime, to: Optional[datetime.datetime] = None
    ) -> None:
        """Process all the updates since the given timestamp.

        Unlike `run`, exceptions raised by `process_redis_update` are not
        caught here, and the latest processed ID is not saved in Redis.

        :param since: the timestamp to start from
        :param to: the timestamp to stop, defaults to None (process all
            updates). Iteration stops at the first update whose timestamp is
            strictly greater than `to`.
        """
        logger.info("Redis client: %s", self.redis_client)
        logger.info("Pinging client...")
        self.redis_client.ping()

        processed = 0
        for product_update in get_processed_since(
            self.redis_client, stream_name=self.redis_stream_name, min_id=since
        ):
            if to is not None and product_update.timestamp > to:
                break
            self.process_redis_update(product_update)
            processed += 1

        logger.info("Processed %d updates", processed)

    @abstractmethod
    def process_redis_update(self, redis_update: RedisUpdate):
        """Process a single update read from the Redis stream.

        Must be implemented by subclasses.

        :param redis_update: the update to process
        """

0 comments on commit 7ce0f79

Please sign in to comment.