U/jrbogart/parquet star2 #82

Merged: 3 commits, Dec 1, 2023
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
@@ -49,8 +49,7 @@ jobs:
       run: |
         mkdir rubin_sim_data
         mkdir rubin_sim_data/sims_sed_library
-        # Just get the skybrightness, throughputs, and SED data for now.
-        curl https://s3df.slac.stanford.edu/groups/rubin/static/sim-data/rubin_sim_data/skybrightness_may_2021.tgz | tar -C rubin_sim_data -xz
+        # Just get the throughputs and SED data for now.
         curl https://s3df.slac.stanford.edu/groups/rubin/static/sim-data/rubin_sim_data/throughputs_2023_09_07.tgz | tar -C rubin_sim_data -xz
         curl https://s3df.slac.stanford.edu/groups/rubin/static/sim-data/sed_library/seds_170124.tar.gz | tar -C rubin_sim_data/sims_sed_library -xz

81 changes: 47 additions & 34 deletions skycatalogs/catalog_creator.py
@@ -16,6 +16,7 @@
 from .utils.config_utils import assemble_MW_extinction, assemble_cosmology
 from .utils.config_utils import assemble_object_types, assemble_provenance
 from .utils.config_utils import write_yaml
+from .utils.star_parquet_input import _star_parquet_reader
 from .utils.parquet_schema_utils import make_galaxy_schema
 from .utils.parquet_schema_utils import make_galaxy_flux_schema
 from .utils.parquet_schema_utils import make_star_flux_schema
@@ -194,7 +195,7 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None,
                  main_only=False, flux_parallel=16, galaxy_nside=32,
                  galaxy_stride=1000000, provenance=None,
                  dc2=False, sn_object_type='sncosmo', galaxy_type='cosmodc2',
-                 include_roman_flux=False):
+                 include_roman_flux=False, star_input_fmt='sqlite'):
         """
         Store context for catalog creation
 
Expand Down Expand Up @@ -240,6 +241,7 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None,
sn_object_type Which object type to use for SNe.
galaxy_type Currently allowed values are cosmodc2 and diffsky
include_roman_flux Calculate and write Roman flux values
star_input_fmt May be either 'sqlite' or 'parquet'

Might want to add a way to specify template for output file name
and template for input sedLookup file name.
@@ -250,6 +252,8 @@
         _star_db = '/global/cfs/cdirs/lsst/groups/SSim/DC2/dc2_stellar_healpixel.db'
         _sn_db = '/global/cfs/cdirs/lsst/groups/SSim/DC2/cosmoDC2_v1.1.4/sne_cosmoDC2_v1.1.4_MS_DDF_healpix.db'
 
+        _star_parquet = '/global/cfs/cdirs/descssim/postDC2/UW_star_catalog'
+
         self._galaxy_stride = galaxy_stride
         if pkg_root:
             self._pkg_root = pkg_root
@@ -279,8 +283,12 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None,
         self._sn_object_type = sn_object_type
 
         self._star_truth = star_truth
+        self._star_input_fmt = star_input_fmt
         if self._star_truth is None:
-            self._star_truth = _star_db
+            if self._star_input_fmt == 'sqlite':
+                self._star_truth = _star_db
+            elif self._star_input_fmt == 'parquet':
+                self._star_truth = _star_parquet
         self._cat = None
 
         self._parts = parts
@@ -315,7 +323,6 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None,
         self._provenance = provenance
         self._dc2 = dc2
         self._include_roman_flux = include_roman_flux
-
         self._obs_sed_factory = None
 
     def _make_tophat_columns(self, dat, names, cmp):
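With this change, switching to parquet star truth is a single constructor argument. A minimal usage sketch; the pixel and output root are placeholders, and the create_pointsource_catalog call is assumed from the wider class, which this diff does not show:

from skycatalogs.catalog_creator import CatalogCreator

# Placeholder pixel and output root. With star_truth left unset and
# star_input_fmt='parquet', __init__ falls back to _star_parquet, the
# UW_star_catalog directory shown in the hunk above.
creator = CatalogCreator(parts=[9556], skycatalog_root='/my/output/root',
                         star_input_fmt='parquet')
creator.create_pointsource_catalog()  # method name assumed, not in this diff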
@@ -830,19 +837,22 @@ def create_pointsource_pixel(self, pixel, arrow_schema, star_cat=None,
 
         if star_cat:
             # Get data for this pixel
-            cols = ','.join(['format("%s",simobjid) as id', 'ra',
-                             'decl as dec',
-                             'magNorm as magnorm', 'mura', 'mudecl as mudec',
-                             'radialVelocity as radial_velocity', 'parallax',
-                             'sedFilename as sed_filepath', 'ebv'])
-            q = f'select {cols} from stars where hpid={pixel} '
-            with sqlite3.connect(star_cat) as conn:
-                star_df = pd.read_sql_query(q, conn)
-
-            star_df['sed_filepath'] = get_star_sed_path(star_df['sed_filepath'])
-
+            if self._star_input_fmt == 'sqlite':
+                cols = ','.join(['format("%s",simobjid) as id', 'ra',
+                                 'decl as dec', 'magNorm as magnorm', 'mura',
+                                 'mudecl as mudec',
+                                 'radialVelocity as radial_velocity',
+                                 'parallax',
+                                 'sedFilename as sed_filepath', 'ebv'])
+                q = f'select {cols} from stars where hpid={pixel} '
+                with sqlite3.connect(star_cat) as conn:
+                    star_df = pd.read_sql_query(q, conn)
+            elif self._star_input_fmt == 'parquet':
+                star_df = _star_parquet_reader(self._star_truth, pixel,
+                                               arrow_schema)
             nobj = len(star_df['id'])
             self._logger.debug(f'Found {nobj} stars')
+            star_df['sed_filepath'] = get_star_sed_path(star_df['sed_filepath'])
             star_df['object_type'] = np.full((nobj,), 'star')
             star_df['host_galaxy_id'] = np.zeros((nobj,), np.int64())
 
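Both branches are expected to yield a DataFrame with the same columns, so everything after the read (SED paths, filler columns, Arrow conversion) is format-agnostic. A small sketch, not part of the PR, of that column contract:

# Columns that the sqlite aliases and the parquet reader's out_fields both
# produce; the rest of create_pointsource_pixel relies on all of them.
EXPECTED_STAR_COLS = ['id', 'ra', 'dec', 'magnorm', 'mura', 'mudec',
                      'radial_velocity', 'parallax', 'sed_filepath', 'ebv']

def _check_star_df(star_df):
    # Fail fast if either reader drifts from the shared contract.
    missing = [c for c in EXPECTED_STAR_COLS if c not in star_df.columns]
    assert not missing, f'star reader missing columns: {missing}'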
@@ -873,26 +883,29 @@ def create_pointsource_pixel(self, pixel, arrow_schema, star_cat=None,
                 params_df = pd.read_sql_query(q2, conn)
 
             nobj = len(sn_df['ra'])
-            sn_df['object_type'] = np.full((nobj,), self._sn_object_type)
-
-            sn_df['MW_rv'] = make_MW_extinction_rv(sn_df['ra'], sn_df['dec'])
-            sn_df['MW_av'] = make_MW_extinction_av(sn_df['ra'], sn_df['dec'])
-
-            # Add fillers for columns not relevant for sn
-            sn_df['sed_filepath'] = np.full((nobj), '')
-            sn_df['magnorm'] = np.full((nobj,), None)
-            sn_df['mura'] = np.full((nobj,), None)
-            sn_df['mudec'] = np.full((nobj,), None)
-            sn_df['radial_velocity'] = np.full((nobj,), None)
-            sn_df['parallax'] = np.full((nobj,), None)
-            sn_df['variability_model'] = np.full((nobj,), 'salt2_extended')
-
-            # Form array of struct from params_df
-            sn_df['salt2_params'] = params_df.to_records(index=False)
-            out_table = pa.Table.from_pandas(sn_df, schema=arrow_schema)
-            self._logger.debug('Created arrow table from sn dataframe')
-
-            writer.write_table(out_table)
+            if nobj > 0:
+                sn_df['object_type'] = np.full((nobj,), self._sn_object_type)
+
+                sn_df['MW_rv'] = make_MW_extinction_rv(sn_df['ra'],
+                                                       sn_df['dec'])
+                sn_df['MW_av'] = make_MW_extinction_av(sn_df['ra'],
+                                                       sn_df['dec'])
+
+                # Add fillers for columns not relevant for sn
+                sn_df['sed_filepath'] = np.full((nobj), '')
+                sn_df['magnorm'] = np.full((nobj,), None)
+                sn_df['mura'] = np.full((nobj,), None)
+                sn_df['mudec'] = np.full((nobj,), None)
+                sn_df['radial_velocity'] = np.full((nobj,), None)
+                sn_df['parallax'] = np.full((nobj,), None)
+                sn_df['variability_model'] = np.full((nobj,), 'salt2_extended')
+
+                # Form array of struct from params_df
+                sn_df['salt2_params'] = params_df.to_records(index=False)
+                out_table = pa.Table.from_pandas(sn_df, schema=arrow_schema)
+                self._logger.debug('Created arrow table from sn dataframe')
+
+                writer.write_table(out_table)
 
         writer.close()
         if self._provenance == 'yaml':
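The new nobj > 0 guard means a pixel with no supernovae skips the filler columns and the Arrow write entirely. A standalone illustration with hypothetical data, not from the PR:

import pandas as pd

sn_df = pd.DataFrame({'ra': [], 'dec': []})  # a pixel with no SNe
nobj = len(sn_df['ra'])
if nobj > 0:
    pass  # fill filler columns, attach salt2_params, write the table
# else: nothing is written for this pixel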
7 changes: 6 additions & 1 deletion skycatalogs/scripts/create_sc.py
@@ -88,6 +88,10 @@
                     depends on value of --galaxy-type option''')
 parser.add_argument('--include-roman-flux', action='store_true',
                     help='If supplied calculate & store Roman as well as LSST fluxes')
+parser.add_argument('--star-input-fmt', default='sqlite',
+                    choices=['sqlite', 'parquet'], help='''
+                    star truth may come from either sqlite db or collection
+                    of parquet files''')
 
 args = parser.parse_args()
 
@@ -139,7 +143,8 @@
                              provenance=provenance,
                              dc2=args.dc2, galaxy_type=args.galaxy_type,
                              galaxy_truth=args.galaxy_truth,
-                             include_roman_flux=args.include_roman_flux)
+                             include_roman_flux=args.include_roman_flux,
+                             star_input_fmt=args.star_input_fmt)
 logger.info(f'Starting with healpix pixel {parts[0]}')
 if not args.no_galaxies:
     logger.info("Creating galaxy catalogs")
95 changes: 95 additions & 0 deletions skycatalogs/utils/star_parquet_input.py
@@ -0,0 +1,95 @@
# import pyarrow as pa
import os
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import numpy.ma as ma
import healpy


def _find_star_files(dirpath, pixel):
    '''
    Parameters
    ----------
    dirpath  string  Path to directory containing input parquet files
                     (HTM partitioning)
    pixel    int     Healpix pixel

    Returns
    -------
    List of parquet file paths for files which might have sources belonging
    to the specified pixel
    '''
    # For now, hard-code to 3 files which have everything needed
    # for the joint simulation region

    fnames = ['stars_chunk_8800000000000_8900000000000.parquet',
              'stars_chunk_9200000000000_9300000000000.parquet',
              'stars_chunk_9700000000000_9800000000000.parquet']

    return [os.path.join(dirpath, fname) for fname in fnames]

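As the comment says, the file list is hard-coded for the joint simulation region and pixel goes unused; per-row filtering happens later via the mask. A hypothetical generalization, not in this PR, would glob every chunk file and let the mask do all the work:

import glob
import os

def _find_star_files_glob(dirpath, pixel):
    # 'pixel' is still unused; _calculate_pixel_mask discards rows outside
    # the requested pixel downstream, so over-selecting files is only a
    # performance cost, not a correctness one.
    return sorted(glob.glob(os.path.join(dirpath, 'stars_chunk_*.parquet')))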

def _calculate_pixel_mask(ra, dec, pixel, nside=32):
    '''
    Parameters
    ----------
    ra      Array of float
    dec     Array of float
    pixel   int
    nside   int   Healpix nside (default 32)

    Returns
    -------
    Mask, True for all elements not in the pixel
    '''
    ra = np.array(ra)
    dec = np.array(dec)
    in_pix = healpy.pixelfunc.ang2pix(nside, ra, dec, nest=False, lonlat=True)
    mask = ma.logical_not(in_pix == pixel)
    return mask

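A self-contained check of the masking convention, using made-up coordinates (RING ordering since nest=False, and lonlat=True so the arguments are RA/Dec in degrees):

import numpy as np
import numpy.ma as ma
import healpy

ra = np.array([9.8, 55.0, 210.3])    # degrees; made-up values
dec = np.array([-44.0, -30.0, 5.5])
# Use the pixel containing the first coordinate as the target pixel
target = healpy.pixelfunc.ang2pix(32, ra[0], dec[0], nest=False, lonlat=True)
msk = _calculate_pixel_mask(ra, dec, target)
# Entries outside 'target' are masked True and dropped by compressed()
print(ma.array(ra, mask=msk).compressed())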

def _star_parquet_reader(dirpath, pixel, output_arrow_schema):
    # Get requisite info from parquet files for sources in pixel.
    # Next do renames and calculation for magnorm
    to_read = ['simobjid', 'ra', 'decl', 'mura', 'mudecl', 'vrad',
               'parallax', 'sedfilename', 'flux_scale', 'ebv']
    rename = {'decl': 'dec', 'sedfilename': 'sed_filepath', 'mudecl': 'mudec',
              'vrad': 'radial_velocity'}
    out_fields = ['id', 'ra', 'dec', 'mura', 'mudec', 'radial_velocity',
                  'parallax', 'sed_filepath', 'magnorm', 'ebv']

    paths = _find_star_files(dirpath, pixel)
    out_dict = {}
    for k in out_fields:
        out_dict[k] = []
    for f in paths:
        f_dict = {}
        for k in out_fields:
            f_dict[k] = []
        meta = pq.read_metadata(f)
        pq_file = pq.ParquetFile(f)
        for rg in range(meta.num_row_groups):
            tbl = pq_file.read_row_group(rg, columns=to_read)
            msk = _calculate_pixel_mask(tbl['ra'], tbl['decl'], pixel)
            for col in to_read:
                if col in rename:
                    f_dict[rename[col]] += list(ma.array(np.array(tbl[col]),
                                                         mask=msk).compressed())
                elif col == 'flux_scale':
                    # Compute magnorm from flux_scale
                    tmp = ma.array(np.array(tbl['flux_scale']),
                                   mask=msk).compressed()
                    f_dict['magnorm'] += list(-2.5*np.log(tmp)/np.log(10.0)
                                              - 18.402732642)
                elif col == 'simobjid':
                    # Convert simobjid to string, change name to id
                    tmp = [str(sim_id) for sim_id in tbl['simobjid']]
                    f_dict['id'] += list(ma.array(np.array(tmp),
                                                  mask=msk).compressed())
                else:
                    f_dict[col] += list(ma.array(np.array(tbl[col]),
                                                 mask=msk).compressed())
        for k in out_fields:
            out_dict[k] += f_dict[k]

    return pd.DataFrame(out_dict)
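End to end, the reader returns a pandas DataFrame with the renamed output columns, where magnorm is derived from flux_scale as magnorm = -2.5*log10(flux_scale) - 18.402732642. A usage sketch; the directory path is illustrative, and since the schema argument is currently unused by the reader, None stands in for it:

# Illustrative path and pixel; output_arrow_schema is accepted but unused.
star_df = _star_parquet_reader('/path/to/UW_star_catalog', 9556,
                               output_arrow_schema=None)
print(star_df.columns.tolist())
# ['id', 'ra', 'dec', 'mura', 'mudec', 'radial_velocity',
#  'parallax', 'sed_filepath', 'magnorm', 'ebv']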