Skip to content

Commit

Permalink
Segment tags (gwastro#4468)
Browse files Browse the repository at this point in the history
* Added support for dq tags

* Small fix

* Addressed some of Ian's comments

* Fixed indent mistake

* Added description of tags argument to docstring

* Small fix to tags

* Don't use tags with veto-definer

* Don't require dq segments to be set for each ifo separately

* Fixed over-indent
  • Loading branch information
maxtrevor authored and bhooshan-gadre committed Mar 1, 2024
1 parent c77f57a commit 638b34c
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 80 deletions.
9 changes: 6 additions & 3 deletions bin/workflows/pycbc_make_offline_search_workflow
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ save_fig_with_metadata(time_str, time_file.storage_path, **kwds)
# Get segments and find the data locations
sci_seg_name = 'science'
science_seg_file = wf.get_segments_file(workflow, sci_seg_name, 'segments-science',
rdir['analysis_time/segment_data'])
rdir['analysis_time/segment_data'],
tags=['science'])

ssegs = {}
for ifo in workflow.ifos:
Expand All @@ -145,13 +146,15 @@ datafind_files, analyzable_file, analyzable_segs, analyzable_name = \
final_veto_name = 'vetoes'
final_veto_file = wf.get_segments_file(workflow, final_veto_name,
'segments-vetoes',
rdir['analysis_time/segment_data'])
rdir['analysis_time/segment_data'],
tags=['veto'])

# Get dq segments from veto definer and calculate data quality timeseries
dq_flag_name = 'dq_flag'
dq_segment_file = wf.get_flag_segments_file(workflow, dq_flag_name,
'segments-dq',
rdir['analysis_time/segment_data'])
rdir['analysis_time/segment_data'],
tags=['dq'])

