Skip to content

Commit

Permalink
Add sdss_id_to_catalog materialised view (#274)
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox authored Sep 27, 2024
1 parent cb62723 commit c812ab9
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 0 deletions.
17 changes: 17 additions & 0 deletions schema/sdss5db/catalogdb/sdss_id_to_catalog/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# sdss_id_to_catalog

`sdss_id_to_catalog.py` contains code to create a materialised view with a mapping of sdss_id/catalogid to each one of the parent catalogue table unique identifiers.

For each catalogid associated with an sdss_id in `catalogdb.sdss_id_flat`, the catalogid is joined to all the `catalog_to_X` tables and then to the parent catalogue table to get the unique identifier (primary key in the parent catalogue table). The join is always done using `catalog_to_X.best = TRUE` so only the best match is considered.

The list of parent catalogues is generated dynamically from the tables in `catalogdb`. For each catalogue, a column is added to the resulting materialised view with the format `<parent_table>__<unique_identifier>`, for example `gaia_dr3_source__source_id`.

This code must be run either in the `pipelines` or `operations` machine, or in a local computer in which the `pipelines` database server (port 5432) has been forwarded to `localhost:7602`. For example, to create the view in a temporary location prior to renaming it to its final name:

```python
from sdss_id_to_catalog import create_sdss_id_to_catalog_view

create_sdss_id_to_catalog_view("sdss_id_to_catalog_temp", local=False)
```

Here `local=False` means that we are running the code somewhere at Utah that can access `pipelines.sdss.org:5432`. It also assumes that the user running the code has permissions to log in and create a new view. This can be changed by setting `user=`.
210 changes: 210 additions & 0 deletions schema/sdss5db/catalogdb/sdss_id_to_catalog/sdss_id_to_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2024-05-09
# @Filename: sdss_id_to_catalog.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import getpass
import textwrap

from typing import TYPE_CHECKING

from sdsstools import get_logger
from sdsstools.utils import Timer

from sdssdb.peewee.sdss5db import database


if TYPE_CHECKING:
pass


def create_sdss_id_to_catalog_view(
view_name: str = "sdss_id_to_catalog",
schema: str = "catalogdb",
user: str | None = None,
drop_existing: bool = False,
local: bool = False,
show_query=False,
):
"""Creates a view that maps SDSS IDs to parent catalogue PKs in ``pipelines.sdss.org``.
Parameters
----------
view_name
The name of the view to create.
schema
The schema where the view will be created.
user
The user to connect to the database. If not provided, the current user is used.
drop_existing
Whether to drop the view if it already exists.
local
If `True`, will connect to the database in port 7602. This is useful if the ``pipelines``
database server port has been forwarded to the local machine.
show_query
If `True`, will print the query that will be executed.
"""

log = get_logger("sdssdb.sdss_id_to_catalog", use_rich_handler=True)
log.set_level(5)
log.sh.setLevel(5)

if user is None:
user = getpass.getuser()

if local:
database.connect(user=user, host="localhost", port=7602)
else:
database.connect(user=user, host="pipelines.sdss.org", port=5432)

assert database.connected, "Database not connected."

view_query = database.execute_sql(
f"SELECT * FROM pg_matviews WHERE matviewname = '{view_name}';"
)
view_exists = view_query.fetchone() is not None

if view_exists:
if drop_existing:
log.warning(f'Droping existing view "{view_name}"')
database.execute_sql(f"DROP MATERIALIZED VIEW IF EXISTS catalogdb.{view_name};")
else:
raise ValueError(f'View "{view_name}" already exists.')

# We build the query manually so that the resulting query is easy to read in
# the materialized view.
tables = database.get_tables(schema="catalogdb")
catalog_to_tables = [table for table in tables if table.startswith("catalog_to_")]

select_columns_list: list[str] = []
aliases: list[str] = []
query = """
CREATE MATERIALIZED VIEW {schema}.{view_name} TABLESPACE pg_default AS
SELECT row_number() OVER () as pk,
catalogdb.sdss_id_flat.sdss_id,
catalogdb.catalog.catalogid,
catalogdb.catalog.version_id,
catalogdb.catalog.lead,
{select_columns}
FROM catalogdb.sdss_id_flat
JOIN catalogdb.catalog
ON sdss_id_flat.catalogid = catalog.catalogid
"""

for c2table in sorted(catalog_to_tables):
table = c2table.replace("catalog_to_", "")

if c2table in ["catalog_to_sdss_dr13_photoobj"]:
continue
if table in ["skies_v1", "skies_v2"]:
continue

pks = database.get_primary_keys(table, schema="catalogdb")

if c2table == "catalog_to_sdss_dr13_photoobj_primary":
table = "sdss_dr13_photoobj"
pks = ["objid"]
elif table.startswith("catwise"):
pks = ["source_id"]

if not database.table_exists(table, schema="catalogdb"):
continue

if len(pks) != 1:
log.warning(f"Skipping table {table!r} with multiple primary keys.")
continue

pk = pks[0]
alias = f"{table}__{pk}"
aliases.append(alias)

# Gaia DR2 and 2MASS PSC are a special case because in v0.1 and v0.5 we
# did not explicitely cross-match them as they were complete in TIC v8. But
# in v1 we are not using the TIC and we did cross-match them using Gaia DR3
# best neighbour tables. Here we need to include either one of the values
# (only one of them will be non-NULL) in the view.

if table == "gaia_dr2_source":
select_columns_list.append(
"COALESCE(catalogdb.gaia_dr2_source.source_id, "
f"catalogdb.tic_v8.gaia_int) AS {alias}"
)
elif table == "twomass_psc":
select_columns_list.append(
f"COALESCE(catalogdb.twomass_psc.pts_key, tm2.pts_key) AS {alias}"
)
else:
select_columns_list.append(f"catalogdb.{table}.{pk} AS {alias}")

query += f"""
LEFT JOIN catalogdb.{c2table}
ON catalog.catalogid = {c2table}.catalogid
AND {c2table}.best
AND {c2table}.version_id = catalog.version_id
LEFT JOIN catalogdb.{table}
ON catalogdb.{c2table}.target_id = catalogdb.{table}.{pk}
"""

if table == "twomass_psc":
# catalog_to_tic_v8 has already been added (its alphabetically before
# catalog_to_twomass_psc) so we only add the extra join here. We need to
# use an alias since there is already a direct join
# from catalog_to_twomass_psc to twomass_psc.
query += """
LEFT JOIN catalogdb.twomass_psc AS tm2
ON catalogdb.tic_v8.twomass_psc = tm2.designation
"""

select_columns: str = ""
for column in select_columns_list:
comma = "," if column != select_columns_list[-1] else ""
select_columns += f" {column}{comma}\n"

query = textwrap.dedent(
query.format(
select_columns=select_columns,
schema=schema,
view_name=view_name,
)
)

if show_query:
log.info("The following query will be run:")
log.info(query)

log.info(f"Creating view '{view_name}' ...")

with Timer() as timer:
with database.atomic():
database.execute_sql("SET LOCAL search_path TO catalogdb;")
database.execute_sql("SET LOCAL max_parallel_workers = 64;")
database.execute_sql("SET LOCAL max_parallel_workers_per_gather = 32;")
database.execute_sql("SET LOCAL effective_io_concurrency = 500;")
database.execute_sql('SET LOCAL effective_cache_size = "1TB";')
database.execute_sql('SET LOCAL work_mem = "1000MB";')
database.execute_sql('SET LOCAL temp_buffers = "1000MB";')

database.execute_sql(query)

log.debug(f"Query executed in {timer.elapsed:.2f} seconds.")

log.info("Creating indices ..")

database.execute_sql(f"CREATE INDEX ON {view_name} (pk);")
database.execute_sql(f"CREATE INDEX ON {view_name} (sdss_id);")
database.execute_sql(f"CREATE INDEX ON {view_name} (catalogid);")
database.execute_sql(f"CREATE INDEX ON {view_name} (version_id);")

for alias in aliases:
database.execute_sql(f"CREATE INDEX ON {view_name} ({alias});")

log.info("Running VACUUM ANALYZE ...")
database.execute_sql(f"VACUUM ANALYZE {view_name};")

log.info("Done.")

0 comments on commit c812ab9

Please sign in to comment.