Merge pull request #118 from EBISPOT/lsf_check
Lsf check
jdhayhurst authored Jun 29, 2022
2 parents 858ec80 + 78abe06 commit ee73ff3
Showing 5 changed files with 163 additions and 106 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
@@ -7,7 +7,7 @@ certifi==2020.6.20
chardet==3.0.4
ConfigArgParse==1.2.3
coverage==5.5
cx-Oracle==7.3.0
cx-Oracle>=7.3.0
datrie==0.8.2
docutils==0.16
et-xmlfile==1.0.1
3 changes: 2 additions & 1 deletion setup.py
@@ -7,7 +7,8 @@
description='A variety of utilities for activities involved in the running of the GWAS Catalog',
name='gwasUtils',
version='0.1-SNAPSHOT',
data_files=[('r_scripts',['catalogPlots/dataReleaseTimer.R', 'catalogPlots/SumStats_plotter.R', 'catalogPlots/TA_vs_GWAS_publication.R'])],
data_files=[('r_scripts',['catalogPlots/dataReleaseTimer.R', 'catalogPlots/SumStats_plotter.R', 'catalogPlots/TA_vs_GWAS_publication.R']),
('nf', ['solrIndexerManager/solr_indexing.nf'])],
packages=['gwasAssociationFilter',
'curationUtils',
'curationUtils.curationQueue',
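The new data_files entry ships the Nextflow script alongside the Python package. A minimal sketch of how the installed copy can be located at runtime, matching the --nfScript default added in indexer_manager.py below (the existence check is illustrative only, not part of the commit):

import os
import sys

# Assumption: pip places files declared under data_files=[('nf', [...])] in
# <sys.prefix>/nf/, which is what the --nfScript default in indexer_manager.py relies on.
nf_script = os.path.join(sys.prefix, "nf", "solr_indexing.nf")
print(nf_script, os.path.exists(nf_script))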
8 changes: 4 additions & 4 deletions solrIndexerManager/components/lsf_manager.py
@@ -82,6 +82,7 @@ def submit_job(self, command, workingDir = None, jobname = None):
'working_dir' : workingDir,
'job_name' : jobname
})

def kill_job(self, jobID):
x = Popen(['bkill', jobID], stdout=PIPE, stderr=PIPE)

@@ -99,11 +100,10 @@ def check_job(self,jobID):
x = Popen(['bjobs', '-a', jobID], stdout=PIPE, stderr=PIPE)
y = Popen(['tail', '-n+2'], stdin=x.stdout, stdout=PIPE)
x.stdout.close()

output = y.communicate()
stdout = str(output[0])

return stdout.split()[2]
stdout = str(output[0].decode())
status = stdout.split()[2] if stdout else None
return status

def generate_report(self):

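The reworked check_job decodes the bjobs output and returns None when nothing comes back (for example, an unknown job ID) instead of raising an IndexError. A rough sketch of the parsing, assuming the usual bjobs column layout with the status in the third column:

# Typical `bjobs -a <jobID>` output (header line stripped by `tail -n+2`):
#   JOBID   USER     STAT  QUEUE       FROM_HOST  EXEC_HOST  JOB_NAME         SUBMIT_TIME
#   123456  gwasuser RUN   production  host01     node42     solr_indexing_1  Jun 29 10:00
line = "123456 gwasuser RUN production host01 node42 solr_indexing_1 Jun 29 10:00"
status = line.split()[2] if line else None  # -> 'RUN'; None if bjobs printed nothing
print(status)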
222 changes: 122 additions & 100 deletions solrIndexerManager/indexer_manager.py
@@ -3,74 +3,116 @@
from solrWrapper import solr_wrapper
import time
import os
import sys
import json
import subprocess

# Loading components:
from solrIndexerManager.components import getUpdated
from solrIndexerManager.components import solrUpdater
from solrIndexerManager.components import lsf_manager

def job_generator(db_updates, wrapper):
"""
This function generates the jobs based on the provided wrapper script and the dictionary with the db updates.
Return data:
{
${pmid} : './${wrapper} -d -e -p ${pmid}',
...
efotrait : './${wrapper} -a -s -d',
diseasetrait : './${wrapper} -a -s -e'
}
"""

# Indexing jobs with associations and studies for each pubmed ID:
jobs = {pmid : '{} -d -e -p {}'.format(wrapper, pmid) for x in db_updates.values() for pmid in x if pmid != '*'}

# Indexing job to generate disease trait and efo trait documents:
jobs['efo_traits'] = '{} -a -s -d '.format(wrapper)
jobs['disease_traits'] = '{} -a -s -e '.format(wrapper)

return jobs

def manage_lsf_jobs(job_list, workingDir):
'''
This function handles the lsf jobs. Monitors their progression and decides when to exit and how.... boy it needs to be improved.
'''

# Hardcoded variables:
memoryLimit = 8000
jobPrefix = 'solr_indexing'
jobGroup = '/gwas_catalog/solr_indexer'
queue = 'production'

# Initialize lsf object:
LSF_obj = lsf_manager.LSF_manager(memory=memoryLimit, job_prefix=jobPrefix, job_group=jobGroup, workingDir=workingDir, queue=queue)

# Looping though all jobs:
folder_index = 0

# Looping through all the jobs, create separate folders and submit each to the farm:
for job_ID, job in job_list.items():
try:
os.mkdir('{}/{}'.format(workingDir, job_ID))
except FileExistsError:
print('[Warning] folder already exists: {}/{}'.format(workingDir, job_ID))

# Submit job to farm:
LSF_obj.submit_job(job, workingDir='{}/{}'.format(workingDir, job_ID), jobname = job_ID)

# Wait until all jobs are finished or terminated:
while True:
report = LSF_obj.generate_report()

print('[Info] Checking statuses of the submitted jobs at: {:%b %d %H:%M}'.format(datetime.now()))
for status, count in report.items():
print("\t{}: {}".format(status, count))

if 'RUN' not in report and 'PEND' not in report and 'EXIT' not in report:
print('[Info] No running or pending jobs were found. Exiting.')
break

time.sleep(600)
class IndexerManager:
def __init__(self,
newInstance=None,
oldInstance=None,
solrHost=None,
solrPort=None,
solrCore=None,
wrapperScript=None,
logDir=None,
fullIndex=None,
memory=None,
job_prefix=None,
job_group=None,
workingDir=None,
queue=None,
nfScriptPath=None):
self.newInstance = newInstance
self.oldInstance = oldInstance
self.solrHost = solrHost
self.solrPort = solrPort
self.solrCore = solrCore
self.wrapperScript = wrapperScript
self.logDir = logDir
self.fullIndex = fullIndex
self.memory = memory
self.job_prefix = job_prefix
self.job_group = job_group
self.workingDir = workingDir
self.queue = queue
self.db_updates = None
self.nfScriptPath = nfScriptPath
self.job_file = None


def job_generator(self):
"""
This function generates the jobs based on the provided wrapper script and the dictionary with the db updates.
Return data:
{
${pmid} : './${wrapper} -d -e -p ${pmid}',
...
efotrait : './${wrapper} -a -s -d',
diseasetrait : './${wrapper} -a -s -e'
}
"""

# Indexing jobs with associations and studies for each pubmed ID:
jobs = {pmid : '{} -d -e -p {}'.format(self.wrapperScript, pmid) for x in self.db_updates.values() for pmid in x if pmid != '*'}
# Indexing job to generate disease trait and efo trait documents:
jobs['efo_traits'] = '{} -a -s -d '.format(self.wrapperScript)
jobs['disease_traits'] = '{} -a -s -e '.format(self.wrapperScript)
return jobs


def get_database_updates(self):
# Determine updates by comparing old and new database instances:
new_table = getUpdated.get_studies(self.newInstance)
# The update object is generated depending on if the flag is enabled or not:
if self.fullIndex:
db_updates = {
"added": new_table.PUBMED_ID.unique().tolist(), # As if all publications in the new table was newly added
"removed": ['*'], # As if all publications were removed.
"updated": []
}
else:
old_table = getUpdated.get_studies(self.oldInstance)
db_updates = getUpdated.get_db_updates(old_table, new_table)
return db_updates

def set_database_updates(self, db_updates):
self.db_updates = db_updates

def generate_job_list_file(self):
job_map = self.job_generator()
self.job_file = os.path.join(self.logDir, 'job_map.csv')
with open(self.job_file, 'w') as f:
for k, v in job_map.items():
s = ",".join([k, v]) + "\n"
f.write(s)

def prepare_solr(self):
# Instantiate solr object:
solr_object = solr_wrapper.solrWrapper(host=self.solrHost, port=self.solrPort, core=self.solrCore)
# Removed associations and studies for all updated/deleted studies + removing all trait documents:
solrUpdater.removeUpdatedSolrData(solr_object, self.db_updates)

def run_indexer(self):
"""
Indexing is managed by a Nextflow workflow
"""
nextflow_cmd = """
nextflow -log {logs} \
run {nf} \
--job_map_file {jm}
""".format(logs=os.path.join(self.logDir, "nextflow.log"),
nf=self.nfScriptPath,
jm=self.job_file)
print("Running nextflow: {}".format(nextflow_cmd))
subproc_cmd = nextflow_cmd.split()
process = subprocess.run(subproc_cmd, check=True)
print(process.stdout)


def main():
@@ -95,8 +137,8 @@ def main():
action="store_true")

# Location for log files:
parser.add_argument('--logFolder', help='Folder into which the log files will be generated.')

parser.add_argument('--logFolder', help='Folder into which the log files will be generated.', default="./")
parser.add_argument('--nfScript', help='Nextflow script path', default=os.path.join(sys.prefix,"nf/solr_indexing.nf"))
# Print out excessive reports:
parser.add_argument('--verbose', help='Flag to give more informative output.', action="store_true")
args = parser.parse_args()
@@ -110,6 +152,7 @@ def main():
solrCore = args.solrCore
solrPort = args.solrPort
fullIndex = args.fullIndex
nfScriptPath = args.nfScript
verbose = args.verbose

# Parse wrapper:
@@ -118,41 +161,20 @@
# Parse log directory:
logDir = args.logFolder

# Determine updates by comparing old and new database instances:
old_table = getUpdated.get_studies(oldInstance)
new_table = getUpdated.get_studies(newInstance)

# The update object is generated depending on if the flag is enabled or not:
if fullIndex:
db_updates = {
"added": new_table.PUBMED_ID.unique().tolist(), # As if all publications in the new table was newly added
"removed": ['*'], # As if all publications were removed.
"updated": []
}
else:
db_updates = getUpdated.get_db_updates(old_table, new_table)

# Instantiate solr object:
solr_object = solr_wrapper.solrWrapper(host=solrHost, port=solrPort, core=solrCore, verbose=True)

# Removed associations and studies for all updated/deleted studies + removing all trait documents:
solrUpdater.removeUpdatedSolrData(solr_object, db_updates)

# Generate a list of jobs:
joblist = job_generator(db_updates, wrapperScript)

# Print reports before submit to farm:
if verbose:
print('[Info] List of jobs to be submitted to the farm:')
print('\n\t'.join(joblist.values()))

# Submitting the jobs to the farm:
manage_lsf_jobs(joblist, logDir)

# At this point there's no downstream management of the jobs.... just submit and wait.
# we are assuming things went well. If not downstream QC measures will terminate the plan anyway.
# Later time based on experience this can be made more sophisticated.

manager = IndexerManager(newInstance=newInstance,
oldInstance=oldInstance,
solrHost=solrHost,
solrPort=solrPort,
solrCore=solrCore,
wrapperScript=wrapperScript,
logDir=logDir,
fullIndex=fullIndex,
nfScriptPath=nfScriptPath)
db_updates = manager.get_database_updates()
manager.set_database_updates(db_updates=db_updates)
manager.generate_job_list_file()
manager.prepare_solr()
manager.run_indexer()

if __name__ == '__main__':
main()
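End to end, the refactored main() now builds an IndexerManager, writes the job map to disk, clears stale Solr documents, and delegates the indexing jobs to Nextflow instead of LSF. An illustrative sketch of the job_map.csv produced by generate_job_list_file() and the command run_indexer() assembles from it (PubMed IDs, wrapper name and paths are placeholders, not taken from the commit):

import subprocess

# Hypothetical job_map.csv content: one row per PubMed ID plus the two trait jobs.
#   28256260,./wrapper.sh -d -e -p 28256260
#   29875488,./wrapper.sh -d -e -p 29875488
#   efo_traits,./wrapper.sh -a -s -d
#   disease_traits,./wrapper.sh -a -s -e

# run_indexer() then shells out roughly like this:
cmd = ["nextflow", "-log", "./nextflow.log",
       "run", "nf/solr_indexing.nf",
       "--job_map_file", "./job_map.csv"]
subprocess.run(cmd, check=True)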
34 changes: 34 additions & 0 deletions solrIndexerManager/solr_indexing.nf
@@ -0,0 +1,34 @@
nextflow.enable.dsl=2

params.job_map_file = ''
params.log_path = ''

// ------------------------------------------------------------- //
// Run the solr indexer jobs //
// ------------------------------------------------------------- //


process run_solr_indexer {
tag "$id"
memory { 4.GB * task.attempt }
time { 6.hour * task.attempt }
maxRetries 3
errorStrategy { task.exitStatus in 2..140 ? 'retry' : 'terminate' }

input:
tuple val(id), val(cmd)

output:
stdout

"""
echo $id
$cmd
"""
}


workflow {
jobs = channel.fromPath("$params.job_map_file").splitCsv()
run_solr_indexer(jobs)
}
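In the workflow, channel.fromPath(...).splitCsv() turns each row of job_map.csv into a (id, cmd) tuple, so $id and $cmd inside run_solr_indexer are the two CSV columns. A small Python illustration of that row-to-tuple mapping (the file path is a placeholder):

# Emulates what splitCsv() feeds into run_solr_indexer for each CSV row.
with open("job_map.csv") as f:  # placeholder path
    for line in f:
        if not line.strip():
            continue
        job_id, cmd = line.rstrip("\n").split(",", 1)
        print(job_id, "->", cmd)  # e.g. efo_traits -> ./wrapper.sh -a -s -d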
