Skip to content

Commit

Permalink
Improvements to XMatch (#426)
Browse files Browse the repository at this point in the history
* Allow to read XMatchPlanner configuration from a dictionary

* Use rich handler for logger

* Allow _temp_count=0

* Allow to override the logger used by the XMatchPlanner

* Linting indentation fix

* Add added_by_phase column to relational model

* Revert Felipe's use of SQL files for phase 1

* Phase 1: distinct only on model_pk

This should prevent duplicates when a target in the intermediate
table has been cross-matched to multiple catalogids.

* Phase 2: allow to cross-match with temporary and real catalog

* Phase 3: require using only best matches

This is an imporant bug fix. Before we were missing targets
that where associated with a catalogid in phase 2 but only as
a secondary match.

* Revert Felipe's use of SQL files for phase 3

* Consolidate line

* Revert using rich logger

* Delete phase1_range option from XMatchPlanner

* Deepcopy config file

* Add mode parameter to load method of BaseCarton

* Add ToO_Carton

* Always use self.log instead of log in BaseCarton

* Import CatalogToToO_Target inside build_query

* Go back to importing CatalogToToO_Target at top

* Add IF NOT EXISTS to the creation of the q3c index

* Store intermediate catalog_to_XXX results in sandbox

* Linting
  • Loading branch information
albireox authored Apr 23, 2024
1 parent 1749f83 commit fb697c6
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 334 deletions.
84 changes: 54 additions & 30 deletions python/target_selection/cartons/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,12 @@ def run(self,

if self.database.table_exists(self.table_name, schema=self.schema):
if overwrite:
log.info(f'Dropping table {path!r}.')
self.log.info(f'Dropping table {path!r}.')
self.drop_table()
else:
raise RuntimeError(f'Temporary table {path!r} already exists.')

log.info('Running query ...')
self.log.info('Running query ...')
version_id = self.get_version_id()

with Timer() as timer:
Expand Down Expand Up @@ -304,21 +304,24 @@ def run(self,
query_str = cursor.mogrify(query_sql, params).decode()

if not self._disable_query_log:
log.debug(color_text(f'CREATE TABLE IF NOT EXISTS {path} AS ' + query_str,
'darkgrey'))
log_message = f'CREATE TABLE IF NOT EXISTS {path} AS ' + query_str
if self.log.rich_console:
self.log.debug(log_message, extra={"highlighter": None})
else:
self.log.debug(color_text(log_message, 'darkgrey'))
else:
log.debug('Not printing VERY long query.')
self.log.debug('Not printing VERY long query.')

with self.database.atomic():
self.setup_transaction()
execute_sql(f'CREATE TABLE IF NOT EXISTS {path} AS ' + query_sql,
params)

log.info(f'Created table {path!r} in {timer.interval:.3f} s.')
self.log.info(f'Created table {path!r} in {timer.interval:.3f} s.')

self.RModel = self.get_model()

log.debug('Adding columns and indexes.')
self.log.debug('Adding columns and indexes.')

columns = [
col.name for col in self.database.get_columns(self.table_name, self.schema)
Expand All @@ -345,18 +348,18 @@ def run(self,
execute_sql(f'ANALYZE {path};')

n_rows = self.RModel.select().count()
log.debug(f'Table {path!r} contains {n_rows:,} rows.')
self.log.debug(f'Table {path!r} contains {n_rows:,} rows.')

log.debug('Running post-process.')
self.log.debug('Running post-process.')
with self.database.atomic():
self.setup_transaction()
self.post_process(self.RModel, **post_process_kawrgs)

n_selected = self.RModel.select().where(self.RModel.selected >> True).count()
log.debug(f'Selected {n_selected:,} rows after post-processing.')
self.log.debug(f'Selected {n_selected:,} rows after post-processing.')

if add_optical_magnitudes:
log.debug('Adding optical magnitude columns.')
self.log.debug('Adding optical magnitude columns.')
self.add_optical_magnitudes()

self.has_run = True
Expand All @@ -380,12 +383,10 @@ def add_optical_magnitudes(self):
if any([mag in Model._meta.columns for mag in magnitudes]):
if not all([mag in Model._meta.columns for mag in magnitudes]):
raise TargetSelectionError(
'Some optical magnitudes are defined in the query '
'but not all of them.')
'Some optical magnitudes are defined in the query but not all of them.')
if 'optical_prov' not in Model._meta.columns:
raise TargetSelectionError('optical_prov column does not exist.')
warnings.warn('All optical magnitude columns are defined in the query.',
TargetSelectionUserWarning)
self.log.warning('All optical magnitude columns are defined in the query.')
return

# First create the columns. Also create z to speed things up. We won't
Expand Down Expand Up @@ -716,7 +717,7 @@ def write_table(self, filename=None, mode='results', write=True):
else:
filename = f'{self.name}_{self.plan}_targetdb.fits.gz'

log.debug(f'Writing table to {filename}.')
self.log.debug(f'Writing table to {filename}.')

if not self.RModel:
self.RModel = self.get_model()
Expand Down Expand Up @@ -793,24 +794,47 @@ def write_table(self, filename=None, mode='results', write=True):

return carton_table

def load(self, overwrite=False):
"""Loads the output of the intermediate table into targetdb."""
def load(self, mode='fail', overwrite=False):
"""Loads the output of the intermediate table into targetdb.
Parameters
----------
mode : str
The mode to use when loading the targets. If ``'fail'``, raises an
error if the carton already exist. If ``'overwrite'``, overwrites
the targets. If ``'append'``, appends the targets.
overwrite : bool
Equivalent to setting ``mode='overwrite'``. This option is deprecated and
will raise a warning.
"""

if overwrite:
mode = 'overwrite'
warnings.warn(
'The `overwrite` option is deprecated and will be removed in a future version. '
'Use `mode="overwrite"` instead.',
TargetSelectionUserWarning)

if self.check_targets():
if overwrite:
if mode == 'overwrite':
warnings.warn(
f'Carton {self.name!r} with plan {self.plan!r} '
f'already has targets loaded. '
'Dropping carton-to-target entries.',
TargetSelectionUserWarning,
)
self.drop_carton()
else:
elif mode == 'append':
pass
elif mode == 'fail':
raise TargetSelectionError(
f'Found existing targets for '
f'carton {self.name!r} with plan '
f'{self.plan!r}.'
)
else:
raise ValueError(f'Invalid mode {mode!r}. Use "fail", "overwrite", or "append".')

if self.RModel is None:
RModel = self.get_model()
Expand Down Expand Up @@ -872,7 +896,7 @@ def _create_carton_metadata(self):
version_pk = version.pk

if created:
log.info(
self.log.info(
f'Created record in targetdb.version for '
f'{self.plan!r} with tag {self.tag!r}.'
)
Expand All @@ -889,13 +913,13 @@ def _create_carton_metadata(self):
mapper, created_pk = tdb.Mapper.get_or_create(label=self.mapper)
mapper_pk = mapper.pk
if created:
log.debug(f'Created mapper {self.mapper!r}')
self.log.debug(f'Created mapper {self.mapper!r}')

if self.category:
category, created = tdb.Category.get_or_create(label=self.category)
category_pk = category.pk
if created:
log.debug(f'Created category {self.category!r}')
self.log.debug(f'Created category {self.category!r}')

tdb.Carton.create(
carton=self.name,
Expand All @@ -906,12 +930,12 @@ def _create_carton_metadata(self):
run_on=datetime.datetime.now().isoformat().split('T')[0]
).save()

log.debug(f'Created carton {self.name!r}')
self.log.debug(f'Created carton {self.name!r}')

def _load_targets(self, RModel):
"""Load data from the intermediate table tp targetdb.target."""

log.debug('loading data into targetdb.target.')
self.log.debug('loading data into targetdb.target.')

n_inserted = ( # noqa: 841
tdb.Target.insert_from(
Expand Down Expand Up @@ -947,14 +971,14 @@ def _load_targets(self, RModel):
.execute()
)

log.info('Inserted new rows into targetdb.target.')
self.log.info('Inserted new rows into targetdb.target.')

return

def _load_magnitudes(self, RModel):
"""Load magnitudes into targetdb.magnitude."""

log.debug('Loading data into targetdb.magnitude.')
self.log.debug('Loading data into targetdb.magnitude.')

Magnitude = tdb.Magnitude

Expand Down Expand Up @@ -1035,12 +1059,12 @@ def _load_magnitudes(self, RModel):

n_inserted = Magnitude.insert_from(select_from, fields).returning().execute() # noqa: 841

log.info('Inserted new rows into targetdb.magnitude.')
self.log.info('Inserted new rows into targetdb.magnitude.')

def _load_carton_to_target(self, RModel):
"""Populate targetdb.carton_to_target."""

log.debug('Loading data into targetdb.carton_to_target.')
self.log.debug('Loading data into targetdb.carton_to_target.')

version_pk = tdb.Version.get(
plan=self.plan,
Expand Down Expand Up @@ -1228,7 +1252,7 @@ def _load_carton_to_target(self, RModel):
.execute()
)

log.info('Inserted rows into targetdb.carton_to_target.')
self.log.info('Inserted rows into targetdb.carton_to_target.')

def drop_carton(self):
"""Drops the entry in ``targetdb.carton``."""
Expand Down
63 changes: 63 additions & 0 deletions python/target_selection/cartons/too.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2024-02-16
# @Filename: too.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

from sdssdb.peewee.sdss5db.catalogdb import (CatalogToToO_Target,
ToO_Metadata, ToO_Target)
from sdssdb.peewee.sdss5db.targetdb import (Carton, CartonToTarget,
Target, Version)

from .base import BaseCarton


__all__ = ['ToO_Carton']


class ToO_Carton(BaseCarton):
"""Target of opportunity carton.
Selects all the targets in ``catalogdb.too_target`` that don't yet exist in
the carton.
"""

name = 'too'
category = 'science'
cadence = 'bright_1x1'
priority = 3000
program = 'too'
can_offset = True

def build_query(self, version_id, query_region=None):

C2TT = CatalogToToO_Target

too_in_carton = (Target
.select(Target.catalogid)
.join(CartonToTarget)
.join(Carton)
.join(Version)
.where(Version.plan == self.plan,
Carton.carton == self.name)).alias('too_in_carton')

query = (ToO_Target.select(C2TT.catalogid,
ToO_Target.fiber_type.alias('instrument'),
ToO_Metadata.g_mag.alias('g'),
ToO_Metadata.r_mag.alias('r'),
ToO_Metadata.i_mag.alias('i'),
ToO_Metadata.z_mag.alias('z'),
ToO_Metadata.optical_prov)
.join(C2TT, on=(ToO_Target.too_id == C2TT.target_id))
.switch(ToO_Target)
.join(ToO_Metadata, on=(ToO_Target.too_id == ToO_Metadata.too_id))
.where(C2TT.version_id == version_id,
C2TT.best >> True,
C2TT.catalogid.not_in(too_in_carton)))

return query
15 changes: 15 additions & 0 deletions python/target_selection/config/target_selection.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
'1.1.0': # This is a dedicated plan for ToO
xmatch_plan: 1.0.0
cartons:
- too
schema: sandbox
magnitudes:
h: [catalog_to_twomass_psc, twomass_psc, twomass_psc.h_m]
j: [catalog_to_twomass_psc, twomass_psc, twomass_psc.j_m]
k: [catalog_to_twomass_psc, twomass_psc, twomass_psc.k_m]
bp: [catalog_to_gaia_dr3_source, gaia_dr3_source, gaia_dr3_source.phot_bp_mean_mag]
rp: [catalog_to_gaia_dr3_source, gaia_dr3_source, gaia_dr3_source.phot_rp_mean_mag]
gaia_g: [catalog_to_gaia_dr3_source, gaia_dr3_source, gaia_dr3_source.phot_g_mean_mag]
database_options:
work_mem: '5GB'

'1.0.51':
xmatch_plan: 1.0.0
cartons:
Expand Down
Loading

0 comments on commit fb697c6

Please sign in to comment.