Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

debug missing ms directory but json present when ignore_missing=True,… #1381

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
37 changes: 35 additions & 2 deletions caracal/workers/obsconf_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,26 @@ def worker(pipeline, recipe, config):
step = None

for i, (msname, msroot, prefix) in enumerate(zip(pipeline.msnames, pipeline.msbasenames, pipeline.prefix_msbases)):

# check that the MS file is where it should be
in_dir = pipeline.rawdatadir
if in_dir[-1] != "/":
in_dir += "/"
if not os.path.exists(in_dir+msname):
if pipeline.ignore_missing:
log.warning(f"'{msname}' did not match any files, but getdata: ignore_missing is set, proceeding anyway")
continue
else:
raise caracal.ConfigurationError(f"'{msname}' did not match any files under {pipeline.rawdatadir}. Check your "
"'general: msdir/rawdatadir' and/or 'getdata: dataid/extension' settings, or "
"set 'getdata: ignore_missing: true'")


# filenames generated
obsinfo = f'{msroot}-obsinfo.txt'
summary = f'{msroot}-summary.json'
elevplot = f'{msroot}-elevation-tracks.png'

if pipeline.enable_task(config, 'obsinfo'):
if config['obsinfo']['listobs']:
if os.path.exists(os.path.join(pipeline.msdir, obsinfo)):
Expand Down Expand Up @@ -105,6 +120,24 @@ def worker(pipeline, recipe, config):
recipe.run()
recipe.jobs = []

# check that the JSON summary files are there
missingSummary=[]
counter=0
for i, (msname, msroot, prefix) in enumerate(zip(pipeline.msnames, pipeline.msbasenames, pipeline.prefix_msbases)):
# filenames generated
summary = f'/{msroot}-summary.json'
if not os.path.exists(pipeline.msdir+summary):
missingSummary.append(counter)
counter+=1

#update file list
if missingSummary:
for kk in range (0, len(missingSummary)):
del pipeline.msnames[missingSummary[kk]]
del pipeline.msbasenames[missingSummary[kk]]
del pipeline.prefix_msbases[missingSummary[kk]]
pipeline.nobs=len(pipeline.msnames)

# initialise things
for item in 'xcal fcal bpcal gcal target refant minbase maxdist'.split():
val = config[item]
Expand All @@ -122,7 +155,6 @@ def worker(pipeline, recipe, config):
# Set antenna properties
#pipeline.Tsys_eta = config['Tsys_eta']
#pipeline.dish_diameter = config['dish_diameter']

for i, (msname, msroot, prefix) in enumerate(zip(pipeline.msnames, pipeline.msbasenames, pipeline.prefix_msbases)):
caracal.log.info(f"MS #{i}: {msname}")

Expand Down Expand Up @@ -182,6 +214,7 @@ def worker(pipeline, recipe, config):
pipeline.chanwidth[i] = chanwidth
caracal.log.info(' CHAN_FREQ from {0:s} Hz to {1:s} Hz with average channel width of {2:s} Hz'.format(
','.join(map(str, firstchanfreq)), ','.join(map(str, lastchanfreq)), ','.join(map(str, chanwidth))))

if i == pipeline.nobs-1 and np.max(pipeline.chanwidth) > 0 and np.min(pipeline.chanwidth) < 0:
caracal.log.err('Some datasets have a positive channel increment, some negative. This will lead to errors. Exiting')
raise caracal.BadDataError("MSs with mixed channel ordering not supported")
Expand Down
24 changes: 24 additions & 0 deletions caracal/workers/transform_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,36 @@ def worker(pipeline, recipe, config):
transform_mode = 'split'

for i, (msbase, prefix_msbase) in enumerate(zip(pipeline.msbasenames, pipeline.prefix_msbases)):



# if splitting from target, we have multiple MSs to iterate over
if transform_mode == 'split':
from_mslist = pipeline.get_mslist(i, label_in, target=from_target)
elif transform_mode == 'concat':
from_mslist = pipeline.get_mslist(i, '', target=from_target)
to_mslist = pipeline.get_mslist(i, label_out, target=not splitting_cals)
print(from_mslist)

# check that the input files are actually where they should be
in_dir = pipeline.msdir if label_in else pipeline.rawdatadir
if in_dir[-1] != "/":
in_dir += "/"
print(in_dir)
#sys.exit(0)

if len(from_mslist) ==1 and os.path.exists(in_dir+'/'+from_mslist[0]) == False:
raise caracal.ConfigurationError(f"'{from_mslist} did not match any files, but these MS files are required for this worker to continue. Check your 'general: msdir/rawdatadir' and/or 'getdata: dataid/extension' settings.'")
elif len(from_mslist) >1:
ms_list_tmp=[]
for jj in range (0,len(from_mslist)):
if not os.path.exists(in_dir+from_mslist[jj]):
log.warning(f"'{from_mslist[jj]}' did not match any files, but getdata: ignore_missing is set, and multiple dataIDs are given. CARACal assumes you know what you are doing, this dataset is skipped, proceeding anyway")

else:
ms_list_tmp.append(from_mslist[jj])
from_mslist = ms_list_tmp.copy()
print(from_mslist)

# if splitting cals, we'll split one (combined) target to one output MS
if splitting_cals:
Expand Down
9 changes: 8 additions & 1 deletion caracal/workers/worker_administrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,11 @@ def init_names(self, dataids):

for pattern in patterns:
msnames = [os.path.basename(ms) for ms in glob.glob(os.path.join(self.rawdatadir, pattern))]

if not msnames:
if self.ignore_missing:
log.warning(f"'{pattern}' did not match any files, but getdata: ignore_missing is set, proceeding anyway")
msnames=[pattern]
else:
raise caracal.ConfigurationError(f"'{pattern}' did not match any files under {self.rawdatadir}. Check your "
"'general: msdir/rawdatadir' and/or 'getdata: dataid/extension' settings, or "
Expand All @@ -196,7 +198,12 @@ def get_msinfo(self, msname):
msinfo_path = os.path.join(self.msdir, msinfo_file)
msdict, mtime_cache = self._msinfo_cache.get(msname, (None, 0))
if not os.path.exists(msinfo_path):
raise RuntimeError(f"MS summary file {msinfo_file} not found at expected location. This is a bug or "
if self.ignore_missing:
log.warning(f"'{msinfo_file}' did not match any files, but getdata: ignore_missing is set,"
"this dataset will be skipped,proceeding anyway")
return None
else:
raise RuntimeError(f"MS summary file {msinfo_file} not found at expected location. This is a bug or "
"a misconfiguration. Was the MS transformed properly?")
# reload cached dict if file on disk is newer
mtime = os.path.getmtime(msinfo_path)
Expand Down