Skip to content

Commit

Permalink
Modified all drop_duplicates calls in the pipeline to deduplicate by candid and oid
Browse files Browse the repository at this point in the history
  • Loading branch information
AlxEnashi committed Jan 12, 2024
1 parent 6eba432 commit 9cf1f5b
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 6 deletions.
3 changes: 2 additions & 1 deletion correction_step/correction/_step/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def produce_scribe(self, detections: list[dict]):
if not detection.pop("new"):
continue
candid = detection.pop("candid")
oid = detection.get("oid")
is_forced = detection.pop("forced")
set_on_insert = not detection.get("has_stamp", False)
extra_fields = detection["extra_fields"].copy()
Expand All @@ -150,7 +151,7 @@ def produce_scribe(self, detections: list[dict]):
scribe_data = {
"collection": "forced_photometry" if is_forced else "detection",
"type": "update",
"criteria": {"_id": candid},
"criteria": {"candid": candid, "oid": oid},
"data": detection,
"options": {"upsert": True, "set_on_insert": set_on_insert},
}
Expand Down
4 changes: 2 additions & 2 deletions correction_step/correction/core/corrector.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ def __init__(self, detections: list[dict]):
"""
self.logger = logging.getLogger(f"alerce.{self.__class__.__name__}")
self._detections = pd.DataFrame.from_records(detections, exclude={"extra_fields"})
self._detections = self._detections.drop_duplicates("candid").set_index("candid")
self._detections = self._detections.drop_duplicates(["candid", "oid"]).set_index("candid")

self.__extras = {alert["candid"]: alert["extra_fields"] for alert in detections}
extras = pd.DataFrame.from_dict(self.__extras, orient="index", columns=self._EXTRA_FIELDS)
extras = extras.reset_index(names=["candid"]).drop_duplicates("candid").set_index("candid")
extras = extras.reset_index(names=["candid"]).drop_duplicates(["candid", "oid"]).set_index("candid")

self._detections = self._detections.join(extras)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def create_detections_dto(messages: List[dict]) -> pd.DataFrame:
pd.DataFrame.from_records(msg["detections"]) for msg in messages
]
detections = pd.concat(detections)
detections.drop_duplicates("candid", inplace=True)
detections.drop_duplicates(["candid", "oid"], inplace=True)
detections = detections.set_index("oid")
detections["extra_fields"] = parse_extra_fields(detections)

Expand Down
2 changes: 1 addition & 1 deletion lightcurve-step/lightcurve_step/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def execute(self, messages: dict) -> dict:
# so this will drop alerts coming from the database if they are also in the stream
detections = detections.sort_values(
["has_stamp", "new"], ascending=[False, False]
).drop_duplicates("candid", keep="first")
).drop_duplicates(["candid", "oid"], keep="first")

non_detections = non_detections.drop_duplicates(["oid", "fid", "mjd"])
self.logger.debug(
Expand Down
2 changes: 1 addition & 1 deletion magstats_step/magstats_step/core/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, detections: List[dict]):
except KeyError: # extra_fields is not present
self._detections = pd.DataFrame.from_records(detections)
self._detections = self._detections.drop_duplicates(
"candid"
["candid", "oid"]
).set_index("candid")
# Select only non-forced detections
self._detections = self._detections[~self._detections["forced"]]
Expand Down

0 comments on commit 9cf1f5b

Please sign in to comment.