diff --git a/python/boss_drp/post/fieldmerge.py b/python/boss_drp/post/fieldmerge.py index 37685394..4284cd1a 100755 --- a/python/boss_drp/post/fieldmerge.py +++ b/python/boss_drp/post/fieldmerge.py @@ -48,7 +48,7 @@ def read_zans(sp1d_dir, field, mjd): zans = None return(zans) -def read_zline(sp1d_dir, field, mjd): +def read_zline(sp1d_dir, field, mjd, obs): zlinefile = ptt.join(sp1d_dir,'spZline-' + field + '-' + mjd + '.fits') if ptt.exists(zlinefile): splog.log('Reading Zline file: '+ptt.basename(zlinefile)) @@ -56,6 +56,7 @@ def read_zline(sp1d_dir, field, mjd): zline = Table.read(zlinefile) for key in zline.colnames: zline.rename_column(key,key.upper()) + zline.add_column(obs, name = 'OBS') zline.meta = {} except: splog.log('Error reading '+zlinefile) @@ -306,7 +307,7 @@ def oneField(row, field, mjd, skip_line=False, include_bad=False, legacy=False, spAll = build_specobjid(spAll, epoch = epoch, custom = custom) if spline is None and skip_line is False: - spline = read_zline(sp1d_dir, field, mjd) + spline = read_zline(sp1d_dir, field, mjd, row['OBSERVATORY']) if spline is not None: if len(spline) > 0: @@ -423,34 +424,44 @@ def specPrimary_sdssid(spAll, update = False): score = (4 * (spAll['SN_MEDIAN'][:,jfilt] > 0) + 2*(wwhere(spAll['FIELDQUALITY'],'good*')) + 1 * (zw_primtest == 0) + (spAll['SN_MEDIAN'][:,jfilt] > 0)) / max(spAll['SN_MEDIAN'][:,jfilt]+1.) + + specprim = spAll['SPECPRIMARY'] + specboss = spAll['SPECBOSS'] + nspecobs = spAll['NSPECOBS'] + if isinstance(spAll['SDSS_ID'], MaskedColumn): + sdssids = spAll['SDSS_ID'].filled(-999).data + else: + sdssids = spAll['SDSS_ID'].data + sidx = np.where((sdssids < 0))[0] + specprim[sidx] = -999 + specboss[sidx] = -999 + nspecobs[sidx] = -999 + if update: splog.log(f'Keeping current and updating only') - specprim = spAll['SPECPRIMARY'] idx_new = np.where(specprim == -1)[0] - try: - sdssids = spAll['SDSS_ID'].filled(-999).data - except: - sdssids = spAll['SDSS_ID'].data sdssids = np.unique(sdssids[idx_new]) - sdssids = sdssids[sdssids >= 0] else: - try: - sdssids = np.unique(spAll['SDSS_ID'].filled(-999).data) - except: - sdssids = np.unique(spAll['SDSS_ID'].data) - sdssids = sdssids[sdssids >= 0] spAll['SPECPRIMARY'] = 0 spAll['SPECBOSS'] = 0 spAll['NSPECOBS'] = 0 - for id in sdssids: + sdssids = np.unique(sdssids) + sdssids = sdssids[sdssids >= 0] + counter_step = round(len(sdssids)/100) + for i, id in enumerate(sdssids): + if (i % counter_step) == 0: + if i + counter_step < len(sdssids): + splog.info(f'Assigning SpecPrimaries: {i+1} - {i+100000} (of {len(sdssids)})') + else: + splog.info(f'Assigning SpecPrimaries: {i+1} - {len(sdssids)} (of {len(sdssids)})') idx = np.where(spAll['SDSS_ID'].data == id)[0] - spAll[idx]['NSPECOBS'] = len(idx) + nspecobs[idx] = len(idx) if update: - spAll[idx]['SPECPRIMARY'] = 0 - spAll[idx]['SPECBOSS'] = 0 + specprim[idx] = 0 + specboss[idx] = 0 primary = np.argmax(score[idx]) - spAll[idx[primary]]['SPECPRIMARY'] = 1 - spAll[idx[primary]]['SPECBOSS'] = 1 + specprim[idx[primary]] = 1 + specboss[idx[primary]] = 1 splog.log('Time to assign primaries = '+str(timedelta(seconds = time.time() - t2))) return(spAll) @@ -520,7 +531,7 @@ def build_custom_fieldlist(indir, custom, run2d, run1d): 'OBSERVATORY':[obs]}) flist = vstack([flist, flist_row]) - print(flist) + flist.pprint_all() return(flist) @@ -592,8 +603,8 @@ def fieldmerge(run2d=getenv('RUN2D'), indir= getenv('BOSS_SPECTRO_REDUX'), logfile = ptt.join(indir, f'spAll_{custom}.log') if dev: logfile = logfile.replace('spAll','spAll_dev') - - makedirs(ptt.dirname(logfile),exist_ok=True) + if len(ptt.dirname(logfile)) > 0: + makedirs(ptt.dirname(logfile),exist_ok=True) splog.open(logfile=logfile, backup=False) splog.log('Log file '+logfile+' opened '+ time.ctime()) @@ -668,14 +679,11 @@ def fieldmerge(run2d=getenv('RUN2D'), indir= getenv('BOSS_SPECTRO_REDUX'), epoch=epoch, allsky=allsky, custom=custom) if not clobber: + spAll = None + spline = None if ptt.exists(spallfile): splog.log(f'Reading Existing spAll file: {spallfile}') spAll = Table.read(spallfile) - try: - spAll_fmjds = spAll['FIELD','MJD'] - spAll_fmjds = unique(spAll_fmjds,keys=['FIELD','MJD']) - except: - spAll_fmjds = Table(names = ['FIELD','MJD']) elif ptt.exists(spallfile.replace('.gz','')): splog.log(f"Reading Existing spAll file: {spallfile.replace('.gz','')}") try: @@ -683,13 +691,17 @@ def fieldmerge(run2d=getenv('RUN2D'), indir= getenv('BOSS_SPECTRO_REDUX'), except: time.sleep(60) spAll = Table.read(spallfile.replace('.gz','')) - try: - spAll_fmjds = spAll['FIELD','MJD'] - spAll_fmjds = unique(spAll_fmjds,keys=['FIELD','MJD']) - except: - spAll_fmjds = Table(names = ['FIELD','MJD']) else: - spAll_fmjds = Table(names = ['FIELD','MJD']) + spAll_fmjds = Table(names = ['FIELD','MJD','OBS']) + if spAll is not None: + try: + spAll_fmjds = spAll['FIELD','MJD','OBS'] + if isinstance(spAll_fmjds['FIELD'], MaskedColumn): + spAll_fmjds['FIELD'] = spAll_fmjds['FIELD'].filled(0) + spAll_fmjds = unique(spAll_fmjds,keys=['FIELD','MJD','OBS']) + except Exception as e: + splog.warning(f'{type(e).__name__}: {e}') + spAll_fmjds = Table(names = ['FIELD','MJD','OBS']) try: spAll.meta = {} spAll_fmjds.meta = {} @@ -702,11 +714,6 @@ def fieldmerge(run2d=getenv('RUN2D'), indir= getenv('BOSS_SPECTRO_REDUX'), except: time.sleep(60) spline = Table.read(splinefile) - try: - spline_fmjds = spline['FIELD','MJD'] - spline_fmjds = unique(spline_fmjds,keys=['FIELD','MJD']) - except: - spline_fmjds = Table(names = ['FIELD','MJD']) elif ptt.exists(splinefile.replace('.gz','')): splog.log(f"Reading Existing spLine file: {splinefile.replace('.gz','')}") try: @@ -714,56 +721,76 @@ def fieldmerge(run2d=getenv('RUN2D'), indir= getenv('BOSS_SPECTRO_REDUX'), except: time.sleep(60) spline = Table.read(splinefile.replace('.gz','')) - try: - spline_fmjds = spline['FIELD','MJD'] - spline_fmjds = unique(spline_fmjds,keys=['FIELD','MJD']) - except: - spline_fmjds = Table(names = ['FIELD','MJD']) else: - spline_fmjds = Table(names = ['FIELD','MJD']) + spline_fmjds = Table(names = ['FIELD','MJD','OBS']) + if spline is not None: + try: + try: + spline_fmjds = spline['FIELD','MJD','OBS'] + except: + spline_fmjds = spline['FIELD','MJD'] + spline_fmjds.add_column('unknown', name='OBS') + if isinstance(spline_fmjds['FIELD'], MaskedColumn): + spline_fmjds['FIELD'] = spline_fmjds['FIELD'].filled(0) + if isinstance(spAll_fmjds['OBS'], MaskedColumn): + spline_fmjds['OBS'] = spline_fmjds['OBS'].astype(object) + spline_fmjds['OBS'].filled('unknown') + spline_fmjds['OBS'] = spline_fmjds['OBS'].astype(str) + if isinstance(spAll_fmjds['FIELD'], MaskedColumn): + spline_fmjds['FIELD'] = spAll_fmjds['FIELD'].filled(0) + spline_fmjds = unique(spline_fmjds,keys=['FIELD','MJD','OBS']) + + except Exception as e: + splog.warning(f'{type(e).__name__}: {e}') + spline_fmjds = Table(names = ['FIELD','MJD','OBS']) try: spline.meta = {} spline_fmjds.meta = {} except: pass if remerge_fmjd is not None: - idx = np.where((spAll['FIELD'] == int(remerge_fmjd.split('-')[0])) & (spAll['MJD'] == int(remerge_fmjd.split('-')[1])))[0] - idxl = np.where((spline['FIELD'] == int(remerge_fmjd.split('-')[0])) & (spline['MJD'] == int(remerge_fmjd.split('-')[1])))[0] + idx = np.where((spAll_fmjds['FIELD'] == int(remerge_fmjd.split('-')[0])) & + (spAll_fmjds['MJD'] == int(remerge_fmjd.split('-')[1])))[0] + idxl = np.where((spline_fmjds['FIELD'] == int(remerge_fmjd.split('-')[0])) & + (spline_fmjds['MJD'] == int(remerge_fmjd.split('-')[1])))[0] if len(idx) > 0: - spAll.remove_rows(idx) - spAll_fmjds = spAll['FIELD','MJD'] - spAll_fmjds = unique(spAll_fmjds,keys=['FIELD','MJD']) + spAll_fmjds.remove_rows(idx) + spAll_fmjds = unique(spAll_fmjds,keys=['FIELD','MJD','OBS']) if len(idxl) > 0: - spline.remove_rows(idxl) - spline_fmjds = spline['FIELD','MJD'] - spline_fmjds = unique(spline_fmjds,keys=['FIELD','MJD']) + spline_fmjds.remove_rows(idxl) + spline_fmjds = unique(spline_fmjds,keys=['FIELD','MJD','OBS']) if remerge_mjd is not None: - idx = np.where((spAll['MJD'] == int(remerge_mjd)))[0] - idxl = np.where((spline['MJD'] == int(remerge_mjd)))[0] + idx = np.where((spAll_fmjds['MJD'] == int(remerge_mjd)))[0] + idxl = np.where((spline_fmjds['MJD'] == int(remerge_mjd)))[0] if len(idx) > 0: - spAll.remove_rows(idx) - spAll_fmjds = spAll['FIELD','MJD'] - spAll_fmjds = unique(spAll_fmjds,keys=['FIELD','MJD']) + spAll_fmjds.remove_rows(idx) + spAll_fmjds = unique(spAll_fmjds,keys=['FIELD','MJD','OBS']) if len(idxl) > 0: - spline.remove_rows(idxl) - spline_fmjds = spline['FIELD','MJD'] - spline_fmjds = unique(spline_fmjds,keys=['FIELD','MJD']) + spline_fmjds.remove_rows(idxl) + spline_fmjds = unique(spline_fmjds,keys=['FIELD','MJD','OBS']) flist.sort(['MJD','FIELD']) if mjdstart is not None: splog.info(f"Only Checking Field-MJDs with MJD >= {mjdstart}") flist = flist[flist['MJD'] >= mjdstart] j = 0 for i, row in enumerate(flist): + tfield_str = f"Field:{row['FIELD']} MJD:{row['MJD']} OBS:{row['OBSERVATORY']} ({i+1}/{len(flist)})" if spAll_fmjds is not None: - idx = spAll_fmjds[(spAll_fmjds['FIELD'] == int(row['FIELD'])) & (spAll_fmjds['MJD'] == int(row['MJD']))] - idxl = spline_fmjds[(spline_fmjds['FIELD'] == int(row['FIELD'])) & (spline_fmjds['MJD'] == int(row['MJD']))] + idx = spAll_fmjds[(spAll_fmjds['FIELD'] == int(row['FIELD'])) & + (spAll_fmjds['MJD'] == int(row['MJD'])) & + (spAll_fmjds['OBS'] == row['OBSERVATORY'])] + + idxl = spline_fmjds[(spline_fmjds['FIELD'] == int(row['FIELD'])) & + (spline_fmjds['MJD'] == int(row['MJD'])) & + ((spline_fmjds['OBS'] == row['OBSERVATORY']) | + (spline_fmjds['OBS'] == 'unknown'))] if len(idx)*len(idxl) > 0: - splog.log(f"Skipping (Complete) Field:{row['FIELD']} MJD:{row['MJD']} ({i+1}/{len(flist)})") + splog.log(f"Skipping (Complete) {tfield_str}") continue if (field is not None) and (mjd is not None) and (allsky is False): if (row['STATUS2D'].lower().strip() != 'done') or (row['STATUSCOMBINE'].lower().strip() != 'done') or (row['STATUS1D'].lower().strip() != 'done'): - splog.log(f"Checking incomplete status ({row['STATUS2D'].strip()} RUN2D) Field:{row['FIELD']} MJD:{row['MJD']} ({i+1}/{len(flist)})") + splog.log(f"Checking incomplete status ({row['STATUS2D'].strip()} RUN2D) {tfield_str}") fmlog = f'fieldlist-{field}-{mjd}.log' row = retry(fieldlist, retries=3, delay = 5, logger=splog.log, create=True, topdir=indir, run2d=[run2d], run1d=[getenv('RUN1D')], @@ -788,17 +815,16 @@ def fieldmerge(run2d=getenv('RUN2D'), indir= getenv('BOSS_SPECTRO_REDUX'), print(len(flist.columns)) raise if row['STATUS2D'].lower().strip() != 'done': - splog.log(f"Skipping ({row['STATUS2D'].strip()} RUN2D) Field:{row['FIELD']} MJD:{row['MJD']} ({i+1}/{len(flist)})") + splog.log(f"Skipping ({row['STATUS2D'].strip()} RUN2D) {tfield_str}") continue elif row['STATUSCOMBINE'].lower().strip() != 'done': - splog.log(f"Skipping ({row['STATUSCOMBINE'].strip()} Combine) Field:{row['FIELD']} MJD:{row['MJD']} ({i+1}/{len(flist)})") + splog.log(f"Skipping ({row['STATUSCOMBINE'].strip()} Combine) {tfield_str}") continue elif row['STATUS1D'].lower().strip() != 'done': - splog.log(f"Skipping ({row['STATUS1D'].strip()} RUN1D) Field:{row['FIELD']} MJD:{row['MJD']} ({i+1}/{len(flist)})") + splog.log(f"Skipping ({row['STATUS1D'].strip()} RUN1D) {tfield_str}") continue - splog.log(f"Reading/Building Field:{row['FIELD']} MJD:{row['MJD']} ({i+1}/{len(flist)})") - + splog.log(f"Reading/Building {tfield_str}") if dev: dev1 = True if merge_only else False else: @@ -878,7 +904,6 @@ def fieldmerge(run2d=getenv('RUN2D'), indir= getenv('BOSS_SPECTRO_REDUX'), else: del onefield gc.collect() - if custom is not None and mjd is not None: field = custom if not(field is not None and mjd is not None): if spline is not None: @@ -914,58 +939,61 @@ def fieldmerge(run2d=getenv('RUN2D'), indir= getenv('BOSS_SPECTRO_REDUX'), - for col in ['ASSIGNED','ON_TARGET','VALID','DECOLLIDED', 'TOO']: + for col in ['ASSIGNED','ON_TARGET','VALID','DECOLLIDED', 'TOO','CARTON_TO_TARGET_PK']: if col not in spAll.columns: + splog.info(f'{col} missing from spAll') spAll[col] = '-999' - for col in ['CARTON_TO_TARGET_PK']: - if col not in spAll.columns: - spAll[col] = '0' - for col in ['MOON_DIST','MOON_PHASE']: + for col in ['MOON_DIST','MOON_PHASE','DELTA_RA_LIST','DELTA_DEC_LIST']: if col not in spAll.columns: + splog.info(f'{col} missing from spAll') spall[col] = 'nan' spAll_lite = spAll['ASSIGNED','ON_TARGET','VALID','DECOLLIDED', 'TOO', 'MOON_DIST','MOON_PHASE','CARTON_TO_TARGET_PK', 'DELTA_RA_LIST','DELTA_DEC_LIST'].copy() - spAll_lite.rename_column('DELTA_RA_LIST', 'DELTA_RA') - spAll_lite.rename_column('DELTA_DEC_LIST', 'DELTA_DEC') - for i in range(mr): if (i % 100000) == 0: if i + 100000 < mr: splog.info(f'Re-Formatting arrays in spAll-lite rows: {i+1} - {i+100000} (of {mr})') else: splog.info(f'Re-Formatting arrays in spAll-lite rows: {i+1} - {mr} (of {mr})') + + for col in ['ASSIGNED','ON_TARGET','VALID','DECOLLIDED','TOO','CARTON_TO_TARGET_PK']: + try: + if col in ['CARTON_TO_TARGET_PK']: + spAll_lite[col][i] = str(int(np.asarray(spAll[col][i].split())[0])) + else: + spAll_lite[col][i] = str(min(np.asarray(spAll[col][i].split()).astype(int))) + except Exception as e: + splog.warning(f'{col}: {type(e).__name__}: {e}') + print(f'{col}: {type(e).__name__}: {e}') + spAll_lite[col][i] = '0' with warnings.catch_warnings(): warnings.filterwarnings(action='ignore', message='Mean of empty slice') - for col in ['ASSIGNED','ON_TARGET','VALID','DECOLLIDED','TOO']: - try: - spAll_lite[i][col] = str(min(np.array(spAll[i][col].split()).astype(int))) - except: - spAll_lite[i][col] = '0' - for col in ['MOON_DIST','MOON_PHASE','DELTA_RA','DELTA_DEC']: + for col in ['MOON_DIST','MOON_PHASE','DELTA_RA_LIST','DELTA_DEC_LIST']: try: - spAll_lite[i][col] = str(np.nanmean(np.array(spAll[i][col].split()).astype(float))) - except: - spAll_lite[i][col] = 'nan' - for col in ['CARTON_TO_TARGET_PK']: - try: - spAll_lite[i][col] = str(int(np.array(spAll[i][col].split())[0])) - except: - spAll_lite[i][col] = '0' - for col in ['ASSIGNED','ON_TARGET','VALID','DECOLLIDED','TOO']: - spAll_lite[col].fill_value = False + spAll_lite[col][i] = str(np.nanmean(np.asarray(spAll[col][i].split()).astype(float))) + except Exception as e: + print(f'{col}: {type(e).__name__}: {e}') + splog.warning(f'{col}: {type(e).__name__}: {e}') + spAll_lite[col][i] = 'nan' + + for col in ['ASSIGNED','ON_TARGET','VALID','DECOLLIDED','TOO','CARTON_TO_TARGET_PK']: + spAll_lite[col].fill_value = -999 try: - spAll_lite[col] = spAll_lite[col].astype(bool) - except: + spAll_lite[col] = spAll_lite[col].astype(int) + except Exception as e: + splog.warning(f'{type(e).__name__}: {e}') column_data = np.array(spAll_lite[col]) column_data = np.array([x.decode('utf-8') if x is not None else None for x in column_data]) - column_data = np.array([x == 'True' if x is not None else None for x in column_data]) + column_data = np.asarray([x == 'True' if x is not None else None for x in column_data]).astype(int) spAll_lite.remove_column(col) spAll_lite[col] = MaskedColumn(column_data, mask=[x is None for x in column_data]) column_data = None del column_data + spAll_lite.rename_column('DELTA_RA_LIST', 'DELTA_RA') + spAll_lite.rename_column('DELTA_DEC_LIST', 'DELTA_DEC') for col in ['CARTON_TO_TARGET_PK']: spAll_lite[col] = spAll_lite[col].astype(int) @@ -1115,10 +1143,10 @@ def write_spAll(spAll, spline, spAll_lite, indir, run2d, datamodel, line_datamod spAll = Table(spAll.data) for col in spAll_lite.colnames: try: - spAll.remve_column(col) - except: + spAll.remove_column(col) + except Exception as e: + splog.warning(f'{col} {type(e).__name__}: {e}') pass - #spAll.remove_columns(list(filter(lambda x: x not in ['delta_ra','delta_dec'], spAll_lite.colnames))) spAll = hstack([spAll, spAll_lite], join_type = 'exact') else: spAll = None