# Template bank stuff
hdfbank = wf.setup_tmpltbank_workflow(workflow, analyzable_segs,
Expand Down
169 changes: 92 additions & 77 deletions pycbc/workflow/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def save_veto_definer(cp, out_dir, tags=None):
return veto_def_new_path


def get_segments_file(workflow, name, option_name, out_dir):
def get_segments_file(workflow, name, option_name, out_dir, tags=None):
"""Get cumulative segments from option name syntax for each ifo.
Use syntax of configparser string to define the resulting segment_file
Expand All @@ -76,6 +76,9 @@ def get_segments_file(workflow, name, option_name, out_dir):
Name of the segment list being created
option_name: str
Name of option in the associated config parser to get the flag list
tags : list of strings
Used to retrieve subsections of the ini file for
configuration options.
returns
--------
Expand All @@ -88,25 +91,29 @@ def get_segments_file(workflow, name, option_name, out_dir):
start = workflow.analysis_time[0]
end = workflow.analysis_time[1]

if tags is None:
tags = []

# Check for veto definer file
veto_definer = None
if cp.has_option("workflow-segments", "segments-veto-definer-url"):
veto_definer = save_veto_definer(workflow.cp, out_dir, [])
veto_definer = save_veto_definer(workflow.cp, out_dir)

# Check for provided server
server = "https://segments.ligo.org"
if cp.has_option("workflow-segments", "segments-database-url"):
server = cp.get("workflow-segments",
"segments-database-url")
if cp.has_option_tags("workflow-segments", "segments-database-url", tags):
server = cp.get_opt_tags("workflow-segments",
"segments-database-url", tags)

if cp.has_option("workflow-segments", "segments-source"):
source = cp.get("workflow-segments", "segments-source")
if cp.has_option_tags("workflow-segments", "segments-source", tags):
source = cp.get_opt_tags("workflow-segments", "segments-source", tags)
else:
source = "any"

if source == "file":
local_file_path = \
resolve_url(cp.get("workflow-segments", option_name+"-file"))
resolve_url(cp.get_opt_tag("workflow-segments",
option_name+"-file", tags))
pfn = os.path.join(out_dir, os.path.basename(local_file_path))
shutil.move(local_file_path, pfn)
return SegFile.from_segment_xml(pfn)
Expand Down Expand Up @@ -380,72 +387,80 @@ def generate_triggered_segment(workflow, out_dir, sciencesegs):
except UnboundLocalError:
return None, min_seg

def get_flag_segments_file(workflow, name, option_name, out_dir):
"""Get segments from option name syntax for each ifo for indivudal flags.
Use syntax of configparser string to define the resulting segment_file
e.x. option_name = +up_flag1,+up_flag2,+up_flag3,-down_flag1,-down_flag2
Each ifo may have a different string and is stored separately in the file.
Each flag is stored separately in the file.
Flags which add time must precede flags which subtract time.
Parameters
----------
workflow: pycbc.workflow.Workflow
name: string
Name of the segment list being created
option_name: str
Name of option in the associated config parser to get the flag list
returns
--------
seg_file: pycbc.workflow.SegFile
SegFile intance that points to the segment xml file on disk.
"""
from pycbc.dq import query_str
make_analysis_dir(out_dir)
cp = workflow.cp
start = workflow.analysis_time[0]
end = workflow.analysis_time[1]

# Check for veto definer file
veto_definer = None
if cp.has_option("workflow-segments", "segments-veto-definer-url"):
veto_definer = save_veto_definer(workflow.cp, out_dir, [])

# Check for provided server
server = "https://segments.ligo.org"
if cp.has_option("workflow-segments", "segments-database-url"):
server = cp.get("workflow-segments", "segments-database-url")

source = "any"
if cp.has_option("workflow-segments", "segments-source"):
source = cp.get("workflow-segments", "segments-source")
if source == "file":
local_file_path = \
resolve_url(cp.get("workflow-segments", option_name+"-file"))
pfn = os.path.join(out_dir, os.path.basename(local_file_path))
shutil.move(local_file_path, pfn)
return SegFile.from_segment_xml(pfn)

segs = {}
for ifo in workflow.ifos:
if cp.has_option_tags("workflow-segments", option_name, [ifo]):
flag_str = cp.get_opt_tags("workflow-segments", option_name, [ifo])
flag_list = flag_str.split(',')
for flag in flag_list:
flag_name = flag[1:]
key = flag_name
if len(key.split(':')) > 2:
key = ':'.join(key.split(':')[:2])
segs[key] = query_str(ifo, flag, start, end,
source=source, server=server,
veto_definer=veto_definer)
logging.info("%s: got %s segments", ifo, flag_name)
else:
logging.info("%s: no segments requested", ifo)

return SegFile.from_segment_list_dict(name, segs,
extension='.xml',
valid_segment=workflow.analysis_time,
directory=out_dir)
def get_flag_segments_file(workflow, name, option_name, out_dir, tags=None):
"""Get segments from option name syntax for each ifo for indivudal flags.
Use syntax of configparser string to define the resulting segment_file
e.x. option_name = +up_flag1,+up_flag2,+up_flag3,-down_flag1,-down_flag2
Each ifo may have a different string and is stored separately in the file.
Each flag is stored separately in the file.
Flags which add time must precede flags which subtract time.
Parameters
----------
workflow: pycbc.workflow.Workflow
name: string
Name of the segment list being created
option_name: str
Name of option in the associated config parser to get the flag list
tags : list of strings
Used to retrieve subsections of the ini file for
configuration options.
returns
--------
seg_file: pycbc.workflow.SegFile
SegFile intance that points to the segment xml file on disk.
"""
from pycbc.dq import query_str
make_analysis_dir(out_dir)
cp = workflow.cp
start = workflow.analysis_time[0]
end = workflow.analysis_time[1]

if tags is None:
tags = []

# Check for veto definer file
veto_definer = None
if cp.has_option("workflow-segments", "segments-veto-definer-url"):
veto_definer = save_veto_definer(workflow.cp, out_dir)

# Check for provided server
server = "https://segments.ligo.org"
if cp.has_option_tags("workflow-segments", "segments-database-url", tags):
server = cp.get_opt_tags("workflow-segments",
"segments-database-url", tags)

source = "any"
if cp.has_option_tags("workflow-segments", "segments-source", tags):
source = cp.get_opt_tags("workflow-segments", "segments-source", tags)
if source == "file":
local_file_path = \
resolve_url(cp.get_opt_tags("workflow-segments",
option_name+"-file", tags))
pfn = os.path.join(out_dir, os.path.basename(local_file_path))
shutil.move(local_file_path, pfn)
return SegFile.from_segment_xml(pfn)

segs = {}
for ifo in workflow.ifos:
if cp.has_option_tags("workflow-segments", option_name, [ifo]):
flag_str = cp.get_opt_tags("workflow-segments", option_name, [ifo])
flag_list = flag_str.split(',')
for flag in flag_list:
flag_name = flag[1:]
if len(flag_name.split(':')) > 1:
flag_name = name.split(':')[1]
key = ifo + ':' + flag_name
segs[key] = query_str(ifo, flag, start, end,
source=source, server=server,
veto_definer=veto_definer)
logging.info("%s: got %s segments", ifo, flag_name)
else:
logging.info("%s: no segments requested", ifo)

return SegFile.from_segment_list_dict(name, segs,
extension='.xml',
valid_segment=workflow.analysis_time,
directory=out_dir)

0 comments on commit 638b34c

Please sign in to comment.