Skip to content

Commit

Permalink
feat: listen to updates from Product Opener using Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
raphael0202 committed Dec 11, 2024
1 parent 797c1bf commit d895839
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 60 deletions.
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@ TRITON_URI=host.docker.internal:5504
# During local development, to enable ML predictions, set this to True and make sure
# you have Triton running on port 5504.
ENABLE_ML_PREDICTIONS=False

# If you want to enable listening for Redis updates, set this to True
ENABLE_REDIS_UPDATES=False
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ status:

log:
@echo "🥫 Reading logs (docker compose) …"
${DOCKER_COMPOSE} logs -f api
${DOCKER_COMPOSE} logs -f api update-listener


#------------#
Expand Down
11 changes: 11 additions & 0 deletions config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,14 @@

# https://world.openfoodfacts.org/contributor/openfoodfacts-contributors
ANONYMOUS_USER_ID = "openfoodfacts-contributors"


# Redis (for product updates)
# ------------------------------------------------------------------------------
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = os.getenv("REDIS_PORT", 6379)
REDIS_STREAM_NAME = os.getenv("REDIS_STREAM_NAME", "product_updates_off")
REDIS_LATEST_ID_KEY = os.getenv(
"REDIS_LATEST_ID_KEY", "open-prices:product_updates:latest_id"
)
ENABLE_REDIS_UPDATES = os.getenv("ENABLE_REDIS_UPDATES") == "True"
11 changes: 11 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ x-api-common: &api-common
- GOOGLE_GEMINI_API_KEY
- TRITON_URI
- ENABLE_ML_PREDICTIONS
- ENABLE_REDIS_UPDATES
- REDIS_HOST
- REDIS_PORT
- REDIS_STREAM_NAME
- REDIS_LATEST_ID_KEY
networks:
- default

Expand All @@ -42,6 +47,12 @@ services:
depends_on:
- postgres

update-listener:
<<: *api-common
command: ["python", "manage.py", "run_update_listener"]
depends_on:
- postgres

postgres:
restart: $RESTART_POLICY
image: postgres:16.1-alpine
Expand Down
17 changes: 17 additions & 0 deletions docker/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,20 @@ services:
scheduler:
<<: *api-base
command: ["python", "manage.py", "qcluster"]

update-listener:
<<: *api-base
command: ["python", "manage.py", "run_update_listener"]

redis:
restart: $RESTART_POLICY
image: redis:7.0.5-alpine
volumes:
- redis-data:/data
environment:
REDIS_ARGS: --save 60 1000 --appendonly yes
mem_limit: 4g

volumes:
redis-data:
name: ${COMPOSE_PROJECT_NAME:-open_prices}_redis-data
4 changes: 2 additions & 2 deletions open_prices/common/openfoodfacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ def get_product(code: str, flavor: Flavor = Flavor.off) -> JSONType | None:
return client.product.get(code)


def get_product_dict(product, flavor=Flavor.off) -> JSONType | None:
def get_product_dict(code: str, flavor=Flavor.off) -> JSONType | None:
product_dict = dict()
try:
response = get_product(code=product.code, flavor=flavor)
response = get_product(code=code, flavor=flavor)
if response:
product_dict = build_product_dict(response, flavor)
return product_dict
Expand Down
59 changes: 59 additions & 0 deletions open_prices/products/management/commands/run_update_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import time

from django.conf import settings
from django.core.management.base import BaseCommand
from openfoodfacts import Flavor
from openfoodfacts.redis import RedisUpdate
from openfoodfacts.redis import UpdateListener as BaseUpdateListener
from openfoodfacts.redis import get_redis_client
from openfoodfacts.utils import get_logger

from open_prices.products.tasks import process_delete, process_update

logger = get_logger()


class UpdateListener(BaseUpdateListener):
def process_redis_update(self, redis_update: RedisUpdate):
logger.debug("New update: %s", redis_update)

if redis_update.product_type == "food":
flavor = Flavor.off
elif redis_update.product_type == "beauty":
redis_update.obf
elif redis_update.product_type == "petfood":
flavor = Flavor.opff
elif redis_update.product_type == "product":
flavor = Flavor.opf
else:
raise ValueError(
f"no Flavor matched for product_type {redis_update.product_type}"
)
if redis_update.action == "deleted":
logger.info("Product %s has been deleted", redis_update.code)
process_delete(redis_update.code, flavor)
elif redis_update.action == "updated":
process_update(redis_update.code, flavor)


class Command(BaseCommand):
help = """Run a daemon that listens to product updates from Open Food Facts from a Redis stream."""

def handle(self, *args, **options) -> None: # type: ignore
self.stdout.write("Launching the update listener...")

if settings.ENABLE_REDIS_UPDATES is False:
self.stdout.write("Redis updates are disabled, waiting forever...")

while True:
time.sleep(60)

redis_client = get_redis_client(
host=settings.REDIS_HOST, port=settings.REDIS_PORT
)
listener = UpdateListener(
redis_client=redis_client,
redis_stream_name=settings.REDIS_STREAM_NAME,
redis_latest_id_key=settings.REDIS_LATEST_ID_KEY,
)
listener.run()
39 changes: 38 additions & 1 deletion open_prices/products/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,47 @@
import logging

from openfoodfacts import Flavor

from open_prices.common import openfoodfacts as common_openfoodfacts
from open_prices.products.models import Product

logger = logging.getLogger(__name__)


def fetch_and_save_data_from_openfoodfacts(product: Product):
product_openfoodfacts_details = common_openfoodfacts.get_product_dict(product)
product_openfoodfacts_details = common_openfoodfacts.get_product_dict(product.code)
if product_openfoodfacts_details:
for key, value in product_openfoodfacts_details.items():
setattr(product, key, value)
product.save()


def process_update(code: str, flavor: Flavor):
product_openfoodfacts_details = common_openfoodfacts.get_product_dict(code, flavor)

if product_openfoodfacts_details:
try:
product = Product.objects.get(code=code)
except Product.DoesNotExist:
logger.info(
"Product %s does not exist in the database, cannot update product table",
code,
)
return

for key, value in product_openfoodfacts_details.items():
setattr(product, key, value)
product.save()


def process_delete(code: str, flavor: Flavor):
try:
product = Product.objects.get(code=code)
except Product.DoesNotExist:
logger.info(
"Product %s does not exist in the database, cannot delete product table",
code,
)
return

product.delete()
Loading

0 comments on commit d895839

Please sign in to comment.