Skip to content

Commit

Permalink
Merge branch 'u/tjr/pittgoogle' of github.com:LSSTDESC/tom_desc into …
Browse files Browse the repository at this point in the history
…u/tjr/pittgoogle
  • Loading branch information
AlexGKim committed Oct 28, 2021
2 parents 30a499c + b834fa7 commit 1f947c1
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 59 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ FINK_USERNAME
FINK_GROUP_ID
FINK_SERVER
FINK_TOPIC
GOOGLE_CLOUD_PROJECT
GOOGLE_APPLICATION_CREDENTIALS
```

## Local Database Server
Expand Down
68 changes: 33 additions & 35 deletions stream/management/commands/pittgoogle.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Listen to a Pitt-Google Pub/Sub stream and save alerts to the database."""
from importlib import import_module
import logging

from django.conf import settings
from django.contrib.auth.models import Group
from django.core.management.base import BaseCommand
from django.db import transaction
# from django.db import transaction
from tom_alerts.alerts import get_service_class
from tom_targets.models import Target

# from stream.models import Alert, Topic

Expand All @@ -16,38 +18,13 @@
logger = logging.getLogger(__name__)

PITTGOOGLE_CONSUMER_CONFIGURATION = settings.PITTGOOGLE_CONSUMER_CONFIGURATION
PITTGOOGLE_PARSERS = settings.PITTGOOGLE_PARSERS


def get_parser_classes(topic):
"""."""
parser_classes = []

try:
parsers = PITTGOOGLE_PARSERS[topic]
except KeyError:
logger.log(msg=f'PITTGOOGLE_PARSERS not found for topic: {topic}.', level=logging.WARNING)
return parser_classes

for parser in parsers:
mod_name, class_name = parser.rsplit('.', 1)
try:
mod = import_module(mod_name)
clazz = getattr(mod, class_name)
except (ImportError, AttributeError):
raise ImportError(f'Unable to import parser {parser}')
parser_classes.append(clazz)

return parser_classes


class Command(BaseCommand):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.consumer = Consumer(PITTGOOGLE_CONSUMER_CONFIGURATION['subscription_name'])

def handle(self, *args, **options):
"""Pull and process alerts from a Pitt-Google Pub/Sub stream.
Expand All @@ -59,15 +36,36 @@ def handle(self, *args, **options):
- Block until the processing is complete and the connection has been closed.
"""
self.consumer.stream_alerts(
# callback=self.parse_and_save,
user_filter=self.parse_and_save,
parameters=PITTGOOGLE_CONSUMER_CONFIGURATION, # stopping conditions
kwargs = {
# "desc_group": Group.objects.get(name='DESC'),
# "broker": get_service_class('Pitt-Google StreamPython')(),
**PITTGOOGLE_CONSUMER_CONFIGURATION, # stopping conditions
}

consumer = Consumer(PITTGOOGLE_CONSUMER_CONFIGURATION['subscription_name'])
_ = consumer.stream_alerts(
user_callback=self.parse_and_save,
**kwargs
)

@staticmethod
def parse_and_save(alert, parameters):
def parse_and_save(alert_dict, **kwargs):
"""Parse the alert and save to the database."""
mylogger = logging.getLogger(__name__)
mylogger.info("Success! We have pulled an alert.")
return alert

# target = kwargs["broker"].to_target(alert_dict)
target, created = Target.objects.get_or_create(
name=alert_dict['objectId'],
type='SIDEREAL',
ra=alert_dict['ra'],
dec=alert_dict['dec'],
)

if created:
extra_fields = [item["name"] for item in settings.EXTRA_FIELDS]
extras = {k: v for k, v in alert_dict.items() if k in extra_fields}

target.save(extras=extras)
# assign_perm('tom_targets.view_target', kwargs["desc_group"], target)

success = True
return success
17 changes: 0 additions & 17 deletions stream/parsers/pittgoogle_alert_parser.py

This file was deleted.

15 changes: 8 additions & 7 deletions tom_desc/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,11 @@
GOOGLE_CLOUD_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
PITTGOOGLE_CONSUMER_CONFIGURATION = {
# pull a small number of alerts, for testing
# pull a small number of alerts from the heartbeat stream for testing
"subscription_name": "ztf-loop",
"max_results": 10,
"timeout": 30,
"max_backlog": 10,
"save_metadata": "yes",
}

PITTGOOGLE_PARSERS = {
# TODO
}

# Caching
Expand Down Expand Up @@ -361,7 +356,13 @@
# {'name': 'eligible', 'type': 'boolean'},
# {'name': 'dicovery_date', 'type': 'datetime'}
# ]
EXTRA_FIELDS = []
EXTRA_FIELDS = [
{"name": "classifierNames", "type": "string"},
# proposing to encode "no classification reported" as -1
{"name": "Bogus", "type": "number", "default": -1},
{"name": "Real", "type": "number", "default": -1},
{"name": "Ia", "type": "number", "default": -1},
]

# Authentication strategy can either be LOCKED (required login for all views)
# or READ_ONLY (read only access to views)
Expand Down

0 comments on commit 1f947c1

Please sign in to comment.