Skip to content

Useful scripts

Raphaël Odini edited this page Jan 5, 2025 · 21 revisions

Prices

Count

Price.objects.filter(owner=username).count()

Prices per source

from collections import Counter
from datetime import datetime, date

Counter(Price.objects.values_list("source", flat=True))

// today
Counter(Price.objects.filter(created__date=date.today()).values_list("source", flat=True))

Delete duplicate prices

  • price duplicates from mobile (Smoothie)
  • ignore duplicates receipt/GDPR
  • also deletes proofs (if the proof had only 1 price, the duplicate that will be deleted)
from open_prices.proofs import constants as proof_constants

DUPLICATE_FIELDS = ["product_code", "location_id", "date", "price", "currency"]
for user in User.objects.all():
    user_prices = list()

    for price in Price.objects.select_related("proof").filter(owner=user.user_id, source__contains="Smoothie", proof__type__in=proof_constants.TYPE_SINGLE_SHOP_LIST).order_by("created"):
        res = next((p for p in user_prices if all(getattr(p, duplicate_field) == getattr(price, duplicate_field) for duplicate_field in DUPLICATE_FIELDS)), None)
        if res:
            if price.proof.prices.count() == 1:
                print("deleting proof", price.proof.id)
                price.proof.delete()
            print(user.user_id, "/ duplicate", price.id)
            price.delete()
        else:
            user_prices.append(price)

Fix proofs without locations

for proof in Proof.objects.exclude(type="GDPR_REQUEST").filter(location=None):
   if proof.prices.count():
      proof_prices_location_list = list(proof.prices.values_list("location", flat=True).distinct())
      if len(proof_prices_location_list) == 1:
         location = proof.prices.first().location
         proof.location_osm_id = location.osm_id
         proof.location_osm_type = location.osm_type
         proof.save()
      else:
         print("different locations", proof, proof.prices.count(), proof_prices_location_list)

// run afterwards
for location in Location.objects.all():
    location.update_price_count()

Fix proofs without date

for proof in Proof.objects.exclude(type="GDPR_REQUEST").filter(date=None):
   if proof.prices.count():
      proof_prices_date_list = list(proof.prices.values_list("date", flat=True).distinct())
      if len(proof_prices_date_list) == 1:
         proof.date = proof_prices_date_list[0]
         proof.save()
      else:
         print("different dates", proof, proof.prices.count(), proof_prices_date_list)

Fix proofs without currency

for proof in Proof.objects.exclude(type="GDPR_REQUEST").filter(currency=None):
   if proof.prices.count():
      proof_prices_currency_list = list(proof.prices.values_list("currency", flat=True).distinct())
      if len(proof_prices_currency_list) == 1:
         proof.currency = proof_prices_currency_list[0]
         proof.save()
      else:
         print("different currencies", proof, proof.prices.count(), proof_prices_currency_list)

Products

Count

// product in OFF DB
Price.objects.filter(owner=u).values_list("product_id", flat=True).distinct().count()
// product code (product in OFF DB + product not found)
Price.objects.filter(owner=u).values_list("product_code", flat=True).distinct().count()
// product category_tag
Price.objects.filter(owner=u).values_list("category_tag", flat=True).distinct().count()

Prices per category

from collections import Counter
Price.objects.filter(type="CATEGORY").count()
Counter(Price.objects.filter(type="CATEGORY").values_list("category_tag", flat=True))

Top products without source

for product in Product.objects.filter(source=None).exclude(code__startswith=0, code__startswith=2).order_by("-price_count")[:50]:
    print(product.code, product.price_count)

Detect wrong barcodes

from openfoodfacts.barcode import has_valid_check_digit

for product in Product.objects.prefetch_related("prices").filter(prices__isnull=False):
    if not has_valid_check_digit(product.code):
        print(product.id, product.code)

Locations

Count

Price.objects.filter(owner=username).values_list("location_id", flat=True).distinct().count()

Fix a location

Useful to update a location if the user did a mistake (goal: replace this with allowing a user to edit it)

  • first get the ids of the old & the new locations
// Django
new_location, created = Location.objects.get_or_create(osm_id=<location_osm_id>, osm_type=<location_osm_type>)
proof = Proof.objects.get(id=<proof_id>)
old_location = proof.location
proof.location_id = new_location.id
proof.location_osm_id = new_location.osm_id
proof.location_osm_type = new_location.osm_type
proof.save()
for price in proof.objects.all():
    price.location_id = new_location.id
    price.location_osm_id = new_location.osm_id
    price.location_osm_type = new_location.osm_type
    price.save()
old_location.prices = old_location.prices.count()
old_location.save()
new_location.prices = new_location.prices.count()
new_location.save()

// SQL
// example : old location is 593, new location is 46
update prices set location_id = 46, location_osm_id = 5406986218 where owner = 'user' and location_id = 593 and date = '2024-06-27';
update proofs set location_id = 46, location_osm_id = 5406986218 where owner = 'user' and location_id = 593 and date = '2024-06-27';
// also update the location price_count
update locations set price_count = ? where id = 593;
update locations set price_count = ? where id = 46;

Update all locations

For instance if you add a new field, and want to update all the existing locations. Example: following this PR

// Django
from open_prices.locations.models import Location
// TODO


// FastAPI (old)
import time
from app.crud import get_locations, update_location
from app.db import session
from app.models import Location
from app.utils import fetch_location_openstreetmap_details

Location.__table__.columns.keys()

db = session()
db_locations = get_locations(db=db)

for index, db_location in enumerate(db_locations):
    if not db_location[0].osm_tag_key:
        location_openstreetmap_details = fetch_location_openstreetmap_details(location=db_location[0])
        if location_openstreetmap_details:
            update_location(db, location=db_location[0], update_dict=location_openstreetmap_details)
            print(index, "updated", db_location[0].id)
            if index % 10 == 0:
                time.sleep(1)

Proofs

Count

Proof.objects.filter(owner=username).count()

JSONL data dumps

Download and import a data dump

// Django
from openfoodfacts import Flavor
from open_prices.common import openfoodfacts as common_openfoodfacts

common_openfoodfacts.import_product_db(flavor=Flavor.opf)

// FastAPI (old)
from openfoodfacts import Flavor
from app.db import session
from app.tasks import import_product_db

db = session()
import_product_db(db=db, flavor=Flavor.opf)

Check if a code is inside a data dump

The file is usually located in /home/<user>/.cache/openfoodfacts/datasets/

gzip -dk /home/<user>/.cache/openfoodfacts/datasets/openproductsfacts-products.jsonl.gz
grep -r 5702017582931 /home/<user>/.cache/openfoodfacts/datasets/openproductsfacts-products.jsonl

Scheduled tasks

// get latest location OSM fetch
Task.objects.filter(func="open_prices.locations.tasks.fetch_and_save_data_from_openstreetmap").first().__dict__