Skip to content

Commit

Permalink
Small fix to baseline distance sort.
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhosk committed Oct 5, 2023
1 parent 08163d1 commit f85bc8a
Showing 1 changed file with 67 additions and 63 deletions.
130 changes: 67 additions & 63 deletions src/astrohack/extract_holog.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,14 @@ def extract_holog(
extract_holog_params["ddi"] = ddi
extract_holog_params["chan_setup"] = {}
extract_holog_params["pol_setup"] = {}
extract_holog_params["chan_setup"]["chan_freq"] = spw_ctb.getcol("CHAN_FREQ", startrow=spw_setup_id, nrow=1)[0, :]
extract_holog_params["chan_setup"]["chan_width"] = spw_ctb.getcol("CHAN_WIDTH", startrow=spw_setup_id, nrow=1)[0, :]
extract_holog_params["chan_setup"]["eff_bw"] = spw_ctb.getcol("EFFECTIVE_BW", startrow=spw_setup_id, nrow=1)[0, :]
extract_holog_params["chan_setup"]["ref_freq"] = spw_ctb.getcol("REF_FREQUENCY", startrow=spw_setup_id, nrow=1)[0]
extract_holog_params["chan_setup"]["chan_freq"] = spw_ctb.getcol("CHAN_FREQ", startrow=spw_setup_id, nrow=1)[0,
:]
extract_holog_params["chan_setup"]["chan_width"] = spw_ctb.getcol("CHAN_WIDTH", startrow=spw_setup_id, nrow=1)[
0, :]
extract_holog_params["chan_setup"]["eff_bw"] = spw_ctb.getcol("EFFECTIVE_BW", startrow=spw_setup_id, nrow=1)[0,
:]
extract_holog_params["chan_setup"]["ref_freq"] = spw_ctb.getcol("REF_FREQUENCY", startrow=spw_setup_id, nrow=1)[
0]
extract_holog_params["chan_setup"]["total_bw"] = \
spw_ctb.getcol("TOTAL_BANDWIDTH", startrow=spw_setup_id, nrow=1)[0]
extract_holog_params["pol_setup"]["pol"] = pol_str[
Expand Down Expand Up @@ -519,9 +523,6 @@ def generate_holog_obs_dict(
:param ms_name: Name of input measurement file name.
:type ms_name: str
:param ddi: DDI(s) that should be extracted from the measurement set. Defaults to all DDI's in the ms.
:type ddi: int numpy.ndarray | int list, optional
:param baseline_average_distance: To increase the signal to noise for a mapping antenna mutiple reference
antennas can be used. The baseline_average_distance is the acceptable distance between a mapping antenna and a
Expand Down Expand Up @@ -670,7 +671,7 @@ def generate_holog_obs_dict(
ctb.close()

# Create holog_obs_dict or modify user supplied holog_obs_dict.
#ddi = extract_holog_params['ddi']
# ddi = extract_holog_params['ddi']

pnt_mds = AstrohackPointFile(extract_holog_params['point_name'])
pnt_mds._open()
Expand All @@ -686,7 +687,7 @@ def generate_holog_obs_dict(
)

# From the generated holog_obs_dict subselect user supplied ddis.
#if ddi != 'all':
# if ddi != 'all':
# holog_obs_dict_keys = list(holog_obs_dict.keys())
# for ddi_key in holog_obs_dict_keys:
# if 'ddi' in ddi_key:
Expand All @@ -708,146 +709,149 @@ class HologObsDict(dict):
|
o--> map: [reference, ...]
"""

def __init__(self, obj):
super().__init__(obj)
self.logger = _get_astrohack_logger()
self.logger = _get_astrohack_logger()

def __getitem__(self, key):
return super().__getitem__(key)

def __setitem__(self, key, value):
return super().__setitem__(key, value)

def print(self, style="static"):
if style == "dynamic":
return astrohack.dio.inspect_holog_obs_dict(self, style="dynamic")

else:
return astrohack.dio.inspect_holog_obs_dict(self, style="static")

def select(self, key, value, inplace=False, **kwargs):

if inplace == True:
if inplace:
obs_dict = self

else:
obs_dict = HologObsDict(self)

if key == "ddi":
return self._select_ddi(value, obs_dict=obs_dict)

elif key == "map":
return self._select_ddi(value, obs_dict=obs_dict)

elif key == "antenna":
return self._select_antenna(value, obs_dict=obs_dict)

elif key == "scan":
return self._select_scan(value, obs_dict=obs_dict)

elif key == "baseline":
if kwargs["reference"]:
return self._select_baseline(value, reference=kwargs["reference"], obs_dict=obs_dict)

else:
self.logger.error("Must specify a list of reference antennae for this option.")
else:
self.logger.error("Valid key not found: {key}".format(key=key))
return {}

@staticmethod
def get_nearest_baselines(antenna, n_baselines, path_to_matrix=None):
logger = _get_astrohack_logger()
import pandas as pd

if path_to_matrix == None:
path_to_matrix = os.getcwd()+"/.baseline_distance_matrix.csv"
if os.path.exists(path_to_matrix) == False:
if path_to_matrix is None:
path_to_matrix = os.getcwd() + "/.baseline_distance_matrix.csv"

if not os.path.exists(path_to_matrix):
self.logger.error("Unable to find baseline distance matrix in: {path}".format(path=path_to_matrix))

df_matrix = pd.read_csv(path_to_matrix, sep="\t", index_col=0)
return df_matrix[antenna].sort_values(ascending=True).index[:n_baselines].values.tolist()

df_matrix = pd.read_csv(path_to_matrix, sep="\t", index_col=0)

# Skip the first index because it is a self distance
return df_matrix[antenna].sort_values(ascending=True).index[1:n_baselines].values.tolist()

def _select_ddi(self, value, obs_dict):
convert = lambda x: "ddi_" + str(x)

if isinstance(value, list) == False:
if not isinstance(value, list):
value = [value]

value = list(map(convert, value))
ddi_list = list(obs_dict.keys())

for ddi in ddi_list:
if ddi not in value:
obs_dict.pop(ddi)

return obs_dict

def _select_map(self, value, obs_dict):
convert = lambda x: "map_" + str(x)

if isinstance(value, list) == False:
if not isinstance(value, list):
value = [value]

value = list(map(convert, value))
ddi_list = list(obs_dict.keys())

for ddi in ddi_list:
map_list = list(obs_dict[ddi].keys())
for mp in map_list:
for mp in map_list:
if mp not in value:
obs_dict[ddi].pop(mp)

return obs_dict

def _select_antenna(self, value, obs_dict):
if isinstance(value, list) == False:
if not isinstance(value, list):
value = [value]

ddi_list = list(obs_dict.keys())

for ddi in ddi_list:
map_list = list(obs_dict[ddi].keys())
for mp in map_list:
for mp in map_list:
ant_list = list(obs_dict[ddi][mp]["ant"].keys())
for ant in ant_list:
if ant not in value:
obs_dict[ddi][mp]["ant"].pop(ant)

return obs_dict

def _select_scan(self, value, obs_dict):
if isinstance(value, list) == False:
if not isinstance(value, list):
value = [value]

ddi_list = list(obs_dict.keys())

for ddi in ddi_list:
map_list = list(obs_dict[ddi].keys())
for mp in map_list:
for mp in map_list:
obs_dict[ddi][mp]["scans"] = value

return obs_dict

def _select_baseline(self, value, reference, obs_dict):
if isinstance(value, list) == False:
if not isinstance(value, list):
value = [value]
if isinstance(reference, list) == False:

if not isinstance(reference, list):
value = [reference]

ddi_list = list(obs_dict.keys())

for ddi in ddi_list:
map_list = list(obs_dict[ddi].keys())
for mp in map_list:
for mp in map_list:
ant_list = list(obs_dict[ddi][mp]["ant"].keys())
for ant in ant_list:
if ant not in value:
obs_dict[ddi][mp]["ant"].pop(ant)
continue

obs_dict[ddi][mp]["ant"][ant] = reference
return obs_dict

return obs_dict

0 comments on commit f85bc8a

Please sign in to comment.