Skip to content

Commit

Permalink
Changes for OSDF support (gwastro#4459)
Browse files Browse the repository at this point in the history
* Changes for OSDF support

* Bump version here as well

* Add OSDF handling in workflow.datafind

* Add OSDF handling

* Add in OSDF transfer code

* Make OSDF local transfers work

* Code Climating

* Should include IGWN requirements

* Trying to fix failures

* Revert moving ciecplib import

* Missed undoing this change
  • Loading branch information
spxiwh authored and bhooshan-gadre committed Mar 1, 2024
1 parent 371460e commit ace4c45
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 21 deletions.
33 changes: 31 additions & 2 deletions pycbc/workflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@
import os
import stat
import shutil
import subprocess
from shutil import which
import urllib.parse
from urllib.parse import urlparse

from pycbc.types.config import InterpolatingConfigParser

# NOTE urllib is weird. For some reason it only allows known schemes and will
# give *wrong* results, rather than failing, if you use something like gsiftp
# We can add schemes explicitly, as below, but be careful with this!
urllib.parse.uses_relative.append('osdf')
urllib.parse.uses_netloc.append('osdf')


def resolve_url(url, directory=None, permissions=None, copy_to_cwd=True):
"""Resolves a URL to a local file, and returns the path to that file.
Expand Down Expand Up @@ -75,8 +83,9 @@ def resolve_url(url, directory=None, permissions=None, copy_to_cwd=True):
shutil.copy(u.path, filename)

elif u.scheme == "http" or u.scheme == "https":
# FIXME: Move to top and make optional once 4001 functionality is
# merged
# Would like to move ciecplib import to top using import_optional, but
# it needs to be available when documentation runs in the CI, and I
# can't get it to install in the GitHub CI
import ciecplib
with ciecplib.Session() as s:
if u.netloc in ("git.ligo.org", "code.pycbc.phy.syr.edu"):
Expand All @@ -89,6 +98,26 @@ def resolve_url(url, directory=None, permissions=None, copy_to_cwd=True):
output_fp.write(r.content)
output_fp.close()

elif u.scheme == "osdf":
# OSDF will require a scitoken to be present and stashcp to be
# available. Thanks Dunky for the code here!
cmd = [
which("stashcp") or "stashcp",
u.path,
filename,
]

try:
subprocess.run(cmd, check=True, capture_output=True)
except subprocess.CalledProcessError as err:
# Print information about the failure
print(err.cmd, "failed with")
print(err.stderr.decode())
print(err.stdout.decode())
raise

return filename

else:
# TODO: We could support other schemes as needed
errmsg = "Unknown URL scheme: %s\n" % (u.scheme)
Expand Down
3 changes: 2 additions & 1 deletion pycbc/workflow/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2078,7 +2078,8 @@ def resolve_url_to_file(curr_pfn, attrs=None):
"""
cvmfsstr1 = 'file:///cvmfs/'
cvmfsstr2 = 'file://localhost/cvmfs/'
cvmfsstrs = (cvmfsstr1, cvmfsstr2)
osdfstr1 = 'osdf:///' # Technically this isn't CVMFS, but same handling!
cvmfsstrs = (cvmfsstr1, cvmfsstr2, osdfstr1)

# Get LFN
urlp = urllib.parse.urlparse(curr_pfn)
Expand Down
36 changes: 25 additions & 11 deletions pycbc/workflow/datafind.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,21 @@

import os, copy
import logging
import urllib.parse
from ligo import segments
from ligo.lw import utils, table
from glue import lal
from gwdatafind import find_urls as find_frame_urls
from pycbc.workflow.core import SegFile, File, FileList, make_analysis_dir
from pycbc.io.ligolw import LIGOLWContentHandler

# NOTE urllib is weird. For some reason it only allows known schemes and will
# give *wrong* results, rather than failing, if you use something like gsiftp
# We can add schemes explicitly, as below, but be careful with this!
# (urllib is used indirectly through lal.Cache objects)
urllib.parse.uses_relative.append('osdf')
urllib.parse.uses_netloc.append('osdf')


def setup_datafind_workflow(workflow, scienceSegs, outputDir, seg_file=None,
tags=None):
Expand Down Expand Up @@ -729,7 +737,10 @@ def convert_cachelist_to_filelist(datafindcache_list):
curr_ifo = cache.ifo
for frame in cache:
# Pegasus doesn't like "localhost" in URLs.
frame.url = frame.url.replace('file://localhost','file://')
frame.url = frame.url.replace('file://localhost', 'file://')
# Not sure why it happens in OSDF URLs!!
# May need to remove use of Cache objects
frame.url = frame.url.replace('osdf://localhost/', 'osdf:///')

# Create one File() object for each unique frame file that we
# get back in the cache.
Expand All @@ -744,17 +755,20 @@ def convert_cachelist_to_filelist(datafindcache_list):
prev_file = currFile

# Populate the PFNs for the File() we just created
if frame.url.startswith('file://'):
if frame.url.startswith('file:///cvmfs/'):
# Frame is on CVMFS, so let all sites read it directly.
currFile.add_pfn(frame.url, site='all')
else:
# Frame not on CVMFS, so may need transferring.
# Be careful here! If all your frames files are on site
# = local and you try to run on OSG, it will likely
# overwhelm the condor file transfer process!
currFile.add_pfn(frame.url, site='local')
cvmfs_urls = ('file:///cvmfs/', 'osdf://')
if frame.url.startswith(cvmfs_urls):
# Frame is on CVMFS/OSDF, so let all sites read it directly.
currFile.add_pfn(frame.url, site='all')
elif frame.url.startswith('file://'):
# Frame not on CVMFS, so may need transferring.
# Be careful here! If all your frames files are on site
# = local and you try to run on OSG, it will likely
# overwhelm the condor file transfer process!
currFile.add_pfn(frame.url, site='local')
else:
# Frame is at some unknown URL. Pegasus will decide how to deal
# with this, but will likely transfer to local site first, and
# from there transfer to remote sites as needed.
currFile.add_pfn(frame.url, site='notlocal')

return datafind_filelist
Expand Down
11 changes: 6 additions & 5 deletions pycbc/workflow/pegasus_files/pegasus-properties.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
###############################################################################
#
# Reduce number of retries and patience with failing jobs. Turn to production for production runs.
pegasus.mode = development
pegasus.mode = development
# Test files that have been copied in (or staged back from OSG). Do not bother checking symlinked inputs.
pegasus.integrity.checking = nosymlink

Expand All @@ -20,10 +20,11 @@ pegasus.dir.storage.mapper.replica.file=output.map
# XrootD for OSG, then anything else
# FIXME: This feels like a *site* property, not a global
pegasus.selector.replica=Regex
pegasus.selector.replica.regex.rank.1=file://(?!.*(cvmfs)).*
pegasus.selector.replica.regex.rank.2=file:///cvmfs/.*
pegasus.selector.replica.regex.rank.3=root://.*
pegasus.selector.replica.regex.rank.6=.\*
pegasus.selector.replica.regex.rank.1=osdf:///*
pegasus.selector.replica.regex.rank.2=file://(?!.*(cvmfs)).*
pegasus.selector.replica.regex.rank.3=file:///cvmfs/.*
pegasus.selector.replica.regex.rank.4=root://.*
pegasus.selector.replica.regex.rank.5=.\*

dagman.maxpre=1
# Override default value of 1800s
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ tqdm
gwdatafind>=1.1.3

# Requirements for full pegasus env
pegasus-wms.api >= 5.0.3
pegasus-wms.api >= 5.0.6
# Need GitPython: See discussion in https://github.com/gwastro/pycbc/pull/4454
GitPython
# need to pin until pegasus for further upstream
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
'tqdm',
'setuptools',
'gwdatafind',
'pegasus-wms.api >= 5.0.3',
'pegasus-wms.api >= 5.0.6',
'python-ligo-lw >= 1.7.0',
'ligo-segments',
'lalsuite!=7.2',
Expand Down

0 comments on commit ace4c45

Please sign in to comment.