From 34d6a21069f58d8e91d6f0102b67b8a12474b579 Mon Sep 17 00:00:00 2001 From: Ian Harry Date: Thu, 1 Jun 2023 09:06:06 +0100 Subject: [PATCH] Migrate to new gwdatafind API (#4383) * Migrate to new gwdatafind API * CC and one regression fix * One more style change * Don't need these options now * Revert "Don't need these options now" This reverts commit 6f2ed1bddf830079d18c57d9b724985075345f5b. --- pycbc/frame/__init__.py | 2 +- pycbc/frame/frame.py | 25 ++-------- pycbc/workflow/datafind.py | 93 +++++++++++++++++--------------------- requirements.txt | 2 +- 4 files changed, 46 insertions(+), 76 deletions(-) diff --git a/pycbc/frame/__init__.py b/pycbc/frame/__init__.py index dbf840561be..0ef88b9c568 100644 --- a/pycbc/frame/__init__.py +++ b/pycbc/frame/__init__.py @@ -1,4 +1,4 @@ -from . frame import (locations_to_cache, read_frame, datafind_connection, +from . frame import (locations_to_cache, read_frame, query_and_read_frame, frame_paths, write_frame, DataBuffer, StatusBuffer, iDQBuffer) diff --git a/pycbc/frame/frame.py b/pycbc/frame/frame.py index 0c714e2bb0b..7837c8c89ee 100644 --- a/pycbc/frame/frame.py +++ b/pycbc/frame/frame.py @@ -22,7 +22,7 @@ import numpy import math import os.path, glob, time -import gwdatafind +from gwdatafind import find_urls as find_frame_urls import pycbc from urllib.parse import urlparse from pycbc.types import TimeSeries, zeros @@ -256,22 +256,6 @@ def read_frame(location, channels, start_time=None, else: return _read_channel(channels, stream, start_time, duration) -def datafind_connection(server=None): - """ Return a connection to the datafind server - - Parameters - ----------- - server : {SERVER:PORT, string}, optional - A string representation of the server and port. - The port may be ommitted. - - Returns - -------- - connection - The open connection to the datafind server. - """ - return gwdatafind.connect(host=server) - def frame_paths(frame_type, start_time, end_time, server=None, url_type='file'): """Return the paths to a span of frame files @@ -300,10 +284,8 @@ def frame_paths(frame_type, start_time, end_time, server=None, url_type='file'): >>> paths = frame_paths('H1_LDAS_C02_L2', 968995968, 968995968+2048) """ site = frame_type[0] - connection = datafind_connection(server) - connection.find_times(site, frame_type, - gpsstart=start_time, gpsend=end_time) - cache = connection.find_frame_urls(site, frame_type, start_time, end_time,urltype=url_type) + cache = find_frame_urls(site, frame_type, start_time, end_time, + urltype=url_type, host=server) return [urlparse(entry).path for entry in cache] def query_and_read_frame(frame_type, channels, start_time, end_time, @@ -365,7 +347,6 @@ def query_and_read_frame(frame_type, channels, start_time, end_time, check_integrity=check_integrity) __all__ = ['read_frame', 'frame_paths', - 'datafind_connection', 'query_and_read_frame'] def write_frame(location, channels, timeseries): diff --git a/pycbc/workflow/datafind.py b/pycbc/workflow/datafind.py index 9ccf09289b9..7eef3b3dc8c 100644 --- a/pycbc/workflow/datafind.py +++ b/pycbc/workflow/datafind.py @@ -34,8 +34,8 @@ from ligo import segments from ligo.lw import utils, table from glue import lal +from gwdatafind import find_urls as find_frame_urls from pycbc.workflow.core import SegFile, File, FileList, make_analysis_dir -from pycbc.frame import datafind_connection from pycbc.io.ligolw import LIGOLWContentHandler @@ -396,11 +396,6 @@ def setup_datafind_runtime_cache_multi_calls_perifo(cp, scienceSegs, if tags is None: tags = [] - # First job is to do setup for the datafind jobs - # First get the server name - logging.info("Setting up connection to datafind server.") - connection = setup_datafind_server_connection(cp, tags=tags) - # Now ready to loop over the input segments datafindouts = [] datafindcaches = [] @@ -419,14 +414,27 @@ def setup_datafind_runtime_cache_multi_calls_perifo(cp, scienceSegs, # Sometimes the connection can drop, so try a backup here try: - cache, cache_file = run_datafind_instance(cp, outputDir, - connection, observatory, frameType, - startTime, endTime, ifo, tags=tags) + cache, cache_file = run_datafind_instance( + cp, + outputDir, + observatory, + frameType, + startTime, + endTime, + ifo, + tags=tags + ) except: - connection = setup_datafind_server_connection(cp, tags=tags) - cache, cache_file = run_datafind_instance(cp, outputDir, - connection, observatory, frameType, - startTime, endTime, ifo, tags=tags) + cache, cache_file = run_datafind_instance( + cp, + outputDir, + observatory, + frameType, + startTime, + endTime, + ifo, + tags=tags + ) datafindouts.append(cache_file) datafindcaches.append(cache) return datafindcaches, datafindouts @@ -476,11 +484,6 @@ def setup_datafind_runtime_cache_single_call_perifo(cp, scienceSegs, outputDir, if tags is None: tags = [] - # First job is to do setup for the datafind jobs - # First get the server name - logging.info("Setting up connection to datafind server.") - connection = setup_datafind_server_connection(cp, tags=tags) - # We want to ignore gaps as the detectors go up and down and calling this # way will give gaps. See the setup_datafind_runtime_generated function # for datafind calls that only query for data that will exist @@ -530,7 +533,6 @@ def setup_datafind_runtime_cache_single_call_perifo(cp, scienceSegs, outputDir, cache, cache_file = run_datafind_instance( cp, outputDir, - connection, observatory, ftype, start, @@ -539,11 +541,9 @@ def setup_datafind_runtime_cache_single_call_perifo(cp, scienceSegs, outputDir, tags=tags ) except: - connection = setup_datafind_server_connection(cp, tags=tags) cache, cache_file = run_datafind_instance( cp, outputDir, - connection, observatory, ftype, start, @@ -846,31 +846,6 @@ def get_missing_segs_from_frame_file_cache(datafindcaches): missingFrames[ifo].extend(currMissingFrames) return missingFrameSegs, missingFrames -def setup_datafind_server_connection(cp, tags=None): - """ - This function is resposible for setting up the connection with the datafind - server. - - Parameters - ----------- - cp : pycbc.workflow.configuration.WorkflowConfigParser - The memory representation of the ConfigParser - Returns - -------- - connection - The open connection to the datafind server. - """ - if tags is None: - tags = [] - - if cp.has_option_tags("workflow-datafind", - "datafind-ligo-datafind-server", tags): - datafind_server = cp.get_opt_tags("workflow-datafind", - "datafind-ligo-datafind-server", tags) - else: - datafind_server = None - - return datafind_connection(datafind_server) def get_segment_summary_times(scienceFile, segmentName): """ @@ -928,7 +903,7 @@ def get_segment_summary_times(scienceFile, segmentName): return summSegList -def run_datafind_instance(cp, outputDir, connection, observatory, frameType, +def run_datafind_instance(cp, outputDir, observatory, frameType, startTime, endTime, ifo, tags=None): """ This function will query the datafind server once to find frames between @@ -941,9 +916,6 @@ def run_datafind_instance(cp, outputDir, connection, observatory, frameType, outputDir : Output cache files will be written here. We also write the commands for reproducing what is done in this function to this directory. - connection : datafind connection object - Initialized through the `gwdatafind` module, this is the open - connection to the datafind server. observatory : string The observatory to query frames for. Ex. 'H', 'L' or 'V'. NB: not 'H1', 'L1', 'V1' which denote interferometers. @@ -977,6 +949,17 @@ def run_datafind_instance(cp, outputDir, connection, observatory, frameType, if tags is None: tags = [] + # Determine if we should override the default datafind server + if cp.has_option_tags("workflow-datafind", + "datafind-ligo-datafind-server", tags): + datafind_server = cp.get_opt_tags( + "workflow-datafind", + "datafind-ligo-datafind-server", + tags + ) + else: + datafind_server = None + seg = segments.segment([startTime, endTime]) # Take the datafind kwargs from config (usually urltype=file is # given). @@ -997,8 +980,14 @@ def run_datafind_instance(cp, outputDir, connection, observatory, frameType, os.path.join(outputDir,'logs'), **dfKwargs) logging.debug("Asking datafind server for frames.") dfCache = lal.Cache.from_urls( - connection.find_frame_urls(observatory, frameType, - startTime, endTime, **dfKwargs), + find_frame_urls( + observatory, + frameType, + startTime, + endTime, + host=datafind_server, + **dfKwargs + ), ) logging.debug("Frames returned") # workflow format output file diff --git a/requirements.txt b/requirements.txt index 137b74253db..1c5e4dad640 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ lalsuite!=7.2 lscsoft-glue>=1.59.3 ligo-segments tqdm -gwdatafind +gwdatafind>=1.1.3 # Requirements for full pegasus env pegasus-wms.api >= 5.0.3