diff --git a/pycbc/workflow/configuration.py b/pycbc/workflow/configuration.py
index 35c4038ce4a..0943e286fe3 100644
--- a/pycbc/workflow/configuration.py
+++ b/pycbc/workflow/configuration.py
@@ -30,11 +30,19 @@
 import os
 import stat
 import shutil
+import subprocess
 from shutil import which
+import urllib.parse
 from urllib.parse import urlparse

 from pycbc.types.config import InterpolatingConfigParser

+# NOTE: urllib is weird. For some reason it only allows known schemes and will
+# give *wrong* results, rather than failing, if you use something like gsiftp.
+# We can add schemes explicitly, as below, but be careful with this!
+urllib.parse.uses_relative.append('osdf')
+urllib.parse.uses_netloc.append('osdf')
+

 def resolve_url(url, directory=None, permissions=None, copy_to_cwd=True):
     """Resolves a URL to a local file, and returns the path to that file.
@@ -75,8 +83,9 @@ def resolve_url(url, directory=None, permissions=None, copy_to_cwd=True):
                 shutil.copy(u.path, filename)

     elif u.scheme == "http" or u.scheme == "https":
-        # FIXME: Move to top and make optional once 4001 functionality is
-        # merged
+        # Would like to move ciecplib import to top using import_optional, but
+        # it needs to be available when the documentation build runs in the
+        # CI, and I can't get it to install in the GitHub CI.
         import ciecplib
         with ciecplib.Session() as s:
             if u.netloc in ("git.ligo.org", "code.pycbc.phy.syr.edu"):
@@ -89,6 +98,26 @@ def resolve_url(url, directory=None, permissions=None, copy_to_cwd=True):
             output_fp.write(r.content)
             output_fp.close()

+    elif u.scheme == "osdf":
+        # OSDF requires a SciToken to be present and stashcp to be
+        # available. Thanks Dunky for the code here!
+        cmd = [
+            which("stashcp") or "stashcp",
+            u.path,
+            filename,
+        ]
+
+        try:
+            subprocess.run(cmd, check=True, capture_output=True)
+        except subprocess.CalledProcessError as err:
+            # Print information about the failure
+            print(err.cmd, "failed with")
+            print(err.stderr.decode())
+            print(err.stdout.decode())
+            raise
+
+        return filename
+
     else:
         # TODO: We could support other schemes as needed
         errmsg = "Unknown URL scheme: %s\n" % (u.scheme)
diff --git a/pycbc/workflow/core.py b/pycbc/workflow/core.py
index 692b04c6e60..3b03ae5d400 100644
--- a/pycbc/workflow/core.py
+++ b/pycbc/workflow/core.py
@@ -2078,7 +2078,8 @@ def resolve_url_to_file(curr_pfn, attrs=None):
     """
     cvmfsstr1 = 'file:///cvmfs/'
     cvmfsstr2 = 'file://localhost/cvmfs/'
-    cvmfsstrs = (cvmfsstr1, cvmfsstr2)
+    osdfstr1 = 'osdf:///'  # Technically this isn't CVMFS, but same handling!
+    cvmfsstrs = (cvmfsstr1, cvmfsstr2, osdfstr1)

     # Get LFN
     urlp = urllib.parse.urlparse(curr_pfn)
diff --git a/pycbc/workflow/datafind.py b/pycbc/workflow/datafind.py
index 39b8b6d440a..9dc898effcd 100644
--- a/pycbc/workflow/datafind.py
+++ b/pycbc/workflow/datafind.py
@@ -31,6 +31,7 @@

 import os, copy
 import logging
+import urllib.parse
 from ligo import segments
 from ligo.lw import utils, table
 from glue import lal
@@ -38,6 +39,13 @@
 from pycbc.workflow.core import SegFile, File, FileList, make_analysis_dir
 from pycbc.io.ligolw import LIGOLWContentHandler

+# NOTE: urllib is weird. For some reason it only allows known schemes and will
+# give *wrong* results, rather than failing, if you use something like gsiftp.
+# We can add schemes explicitly, as below, but be careful with this!
+# (urllib is used indirectly through lal.Cache objects)
+urllib.parse.uses_relative.append('osdf')
+urllib.parse.uses_netloc.append('osdf')
+

 def setup_datafind_workflow(workflow, scienceSegs, outputDir, seg_file=None,
                             tags=None):
@@ -729,7 +737,10 @@ def convert_cachelist_to_filelist(datafindcache_list):
         curr_ifo = cache.ifo
         for frame in cache:
             # Pegasus doesn't like "localhost" in URLs.
-            frame.url = frame.url.replace('file://localhost','file://')
+            frame.url = frame.url.replace('file://localhost', 'file://')
+            # Not sure why 'localhost' appears in OSDF URLs!
+            # We may need to remove the use of Cache objects here.
+            frame.url = frame.url.replace('osdf://localhost/', 'osdf:///')

             # Create one File() object for each unique frame file that we
             # get back in the cache.
@@ -744,17 +755,20 @@
                 prev_file = currFile

             # Populate the PFNs for the File() we just created
-            if frame.url.startswith('file://'):
-                if frame.url.startswith('file:///cvmfs/'):
-                    # Frame is on CVMFS, so let all sites read it directly.
-                    currFile.add_pfn(frame.url, site='all')
-                else:
-                    # Frame not on CVMFS, so may need transferring.
-                    # Be careful here! If all your frames files are on site
-                    # = local and you try to run on OSG, it will likely
-                    # overwhelm the condor file transfer process!
-                    currFile.add_pfn(frame.url, site='local')
+            cvmfs_urls = ('file:///cvmfs/', 'osdf://')
+            if frame.url.startswith(cvmfs_urls):
+                # Frame is on CVMFS/OSDF, so let all sites read it directly.
+                currFile.add_pfn(frame.url, site='all')
+            elif frame.url.startswith('file://'):
+                # Frame not on CVMFS, so may need transferring.
+                # Be careful here! If all your frame files are on site
+                # = local and you try to run on OSG, it will likely
+                # overwhelm the condor file transfer process!
+                currFile.add_pfn(frame.url, site='local')
             else:
+                # Frame is at some unknown URL. Pegasus will decide how to deal
+                # with this, but will likely transfer to local site first, and
+                # from there transfer to remote sites as needed.
                 currFile.add_pfn(frame.url, site='notlocal')

     return datafind_filelist
diff --git a/pycbc/workflow/pegasus_files/pegasus-properties.conf b/pycbc/workflow/pegasus_files/pegasus-properties.conf
index 06fd37bc9fe..736e48691af 100644
--- a/pycbc/workflow/pegasus_files/pegasus-properties.conf
+++ b/pycbc/workflow/pegasus_files/pegasus-properties.conf
@@ -1,7 +1,7 @@
 ###############################################################################
 #
 # Reduce number of retries and patience with failing jobs. Turn to production for production runs.
-pegasus.mode = development
+pegasus.mode = development

 # Test files that have been copied in (or staged back from OSG). Do not bother checking symlinked inputs.
pegasus.integrity.checking = nosymlink @@ -20,10 +20,11 @@ pegasus.dir.storage.mapper.replica.file=output.map # XrootD for OSG, then anything else # FIXME: This feels like a *site* property, not a global pegasus.selector.replica=Regex -pegasus.selector.replica.regex.rank.1=file://(?!.*(cvmfs)).* -pegasus.selector.replica.regex.rank.2=file:///cvmfs/.* -pegasus.selector.replica.regex.rank.3=root://.* -pegasus.selector.replica.regex.rank.6=.\* +pegasus.selector.replica.regex.rank.1=osdf:///* +pegasus.selector.replica.regex.rank.2=file://(?!.*(cvmfs)).* +pegasus.selector.replica.regex.rank.3=file:///cvmfs/.* +pegasus.selector.replica.regex.rank.4=root://.* +pegasus.selector.replica.regex.rank.5=.\* dagman.maxpre=1 # Override default value of 1800s diff --git a/requirements.txt b/requirements.txt index a4dad7533ba..377238db4a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,7 +18,7 @@ tqdm gwdatafind>=1.1.3 # Requirements for full pegasus env -pegasus-wms.api >= 5.0.3 +pegasus-wms.api >= 5.0.6 # Need GitPython: See discussion in https://github.com/gwastro/pycbc/pull/4454 GitPython # need to pin until pegasus for further upstream diff --git a/setup.py b/setup.py index f573c27dac4..b180ec9519e 100755 --- a/setup.py +++ b/setup.py @@ -47,7 +47,7 @@ 'tqdm', 'setuptools', 'gwdatafind', - 'pegasus-wms.api >= 5.0.3', + 'pegasus-wms.api >= 5.0.6', 'python-ligo-lw >= 1.7.0', 'ligo-segments', 'lalsuite!=7.2',
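
A note on the uses_relative/uses_netloc registration added in configuration.py and datafind.py: the short Python sketch below is not part of the patch; it only illustrates the urllib behaviour that the NOTE comments warn about (the gsiftp host name here is a made-up placeholder). For a scheme urllib does not know, urljoin() silently returns the relative reference instead of joining it against the base, rather than raising an error; registering the scheme restores the expected join. The expected outputs are shown as comments.

import urllib.parse

# Unregistered scheme: the base URL is silently discarded, which is the
# "wrong result rather than failing" behaviour described in the comments.
print(urllib.parse.urljoin('gsiftp://server.example/data/', 'frame.gwf'))
# -> 'frame.gwf'

# Register the scheme, mirroring what the patch does for 'osdf'.
urllib.parse.uses_relative.append('gsiftp')
urllib.parse.uses_netloc.append('gsiftp')
print(urllib.parse.urljoin('gsiftp://server.example/data/', 'frame.gwf'))
# -> 'gsiftp://server.example/data/frame.gwf'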
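
For the new osdf:// branch of resolve_url(), a minimal usage sketch follows. It assumes stashcp is on the PATH and a valid SciToken is available in the environment; the URL is a hypothetical placeholder, not a real dataset path.

from pycbc.workflow.configuration import resolve_url

# Hypothetical OSDF URL -- replace with a path served by a real OSDF origin.
url = 'osdf:///igwn/example/H-H1_EXAMPLE-1234567890-4096.gwf'

# resolve_url() sees the 'osdf' scheme, shells out to stashcp via
# subprocess.run(check=True), and returns the path to the local copy.
# On failure it prints stashcp's stdout/stderr and re-raises the error.
local_path = resolve_url(url)
print(local_path)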