Migrate to new gwdatafind API (gwastro#4383)
* Migrate to new gwdatafind API

* CC and one regression fix

* One more style change

* Don't need these options now

* Revert "Don't need these options now"

This reverts commit 6f2ed1b.
spxiwh authored and PRAVEEN-mnl committed Jun 19, 2023
1 parent b2b0bb1 commit 34d6a21
Showing 4 changed files with 46 additions and 76 deletions.
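
The change replaces the old connection-based gwdatafind calls (`gwdatafind.connect()` followed by methods on the returned connection) with the module-level `gwdatafind.find_urls()` function, which takes the server directly through its `host` keyword. A rough before/after sketch of the two call styles, pieced together from the lines removed and added in this diff; the server name is a placeholder and the frame type and GPS span are borrowed from the `frame_paths` docstring example:

```python
# Old API, as removed in this commit: open a connection, then query it.
import gwdatafind

connection = gwdatafind.connect(host="datafind.example.org")  # placeholder host
urls = connection.find_frame_urls('H', 'H1_LDAS_C02_L2',
                                  968995968, 968995968 + 2048,
                                  urltype='file')

# New API (gwdatafind >= 1.1.3), as adopted in this commit: a single
# module-level call, with the server passed as the host keyword.
from gwdatafind import find_urls as find_frame_urls

urls = find_frame_urls('H', 'H1_LDAS_C02_L2',
                       968995968, 968995968 + 2048,
                       urltype='file', host="datafind.example.org")
```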
2 changes: 1 addition & 1 deletion pycbc/frame/__init__.py
@@ -1,4 +1,4 @@
-from . frame import (locations_to_cache, read_frame, datafind_connection,
+from . frame import (locations_to_cache, read_frame,
                      query_and_read_frame, frame_paths, write_frame,
                      DataBuffer, StatusBuffer, iDQBuffer)

25 changes: 3 additions & 22 deletions pycbc/frame/frame.py
Expand Up @@ -22,7 +22,7 @@
import numpy
import math
import os.path, glob, time
import gwdatafind
from gwdatafind import find_urls as find_frame_urls
import pycbc
from urllib.parse import urlparse
from pycbc.types import TimeSeries, zeros
@@ -256,22 +256,6 @@ def read_frame(location, channels, start_time=None,
     else:
         return _read_channel(channels, stream, start_time, duration)

-def datafind_connection(server=None):
-    """ Return a connection to the datafind server
-    Parameters
-    -----------
-    server : {SERVER:PORT, string}, optional
-        A string representation of the server and port.
-        The port may be ommitted.
-    Returns
-    --------
-    connection
-        The open connection to the datafind server.
-    """
-    return gwdatafind.connect(host=server)

 def frame_paths(frame_type, start_time, end_time, server=None, url_type='file'):
     """Return the paths to a span of frame files
@@ -300,10 +284,8 @@ def frame_paths(frame_type, start_time, end_time, server=None, url_type='file'):
     >>> paths = frame_paths('H1_LDAS_C02_L2', 968995968, 968995968+2048)
     """
     site = frame_type[0]
-    connection = datafind_connection(server)
-    connection.find_times(site, frame_type,
-                          gpsstart=start_time, gpsend=end_time)
-    cache = connection.find_frame_urls(site, frame_type, start_time, end_time,urltype=url_type)
+    cache = find_frame_urls(site, frame_type, start_time, end_time,
+                            urltype=url_type, host=server)
     return [urlparse(entry).path for entry in cache]

 def query_and_read_frame(frame_type, channels, start_time, end_time,
@@ -365,7 +347,6 @@ def query_and_read_frame(frame_type, channels, start_time, end_time,
                      check_integrity=check_integrity)

 __all__ = ['read_frame', 'frame_paths',
-           'datafind_connection',
            'query_and_read_frame']

 def write_frame(location, channels, timeseries):
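
With this change `pycbc.frame.frame_paths` resolves frames through a single `find_frame_urls` call and the module no longer exports `datafind_connection`. A minimal usage sketch of the updated helper, assuming a reachable datafind server; the frame type and GPS span come from the function's own docstring example, and the optional `server` argument is simply forwarded to `find_frame_urls` as `host`:

```python
from pycbc.frame import frame_paths

# Ask the default datafind server for local files covering 2048 s of H1 data;
# pass server='host:port' to target a specific server instead.
paths = frame_paths('H1_LDAS_C02_L2', 968995968, 968995968 + 2048)

# Each entry is the filesystem path extracted from the returned file:// URL.
print(paths[0])
```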
93 changes: 41 additions & 52 deletions pycbc/workflow/datafind.py
Expand Up @@ -34,8 +34,8 @@
from ligo import segments
from ligo.lw import utils, table
from glue import lal
from gwdatafind import find_urls as find_frame_urls
from pycbc.workflow.core import SegFile, File, FileList, make_analysis_dir
from pycbc.frame import datafind_connection
from pycbc.io.ligolw import LIGOLWContentHandler


@@ -396,11 +396,6 @@ def setup_datafind_runtime_cache_multi_calls_perifo(cp, scienceSegs,
     if tags is None:
         tags = []

-    # First job is to do setup for the datafind jobs
-    # First get the server name
-    logging.info("Setting up connection to datafind server.")
-    connection = setup_datafind_server_connection(cp, tags=tags)
-
     # Now ready to loop over the input segments
     datafindouts = []
     datafindcaches = []
@@ -419,14 +414,27 @@

             # Sometimes the connection can drop, so try a backup here
             try:
-                cache, cache_file = run_datafind_instance(cp, outputDir,
-                        connection, observatory, frameType,
-                        startTime, endTime, ifo, tags=tags)
+                cache, cache_file = run_datafind_instance(
+                    cp,
+                    outputDir,
+                    observatory,
+                    frameType,
+                    startTime,
+                    endTime,
+                    ifo,
+                    tags=tags
+                )
             except:
-                connection = setup_datafind_server_connection(cp, tags=tags)
-                cache, cache_file = run_datafind_instance(cp, outputDir,
-                        connection, observatory, frameType,
-                        startTime, endTime, ifo, tags=tags)
+                cache, cache_file = run_datafind_instance(
+                    cp,
+                    outputDir,
+                    observatory,
+                    frameType,
+                    startTime,
+                    endTime,
+                    ifo,
+                    tags=tags
+                )
             datafindouts.append(cache_file)
             datafindcaches.append(cache)
     return datafindcaches, datafindouts
@@ -476,11 +484,6 @@ def setup_datafind_runtime_cache_single_call_perifo(cp, scienceSegs, outputDir,
     if tags is None:
         tags = []

-    # First job is to do setup for the datafind jobs
-    # First get the server name
-    logging.info("Setting up connection to datafind server.")
-    connection = setup_datafind_server_connection(cp, tags=tags)
-
     # We want to ignore gaps as the detectors go up and down and calling this
     # way will give gaps. See the setup_datafind_runtime_generated function
     # for datafind calls that only query for data that will exist
@@ -530,7 +533,6 @@ def setup_datafind_runtime_cache_single_call_perifo(cp, scienceSegs, outputDir,
                 cache, cache_file = run_datafind_instance(
                     cp,
                     outputDir,
-                    connection,
                     observatory,
                     ftype,
                     start,
@@ -539,11 +541,9 @@
                     tags=tags
                 )
             except:
-                connection = setup_datafind_server_connection(cp, tags=tags)
                 cache, cache_file = run_datafind_instance(
                     cp,
                     outputDir,
-                    connection,
                     observatory,
                     ftype,
                     start,
@@ -846,31 +846,6 @@ def get_missing_segs_from_frame_file_cache(datafindcaches):
             missingFrames[ifo].extend(currMissingFrames)
     return missingFrameSegs, missingFrames

-def setup_datafind_server_connection(cp, tags=None):
-    """
-    This function is resposible for setting up the connection with the datafind
-    server.
-    Parameters
-    -----------
-    cp : pycbc.workflow.configuration.WorkflowConfigParser
-        The memory representation of the ConfigParser
-    Returns
-    --------
-    connection
-        The open connection to the datafind server.
-    """
-    if tags is None:
-        tags = []
-
-    if cp.has_option_tags("workflow-datafind",
-                          "datafind-ligo-datafind-server", tags):
-        datafind_server = cp.get_opt_tags("workflow-datafind",
-                                          "datafind-ligo-datafind-server", tags)
-    else:
-        datafind_server = None
-
-    return datafind_connection(datafind_server)

 def get_segment_summary_times(scienceFile, segmentName):
     """
@@ -928,7 +903,7 @@ def get_segment_summary_times(scienceFile, segmentName):

     return summSegList

-def run_datafind_instance(cp, outputDir, connection, observatory, frameType,
+def run_datafind_instance(cp, outputDir, observatory, frameType,
                           startTime, endTime, ifo, tags=None):
     """
     This function will query the datafind server once to find frames between
@@ -941,9 +916,6 @@ def run_datafind_instance(cp, outputDir, connection, observatory, frameType,
     outputDir : Output cache files will be written here. We also write the
         commands for reproducing what is done in this function to this
         directory.
-    connection : datafind connection object
-        Initialized through the `gwdatafind` module, this is the open
-        connection to the datafind server.
     observatory : string
         The observatory to query frames for. Ex. 'H', 'L' or 'V'. NB: not
         'H1', 'L1', 'V1' which denote interferometers.
@@ -977,6 +949,17 @@ def run_datafind_instance(cp, outputDir, observatory, frameType,
     if tags is None:
         tags = []

+    # Determine if we should override the default datafind server
+    if cp.has_option_tags("workflow-datafind",
+                          "datafind-ligo-datafind-server", tags):
+        datafind_server = cp.get_opt_tags(
+            "workflow-datafind",
+            "datafind-ligo-datafind-server",
+            tags
+        )
+    else:
+        datafind_server = None
+
     seg = segments.segment([startTime, endTime])
     # Take the datafind kwargs from config (usually urltype=file is
     # given).
@@ -997,8 +980,14 @@
                              os.path.join(outputDir,'logs'), **dfKwargs)
     logging.debug("Asking datafind server for frames.")
     dfCache = lal.Cache.from_urls(
-        connection.find_frame_urls(observatory, frameType,
-                                   startTime, endTime, **dfKwargs),
+        find_frame_urls(
+            observatory,
+            frameType,
+            startTime,
+            endTime,
+            host=datafind_server,
+            **dfKwargs
+        ),
     )
     logging.debug("Frames returned")
     # workflow format output file
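
On the workflow side, `setup_datafind_server_connection` is removed and the server choice now happens inside `run_datafind_instance` itself: if the `[workflow-datafind]` section defines `datafind-ligo-datafind-server`, that value is handed to `find_frame_urls` as `host`; otherwise `host=None` leaves the choice to gwdatafind. A sketch of a call under the new signature (no `connection` argument); `cp` is assumed to be an already-loaded `WorkflowConfigParser`, and the output directory, observatory, frame type, GPS times and ifo are placeholder values:

```python
from pycbc.workflow.datafind import run_datafind_instance

# Hypothetical call mirroring the new signature; the datafind server is now
# read from cp inside the function rather than passed as a connection object.
cache, cache_file = run_datafind_instance(
    cp,                  # pycbc.workflow.configuration.WorkflowConfigParser
    'datafind',          # outputDir: cache files and commands written here
    'H',                 # observatory: 'H', 'L' or 'V' (not 'H1'/'L1'/'V1')
    'H1_LDAS_C02_L2',    # frame type (placeholder)
    968995968,           # GPS start time (placeholder)
    968998016,           # GPS end time (placeholder)
    'H1',                # ifo
    tags=[]
)
```

Because there is no longer a connection object to re-open, the bare `except:` fallback in the setup functions now simply retries the identical call.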
2 changes: 1 addition & 1 deletion requirements.txt
@@ -15,7 +15,7 @@ lalsuite!=7.2
 lscsoft-glue>=1.59.3
 ligo-segments
 tqdm
-gwdatafind
+gwdatafind>=1.1.3

 # Requirements for full pegasus env
 pegasus-wms.api >= 5.0.3
