Commit
Merge branch 'main' into refactor/lc-classifier-group-by-oid
dirodriguezm authored Jan 3, 2024
2 parents b861c95 + 77bcc18 commit 12aa92e
Showing 60 changed files with 460 additions and 384 deletions.
2 changes: 1 addition & 1 deletion alert_archiving_step/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "alert-archiving-step"
-version = "23.12.26a5"
+version = "23.12.26a8"
 description = "Group alerts and upload to S3 by date"
 authors = ["ALeRCE"]
 readme = "README.md"
4 changes: 2 additions & 2 deletions charts/alert_archiving_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: A Helm chart for Kubernetes
 name: alert-archive-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/correction_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: Correction step chart
 name: correction-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/early_classification_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: A Helm chart for Kubernetes
 name: early-classifier
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/feature_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: Step for feature calculation
 name: feature-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/lc_classification_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: Lightcurve classifier step
 name: lc-classifier-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/lightcurve-step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: Full lightcurve provider step
 name: lightcurve-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/magstats_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: A Helm chart for Kubernetes
 name: magstats-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/metadata_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: A Helm chart for Kubernetes
 name: metadata-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/prv_candidates_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: Previous candidates processor step
 name: prv-candidates
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/s3_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: A Helm chart for Kubernetes
 name: s3-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/scribe/Chart.yaml
@@ -15,10 +15,10 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 23.12.26-5
+version: 23.12.26-8

 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
 # It is recommended to use it with quotes.
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
4 changes: 2 additions & 2 deletions charts/sorting_hat_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: Sorting Hat deployment chart
 name: sorting-hat
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/watchlist_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: A Helm chart for Kubernetes
 name: watchlist-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
4 changes: 2 additions & 2 deletions charts/xmatch_step/Chart.yaml
@@ -1,6 +1,6 @@
 apiVersion: v2
-appVersion: 23.12.26a5
+appVersion: 23.12.26a8
 description: A Helm chart for Kubernetes
 name: xmatch-step
 type: application
-version: 23.12.26-5
+version: 23.12.26-8
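
Every chart bump above follows the same pattern: appVersion moves from the pre-release tag 23.12.26a5 to 23.12.26a8 while the chart version moves from 23.12.26-5 to 23.12.26-8. A hypothetical helper for applying such a coordinated bump might look like the sketch below; it is not part of this repository, and the tag values are simply the ones shown in these hunks.

import pathlib
import re

# Hypothetical helper (not from the repo): bump appVersion/version in every chart.
OLD_APP, NEW_APP = "23.12.26a5", "23.12.26a8"
OLD_CHART, NEW_CHART = "23.12.26-5", "23.12.26-8"

for chart in pathlib.Path("charts").glob("*/Chart.yaml"):
    text = chart.read_text()
    text = re.sub(rf"^appVersion: {re.escape(OLD_APP)}$", f"appVersion: {NEW_APP}", text, flags=re.M)
    text = re.sub(rf"^version: {re.escape(OLD_CHART)}$", f"version: {NEW_CHART}", text, flags=re.M)
    chart.write_text(text)
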
2 changes: 1 addition & 1 deletion correction_step/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "correction"
-version = "23.12.26a5"
+version = "23.12.26a8"
 description = "Correction library for ALeRCE pipeline."
 authors = ["ALeRCE Broker <[email protected]>"]
 readme = "README.md"
8 changes: 7 additions & 1 deletion early_classification_step/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

 [tool.poetry]
 name = "early_classification_step"
-version = "23.12.26a5"
+version = "23.12.26a8"
 description = "Early Classification Step"
 authors = []
 readme = "README.md"
@@ -59,6 +59,9 @@ line-length = 88
+
+
+
@@ -105,6 +108,9 @@ pytest-docker = "^1.0.1"
+
+
+
8 changes: 4 additions & 4 deletions feature_step/features/core/_base.py
@@ -173,7 +173,7 @@ def _discard_detections(self):
         self.detections.not_enough(self.MIN_DETS_FID, by_fid=True)

     def _create_xmatches(self, xmatches: list[dict]) -> pd.DataFrame:
-        """Ensures cross-matches contain `aid` in detections and selects required columns."""
+        """Ensures cross-matches contain `oid` in detections and selects required columns."""

         def expand_catalogues(xm):
             return {
@@ -188,12 +188,12 @@ def get_required_columns():
             ]

         xmatches = [
-            {"aid": xm["aid"]} | expand_catalogues(xm) for xm in xmatches
+            {"oid": xm["oid"]} | expand_catalogues(xm) for xm in xmatches
         ]
         xmatches = pd.DataFrame(
-            xmatches, columns=["aid"] + get_required_columns()
+            xmatches, columns=["oid"] + get_required_columns()
         )
-        xmatches = xmatches.rename(columns={"aid": "id"}).drop_duplicates(
+        xmatches = xmatches.rename(columns={"oid": "id"}).drop_duplicates(
             subset=["id"]
         )
         return xmatches[xmatches["id"].isin(self.detections.ids())].set_index(
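
For context, a minimal standalone sketch of the oid-keyed cross-match handling these hunks implement: flatten the catalogue payload, keep the oid, rename it to id, and de-duplicate per object. The catalogue name, columns and values are illustrative assumptions, not taken from the repo.

import pandas as pd

# Hypothetical xmatch payloads, one per incoming message (values invented).
xmatches = [
    {"oid": "ZTF20aaaaaaa", "allwise": {"W1mag": 15.2, "W2mag": 14.9}},
    {"oid": "ZTF20aaaaaaa", "allwise": {"W1mag": 15.2, "W2mag": 14.9}},  # duplicate oid
    {"oid": "ZTF21bbbbbbb", "allwise": {}},  # missing values become NaN
]
required = ["W1mag", "W2mag"]

# Flatten the nested catalogue dict while keeping the oid, mirroring the hunk above.
rows = [{"oid": xm["oid"], **xm.get("allwise", {})} for xm in xmatches]
df = pd.DataFrame(rows, columns=["oid"] + required)

# Rename oid -> id and keep one row per object, as the new code does.
df = df.rename(columns={"oid": "id"}).drop_duplicates(subset=["id"]).set_index("id")
print(df)
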
2 changes: 1 addition & 1 deletion feature_step/features/core/elasticc.py
@@ -49,7 +49,7 @@ def generate_features(self):

     def _create_lightcurve_dataframe(self, detections: List[dict]):
         self._rename_detections_columns(detections)
-        return pd.DataFrame.from_records(detections).set_index("aid")
+        return pd.DataFrame.from_records(detections).set_index("oid")

     def _rename_detections_columns(self, detections: List[dict]):
         for det in detections:
4 changes: 2 additions & 2 deletions feature_step/features/core/handlers/_base.py
@@ -17,7 +17,7 @@ class BaseHandler(abc.ABC):
     This provides the general base methods for extracting statistics and applying functions to the alerts
     considered.

-    Alerts require at the very least fields `sid`, `fid`, `mjd` and `aid` (the latter will be renamed to `id`).
+    Alerts require at the very least fields `sid`, `fid`, `mjd` and `oid` (the latter will be renamed to `id`).
     Usually, a subclass will define additional fields that are required, but missing one of the aforementioned fields
     will result in an initialization error, unless the initialization methods are adapted as well. These always
     required fields are defined the class attribute `COLUMNS`.
@@ -73,7 +73,7 @@ def __init__(
             )
         except KeyError:  # extra_fields is not present
             self._alerts = pd.DataFrame.from_records(alerts)
-        self._alerts = self._alerts.rename(columns={"aid": "id"})
+        self._alerts = self._alerts.rename(columns={"oid": "id"})
         if self._alerts.size == 0:
             index = (
                 {self.INDEX}
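
To make the updated contract concrete, here is a minimal sketch (not the handler itself) of alerts carrying the required fields and the oid -> id rename described in the docstring above; field values and extra columns are made up.

import pandas as pd

# Hypothetical alerts; only the required keys (sid, fid, mjd, oid, ...) matter here.
alerts = [
    {"oid": "ZTF20aaaaaaa", "sid": "ZTF", "fid": "g", "mjd": 60000.1, "mag": 18.3, "e_mag": 0.05},
    {"oid": "ZTF20aaaaaaa", "sid": "ZTF", "fid": "r", "mjd": 60000.2, "mag": 18.1, "e_mag": 0.04},
]

df = pd.DataFrame.from_records(alerts)
# The handler now sources the generic `id` column from `oid` instead of `aid`.
df = df.rename(columns={"oid": "id"})
print(df[["id", "sid", "fid", "mjd"]])
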
2 changes: 1 addition & 1 deletion feature_step/features/core/handlers/detections.py
@@ -11,7 +11,7 @@ class DetectionsHandler(BaseHandler):
     Indexed by `candid`.

-    Criteria for uniqueness is based on `id` (`aid` or `oid`, depending on use of `legacy`), `fid` and `mjd`.
+    Criteria for uniqueness is based on `id` (`oid`, depending on use of `legacy`), `fid` and `mjd`.

     Required fields are: `id`, `sid`, `fid`, `mjd`, `mag` and `e_mag`.
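
A small illustration of the uniqueness rule the docstring states (unique on id, fid and mjd, indexed by candid); the rows below are invented examples, not data from the pipeline.

import pandas as pd

dets = pd.DataFrame(
    {
        "candid": [1, 2, 3],
        "id": ["ZTF20aaaaaaa", "ZTF20aaaaaaa", "ZTF20aaaaaaa"],
        "fid": ["g", "g", "r"],
        "mjd": [60000.1, 60000.1, 60000.1],
    }
).set_index("candid")

# Candids 1 and 2 collide on (id, fid, mjd), so one of them is dropped.
unique = dets.reset_index().drop_duplicates(subset=["id", "fid", "mjd"]).set_index("candid")
print(unique)
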
2 changes: 1 addition & 1 deletion feature_step/features/core/handlers/non_detections.py
@@ -11,7 +11,7 @@

 class NonDetectionsHandler(BaseHandler):
     """Class for handling non-detections.

-    Criteria for uniqueness is based on `id` (`aid` or `oid`, depending on use of `legacy`), `fid` and `mjd`.
+    Criteria for uniqueness is based on `id` (or `oid`, depending on use of `legacy`), `fid` and `mjd`.

     Required fields are `id`, `sid`, `fid`, `mjd` and `diffmaglim`.
2 changes: 1 addition & 1 deletion feature_step/features/core/utils/decorators.py
@@ -51,7 +51,7 @@ def wrapper(self, *args, **kwargs):


 def fill_in_every_fid(counters: str = None):
-    """Decorated method must produce a multi-indexed data frame with two levels, `aid` and `fid` (in that order)"""
+    """Decorated method must produce a multi-indexed data frame with two levels, `oid` and `fid` (in that order)"""

     def decorator(method):
         @functools.wraps(method)
4 changes: 2 additions & 2 deletions feature_step/features/core/utils/functions.py
@@ -15,8 +15,8 @@ def fill_index(
         raise ValueError(
             f"DataFrame has MultiIndex with {df.index.names}. Requested filling: {check}"
         )
-    aids = df.index.get_level_values("id").unique()
-    values = [aids] + [kwargs[k] for k in df.index.names if k != "id"]
+    oids = df.index.get_level_values("id").unique()
+    values = [oids] + [kwargs[k] for k in df.index.names if k != "id"]

     df = df.reindex(pd.MultiIndex.from_product(values, names=df.index.names))
     if counters is None:
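
A standalone illustration of the reindexing pattern used by fill_index and expected by the fill_in_every_fid decorator above: every object id is combined with every band so missing (id, fid) pairs become explicit NaN rows. Object ids, band labels and counts are made-up examples.

import pandas as pd

# Per-(id, fid) statistics for two objects; band "r" is missing for the second object.
stats = pd.DataFrame(
    {"n_det": [5, 3, 2]},
    index=pd.MultiIndex.from_tuples(
        [("ZTF20aaaaaaa", "g"), ("ZTF20aaaaaaa", "r"), ("ZTF21bbbbbbb", "g")],
        names=["id", "fid"],
    ),
)

# Reindex over the product of all ids and all requested bands,
# so every (id, fid) combination exists; missing ones become NaN.
ids = stats.index.get_level_values("id").unique()
full_index = pd.MultiIndex.from_product([ids, ["g", "r"]], names=["id", "fid"])
print(stats.reindex(full_index))
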
2 changes: 0 additions & 2 deletions feature_step/features/core/ztf.py
@@ -93,11 +93,9 @@ def _legacy(cls, detections, non_detections, xmatches, metadata):
         non_detections["fid"].replace(
             {v: k for k, v in cls.BANDS_MAPPING.items()}, inplace=True
         )
-        non_detections = non_detections.rename(columns={"oid": "aid"})

         if isinstance(xmatches, pd.DataFrame):
             xmatches = xmatches.reset_index()
-            xmatches = xmatches.rename(columns={"oid": "aid"})

         return detections, non_detections, xmatches

36 changes: 17 additions & 19 deletions feature_step/features/step.py
@@ -4,6 +4,7 @@
 import pandas as pd
 from apf.core import get_class
 from apf.core.step import GenericStep
+from apf.consumers import KafkaConsumer

 from features.core.elasticc import ELAsTiCCFeatureExtractor
 from features.core.ztf import ZTFFeatureExtractor
@@ -40,12 +41,8 @@ def __init__(
             self.config["SCRIBE_PRODUCER_CONFIG"]
         )

-    def produce_to_scribe(
-        self, messages_aid_oid: dict, features: pd.DataFrame
-    ):
-        commands = parse_scribe_payload(
-            messages_aid_oid, features, self.features_extractor
-        )
+    def produce_to_scribe(self, features: pd.DataFrame):
+        commands = parse_scribe_payload(features, self.features_extractor)

         count = 0
         flush = False
@@ -58,35 +55,29 @@ def produce_to_scribe(
         )

     def pre_produce(self, result: Iterable[Dict[str, Any]] | Dict[str, Any]):
-        self.set_producer_key_field("aid")
+        self.set_producer_key_field("oid")
         return result

     def execute(self, messages):
-        detections, non_detections, xmatch, messages_aid_oid = [], [], [], {}
+        detections, non_detections, xmatch = [], [], []
         candids = {}

         for message in messages:
-            if not message["aid"] in candids:
-                candids[message["aid"]] = []
-            candids[message["aid"]].extend(message["candid"])
+            if not message["oid"] in candids:
+                candids[message["oid"]] = []
+            candids[message["oid"]].extend(message["candid"])
             detections.extend(message.get("detections", []))
             non_detections.extend(message.get("non_detections", []))
             xmatch.append(
-                {"aid": message["aid"], **(message.get("xmatches", {}) or {})}
+                {"oid": message["oid"], **(message.get("xmatches", {}) or {})}
             )
-            oids_of_aid = []
-            oids_of_aid = [
-                message_detection["oid"]
-                for message_detection in message["detections"]
-            ]
-            messages_aid_oid[message["aid"]] = list(set(oids_of_aid))

         features_extractor = self.features_extractor(
             detections, non_detections, xmatch
         )
         features = features_extractor.generate_features()
         if len(features) > 0:
-            self.produce_to_scribe(messages_aid_oid, features)
+            self.produce_to_scribe(features)

         output = parse_output(
             features, messages, self.features_extractor, candids
@@ -96,3 +87,10 @@ def execute(self, messages):
     def post_execute(self, result):
         self.metrics["sid"] = get_sid(result)
         return result
+
+    def tear_down(self):
+        if isinstance(self.consumer, KafkaConsumer):
+            self.consumer.teardown()
+        else:
+            self.consumer.__del__()
+        self.producer.__del__()
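
The grouping logic introduced in execute reduces to the pattern below: candids are accumulated per oid rather than per aid, and the old aid-to-oid bookkeeping (messages_aid_oid) disappears. This is a standalone sketch with invented message payloads, not the step itself.

# Standalone sketch of the per-oid candid grouping shown above; message contents are invented.
messages = [
    {"oid": "ZTF20aaaaaaa", "candid": [111, 112], "detections": [], "non_detections": []},
    {"oid": "ZTF20aaaaaaa", "candid": [113], "detections": [], "non_detections": []},
    {"oid": "ZTF21bbbbbbb", "candid": [201], "detections": [], "non_detections": []},
]

candids = {}
for message in messages:
    # Accumulate every candid under its object identifier (oid), mirroring the new loop.
    candids.setdefault(message["oid"], []).extend(message["candid"])

print(candids)  # {'ZTF20aaaaaaa': [111, 112, 113], 'ZTF21bbbbbbb': [201]}
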
