diff --git a/prody/database/__init__.py b/prody/database/__init__.py
index fb523f789..87a736d32 100644
--- a/prody/database/__init__.py
+++ b/prody/database/__init__.py
@@ -10,8 +10,7 @@
 
   * :func:`.fetchPfamMSA` - download MSA files
   * :func:`.searchPfam` - search for domain families of a protein
-
-.. _Pfam: http://pfam.sanger.ac.uk/
+.. _Pfam: https://www.ebi.ac.uk/interpro/entry/pfam/
 
 UniProt
 ========
@@ -70,7 +69,15 @@
 
 .. _GOA: https://www.ebi.ac.uk/GOA/
 
+InterPro
+========
+
+The following function can be used to search InterPro_ data:
+
+  * :func:`.searchInterpro` - search for domain families of a protein
+
+.. _InterPro: https://www.ebi.ac.uk/interpro/
 """
 
 __all__ = []
@@ -98,3 +104,7 @@
 from . import quartataweb
 from .quartataweb import *
 __all__.extend(quartataweb.__all__)
+
+from . import interpro
+from .interpro import *
+__all__.extend(interpro.__all__)
diff --git a/prody/database/interpro.py b/prody/database/interpro.py
new file mode 100644
index 000000000..fe2d786af
--- /dev/null
+++ b/prody/database/interpro.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+"""This module defines functions for interfacing with the InterPro database."""
+
+__author__ = 'James Krieger'
+
+import json
+from prody import LOGGER, PY3K
+
+__all__ = ['searchInterpro']
+
+prefix = 'https://www.ebi.ac.uk/interpro/wwwapi/entry/'
+
+def searchInterpro(query, **kwargs):
+    """Returns InterPro search results in a list of dictionaries.
+
+    Each dictionary describes a matching entry and maps to various
+    properties, including start and end residue positions.
+
+    :arg query: UniProt ID or PDB identifier with or without a
+        chain identifier, e.g. ``'1mkp'`` or ``'1mkpA'``.
+        The UniProt ID of the specified chain, or the first
+        protein chain, will be used for searching the InterPro database.
+    :type query: str
+
+    :arg timeout: timeout for blocking connection attempt in seconds, default
+        is 60
+    :type timeout: int
+    """
+    import requests
+
+    LOGGER.timeit('_interpro')
+    timeout = int(kwargs.get('timeout', 60))
+
+    if len(query) == 4:
+        url = prefix + "all/structure/pdb/" + query
+
+    elif len(query) == 5:
+        accession = None
+
+        from prody import parsePDBHeader
+        try:
+            polymers = parsePDBHeader(query[:4], 'polymers')
+        except Exception as err:
+            raise ValueError('failed to parse header for {0} ({1})'
+                             .format(query[:4], str(err)))
+
+        chid = query[4:].upper()
+
+        for poly in polymers:
+            if chid and poly.chid != chid:
+                continue
+            for dbref in poly.dbrefs:
+                if dbref.database != 'UniProt':
+                    continue
+                accession = dbref.accession
+                LOGGER.info('UniProt accession {0} for {1} chain '
+                            '{2} will be used.'
+                            .format(accession, query[:4], poly.chid))
+                break
+            if accession is not None:
+                break
+
+        if accession is None:
+            raise ValueError('A UniProt accession for PDB {0} could not be '
+                             'parsed.'.format(repr(query)))
+        else:
+            url = prefix + "all/protein/uniprot/" + accession
+
+    else:
+        url = prefix + "all/protein/uniprot/" + query
+
+    LOGGER.debug('Retrieving InterPro search results: ' + url)
+    result = None
+    sleep = 2
+    while LOGGER.timing('_interpro') < timeout:
+        try:
+            result = requests.get(url, verify=False).content
+        except Exception:
+            pass
+        else:
+            if result not in ['PEND', 'RUN', b'PEND', b'RUN']:
+                break
+
+        sleep = 20 if int(sleep * 1.5) >= 20 else int(sleep * 1.5)
+        LOGGER.sleep(int(sleep), '. Trying to reconnect...')
+
+    if not result:
+        raise IOError('InterPro search timed out or failed, '
+                      'check URL: ' + url)
+    else:
+        LOGGER.report('InterPro search completed in %.2fs.', '_interpro')
+
+    if PY3K:
+        result = result.decode()
+
+    try:
+        result = json.loads(result)
+    except Exception:
+        raise ValueError('failed to parse results as JSON, check URL: ' + url)
+
+    return result["results"]
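A minimal usage sketch for the new `searchInterpro` function (not part of the patch; it assumes network access to the EBI InterPro API, and the entry layout shown is the one implied by the parsing code above):

```python
from prody.database.interpro import searchInterpro

# UniProt accession; a PDB ID with or without a chain identifier
# (e.g. '1mkp' or '1mkpA') is resolved to one via the PDB header.
results = searchInterpro('P19491', timeout=60)

# Each entry is one item of the API's "results" list; the exact
# fields depend on the live InterPro response.
for entry in results:
    print(entry['metadata']['accession'], entry['metadata'].get('name'))
```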
diff --git a/prody/database/pfam.py b/prody/database/pfam.py
index c8c01d583..f60654eac 100644
--- a/prody/database/pfam.py
+++ b/prody/database/pfam.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 """This module defines functions for interfacing Pfam database."""
 
-__author__ = 'Anindita Dutta, Ahmet Bakan, Cihan Kaya'
+__author__ = 'Anindita Dutta, Ahmet Bakan, Cihan Kaya, James Krieger'
 
 import re
 from numbers import Integral
@@ -22,6 +22,7 @@
     import urllib
     import urllib2
 
+import json
 
 __all__ = ['searchPfam', 'fetchPfamMSA', 'parsePfamPDBs']
 
@@ -29,186 +30,78 @@
 SELEX = 'selex'
 STOCKHOLM = 'stockholm'
 
-DOWNLOAD_FORMATS = set(['seed', 'full', 'ncbi', 'metagenomics',
-                        'rp15', 'rp35', 'rp55', 'rp75', 'uniprot'])
+DOWNLOAD_FORMATS = set(['seed', 'full', 'uniprot',
+                        #'ncbi', 'metagenomics',
+                        #'rp15', 'rp35', 'rp55', 'rp75'
+                        ])
 
 FORMAT_OPTIONS = ({'format': set([FASTA, SELEX, STOCKHOLM]),
                    'order': set(['tree', 'alphabetical']),
                    'inserts': set(['lower', 'upper']),
                    'gaps': set(['mixed', 'dots', 'dashes', 'none'])})
 
-MINSEQLEN = 16
-
-old_prefix = 'https://pfam.xfam.org/'
-prefix = 'https://pfam-legacy.xfam.org/'
+prefix = 'https://www.ebi.ac.uk/interpro/wwwapi/entry/'
 
 def searchPfam(query, **kwargs):
     """Returns Pfam search results in a dictionary.
 
     Matching Pfam accession as keys will map to evalue, alignment
     start and end residue positions.
 
-    :arg query: UniProt ID, PDB identifier, a protein sequence, or a sequence
-        file. Sequence queries must not contain without gaps and must be at
-        least 16 characters long
+    :arg query: UniProt ID or PDB identifier with or without a
+        chain identifier, e.g. ``'1mkp'`` or ``'1mkpA'``.
+        The UniProt ID of the specified chain, or the first
+        protein chain, will be used for searching the Pfam database.
    :type query: str
 
     :arg timeout: timeout for blocking connection attempt in seconds, default
         is 60
     :type timeout: int
-
-    *query* can also be a PDB identifier, e.g. ``'1mkp'`` or ``'1mkpA'`` with
-    chain identifier. UniProt ID of the specified chain, or the first
-    protein chain will be used for searching the Pfam database."""
+    """
 
     import requests
 
-    if isfile(query):
-        from prody.sequence import MSAFile
-        try:
-            seq = next(MSAFile(query))
-        except:
-            with openFile(query) as inp:
-                seq = ''.join(inp.read().split())
-        else:
-            seq = seq[0][1]
-        if not seq.isalpha():
-            raise ValueError('could not parse a sequence without gaps from ' +
-                             query)
-    else:
-        seq = ''.join(query.split())
+    seq = ''.join(query.split())
 
     import xml.etree.cElementTree as ET
 
     LOGGER.timeit('_pfam')
     timeout = int(kwargs.get('timeout', 60))
-    if len(seq) >= MINSEQLEN:
-        if not seq.isalpha():
-            raise ValueError(repr(seq) + ' is not a valid sequence')
-        fseq = '>Seq\n' + seq
-        parameters = { 'hmmdb' : 'pfam', 'seq': fseq }
-        enc_params = urllib.urlencode(parameters).encode('utf-8')
-        request = urllib2.Request('https://www.ebi.ac.uk/Tools/hmmer/search/hmmscan', enc_params)
-
-        results_url = urllib2.urlopen(request).geturl()
-
-        #res_params = { 'output' : 'xml' }
-        res_params = { 'format' : 'tsv' }
-        enc_res_params = urllib.urlencode(res_params)
-        #modified_res_url = results_url + '?' + enc_res_params
-        modified_res_url = results_url.replace('results','download') + '?' + enc_res_params
-
-        result_request = urllib2.Request(modified_res_url)
-        # url = ( urllib2.urlopen(request).geturl() + '?output=xml')
-        LOGGER.debug('Submitted Pfam search for sequence "{0}...".'
-                     .format(seq[:MINSEQLEN]))
+    if len(seq) <= 5:
+        accession = None
+        from prody import parsePDBHeader
         try:
-            #xml = urllib2.urlopen(result_request).read()
-            tsv = urllib2.urlopen(result_request).read()
-            # openURL(url, timeout=timeout).read()
-        except:
-            raise ValueError('No matching Pfam domains were found.')
-
-        # try:
-        #     root = ET.XML(xml)
-        # except Exception as err:
-        #     raise ValueError('failed to parse results XML, check URL: ' + modified_res_url)
-
-        matches = {}
-        #for child in root[0]:
-            #if child.tag == 'hits':
            #    accession = child.get('acc')
            #    pfam_id = accession.split('.')[0]
            #    matches[pfam_id]={}
            #    matches[pfam_id]['accession']=accession
            #    matches[pfam_id]['class']='Domain'
            #    matches[pfam_id]['id']=child.get('name')
            #    matches[pfam_id]['locations']={}
            #    matches[pfam_id]['locations']['ali_end']=child[0].get('alisqto')
            #    matches[pfam_id]['locations']['ali_start']=child[0].get('alisqfrom')
            #    matches[pfam_id]['locations']['bitscore']=child[0].get('bitscore')
            #    matches[pfam_id]['locations']['end']=child[0].get('alisqto')
            #    matches[pfam_id]['locations']['evalue']=child.get('evalue')
            #    matches[pfam_id]['locations']['evidence']='hmmer v3.0'
            #    matches[pfam_id]['locations']['hmm_end']=child[0].get('alihmmto')
            #    matches[pfam_id]['locations']['hmm_start']=child[0].get('alihmmfrom')
            #    matches[pfam_id]['locations']['significant']=child[0].get('significant')
            #    matches[pfam_id]['locations']['start']=child[0].get('alisqfrom')
            #    matches[pfam_id]['type']='Pfam-A'
            #    return matches
-
-        if PY3K:
-            tsv = tsv.decode()
-
-        lines = tsv.split('\n')
-        keys = lines[0].split('\t')
-        root = {}
-        for i, line in enumerate(lines[1:-1]):
-            root[i] = {}
-            for j, key in enumerate(keys):
-                root[i][key] = line.split('\t')[j]
-
-        for child in root.values():
-            accession = child['Family Accession']
-            pfam_id = accession.split('.')[0]
-            matches[pfam_id]={}
-            matches[pfam_id]['accession'] = accession
-            matches[pfam_id]['class'] = 'Domain'
-            matches[pfam_id]['id'] = child['Family id']
-            matches[pfam_id]['locations'] = {}
-            matches[pfam_id]['locations']['ali_end'] = child['Ali. End']
-            matches[pfam_id]['locations']['ali_start'] = child['Ali. Start']
-            matches[pfam_id]['locations']['bitscore'] = child['Bit Score']
-            matches[pfam_id]['locations']['end'] = child['Env. End']
-            matches[pfam_id]['locations']['cond_evalue'] = child['Cond. E-value']
-            matches[pfam_id]['locations']['ind_evalue'] = child['Ind. E-value']
-            matches[pfam_id]['locations']['evidence'] = 'hmmer v3.0'
-            matches[pfam_id]['locations']['hmm_end'] = child['Model End']
-            matches[pfam_id]['locations']['hmm_start'] = child['Model Start']
-            #matches[pfam_id]['locations']['significant'] = child['significant']
-            matches[pfam_id]['locations']['start'] = child['Env. Start']
-            matches[pfam_id]['type'] = 'Pfam-A'
-        return matches
+            polymers = parsePDBHeader(seq[:4], 'polymers')
+        except Exception as err:
+            raise ValueError('failed to parse header for {0} ({1})'
+                             .format(seq[:4], str(err)))
+        else:
+            chid = seq[4:].upper()
 
-    else:
-        if len(seq) <= 5:
-            idcode = None
-            from prody import parsePDBHeader
-            try:
-                polymers = parsePDBHeader(seq[:4], 'polymers')
-            except Exception as err:
-                LOGGER.warn('failed to parse header for {0} ({1})'
-                            .format(seq[:4], str(err)))
-            else:
-                chid = seq[4:].upper()
-
-            for poly in polymers:
-                if chid and poly.chid != chid:
-                    continue
-                for dbref in poly.dbrefs:
-                    if dbref.database != 'UniProt':
-                        continue
-                    idcode = dbref.idcode
-                    accession = dbref.accession
-                    LOGGER.info('UniProt ID code {0} for {1} chain '
-                                '{2} will be used.'
-                                .format(idcode, seq[:4], poly.chid))
-                    break
-                if idcode is not None:
-                    break
-            if idcode is None:
-                LOGGER.warn('A UniProt ID code for PDB {0} could not be '
-                            'parsed.'.format(repr(seq)))
-                url = prefix + 'protein/' + seq + '?output=xml'
-            else:
-                url = prefix + 'protein/' + idcode + '?output=xml'
-
+        for poly in polymers:
+            if chid and poly.chid != chid:
+                continue
+            for dbref in poly.dbrefs:
+                if dbref.database != 'UniProt':
+                    continue
+                accession = dbref.accession
+                LOGGER.info('UniProt accession {0} for {1} chain '
+                            '{2} will be used.'
+                            .format(accession, seq[:4], poly.chid))
+                break
+            if accession is not None:
+                break
+
+        if accession is None:
+            raise ValueError('A UniProt accession for PDB {0} could not be '
+                             'parsed.'.format(repr(seq)))
         else:
-            url = prefix + 'protein/' + seq + '?output=xml'
+            url = prefix + "all/protein/uniprot/" + accession
+
+    else:
+        url = prefix + "all/protein/uniprot/" + seq
 
     LOGGER.debug('Retrieving Pfam search results: ' + url)
     xml = None
     sleep = 2
     while LOGGER.timing('_pfam') < timeout:
         try:
-            # xml = openURL(url, timeout=timeout).read()
             xml = requests.get(url, verify=False).content
         except Exception:
             pass
@@ -227,78 +120,48 @@ def searchPfam(query, **kwargs):
 
     if PY3K:
         xml = xml.decode()
+    else:
+        xml = xml.encode()
 
     if xml.find('There was a system error on your last request.') > 0:
         LOGGER.warn('No Pfam matches found for: ' + seq)
         return None
     elif xml.find('No valid UniProt accession or ID') > 0:
-        try:
-            url = prefix + 'protein/' + accession + '?output=xml'
-            LOGGER.debug('Retrieving Pfam search results: ' + url)
-            xml = openURL(url, timeout=timeout).read()
-        except:
-            raise ValueError('No valid UniProt accession or ID for: ' + seq)
-
-        if xml.find('No valid UniProt accession or ID') > 0:
-            try:
-                ag = parsePDB(seq, subset='ca')
-                ag_seq = ag.getSequence()
-                return searchPfam(ag_seq)
-            except:
-                try:
-                    url = 'https://uniprot.org/uniprot/' + accession + '.xml'
-                    xml = openURL(url, timeout=timeout).read()
-                    if len(xml) > 0:
-                        root = ET.XML(xml)
-                        accession = root[0][0].text
-
-                        url = prefix + 'protein/' + accession + '?output=xml'
-                        LOGGER.debug('Retrieving Pfam search results: ' + url)
-                        xml = openURL(url, timeout=timeout).read()
-                    else:
-                        raise ValueError('No valid UniProt accession or ID for: ' + seq)
-                except:
-                    raise ValueError('No valid UniProt accession or ID for: ' + seq)
+        raise ValueError('No valid UniProt accession or ID for: ' + seq)
 
     try:
-        root = ET.XML(xml)
+        root = json.loads(xml)
     except Exception as err:
-        raise ValueError('failed to parse results XML, check URL: ' + url)
+        raise ValueError('failed to parse results as JSON, check URL: ' + url)
 
-    if len(seq) >= MINSEQLEN:
-        try:
-            xml_matches = root[0][0][0][0]
-        except IndexError:
-            raise ValueError('failed to parse results XML, check URL: ' + url)
-    else:
-        key = '{' + old_prefix + '}'
-        results = dictElement(root[0], key)
-        try:
-            xml_matches = results['matches']
-        except KeyError:
-            raise ValueError('failed to parse results XML, check URL: ' + url)
-
     matches = dict()
-    for child in xml_matches:
-
+    for entry in root["results"]:
         try:
-            accession = child.attrib['accession'][:7]
+            metadata = entry["metadata"]
+            accession = metadata["accession"]
         except KeyError:
-            raise ValueError('failed to parse results XML, check URL: ' + url)
+            raise ValueError('failed to parse accessions from results, check URL: ' + url)
 
-        if not re.search('^P(F|B)[0-9]{5}$', accession):
-            raise ValueError('{0} does not match pfam accession'
-                             ' format'.format(accession))
-
-        match = matches.setdefault(accession, dict(child.items()))
-        locations = match.setdefault('locations', [])
-        for loc in child:
-            locations.append(dict(loc.items()))
+        if not re.search('PF[0-9]{5}$', accession):
+            continue
 
-    if len(seq) < MINSEQLEN:
-        query = 'Query ' + repr(query)
-    else:
-        query = 'Query sequence'
+        match = matches.setdefault(accession, dict(metadata.items()))
+
+        other_data = entry["proteins"]
+        locations = match.setdefault("locations", [])
+        for item1 in other_data:
+            for key, value in item1.items():
+                if key == "entry_protein_locations":
+                    for item2 in value:
+                        for item3 in item2["fragments"]:
+                            new_dict = {}
+                            new_dict["start"] = item3["start"]
+                            new_dict["end"] = item3["end"]
+                            new_dict["score"] = item2["score"]
+                            locations.append(new_dict)
+
+    query = 'Query ' + repr(query)
 
     if matches:
         LOGGER.info(query + ' matched {0} Pfam families.'.format(len(matches)))
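A sketch of the reworked `searchPfam` result structure (not part of the patch; it assumes the InterPro API is reachable, and the keys mirror the parsing code above, which keeps only PF accessions and collects start/end/score per location):

```python
from prody.database.pfam import searchPfam

matches = searchPfam('P19491')  # or a PDB ID such as '1mkpA'

for acc, match in matches.items():
    # each match carries the entry metadata plus a 'locations' list
    # of dicts with 'start', 'end' and 'score' keys
    for loc in match['locations']:
        print(acc, loc['start'], loc['end'], loc['score'])
```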
@@ -320,21 +182,6 @@ def fetchPfamMSA(acc, alignment='full', compressed=False, **kwargs):
 
     :arg compressed: gzip the downloaded MSA file, default is **False**
 
-    *Alignment Options*
-
-    :arg format: a Pfam supported MSA file format, one of ``'selex'``,
-        (default), ``'stockholm'`` or ``'fasta'``
-
-    :arg order: ordering of sequences, ``'tree'`` (default) or
-        ``'alphabetical'``
-
-    :arg inserts: letter case for inserts, ``'upper'`` (default) or ``'lower'``
-
-    :arg gaps: gap character, one of ``'dashes'`` (default), ``'dots'``,
-        ``'mixed'`` or **None** for unaligned
-
-    *Other Options*
-
     :arg timeout: timeout for blocking connection attempt in seconds, default
         is 60
 
@@ -344,69 +191,21 @@
 
     import requests
 
-    # url = prefix + 'family/acc?id=' + acc
-    # handle = openURL(url, timeout=int(kwargs.get('timeout', 60)))
-    orig_acc = acc
-    # acc = handle.readline().strip()
-    # if PY3K:
-    #     acc = acc.decode()
-
-    url_flag = False
-
     if not re.search('(?<=PF)[0-9]{5}$', acc):
         raise ValueError('{0} is not a valid Pfam ID or Accession Code'
-                         .format(repr(orig_acc)))
+                         .format(repr(acc)))
 
     if alignment not in DOWNLOAD_FORMATS:
-        raise ValueError('alignment must be one of full, seed, ncbi or'
-                         ' metagenomics')
-    if alignment == 'ncbi' or alignment == 'metagenomics' or alignment == 'uniprot':
-        url = (prefix + 'family/' + acc + '/alignment/' +
-               alignment + '/gzipped')
-        url_flag = True
-        extension = '.sth'
-    else:
-        if not kwargs:
-            url = (prefix + 'family/' + acc + '/alignment/' +
-                   alignment + '/gzipped')
-            url_flag = True
-            extension = '.sth'
-        else:
-            align_format = kwargs.get('format', 'selex').lower()
-
-            if align_format not in FORMAT_OPTIONS['format']:
-                raise ValueError('alignment format must be of type selex'
-                                 ' stockholm or fasta. MSF not supported')
-
-            if align_format == SELEX:
-                align_format, extension = 'pfam', '.slx'
-            elif align_format == FASTA:
-                extension = '.fasta'
-            else:
-                extension = '.sth'
-
-            gaps = str(kwargs.get('gaps', 'dashes')).lower()
-            if gaps not in FORMAT_OPTIONS['gaps']:
-                raise ValueError('gaps must be of type mixed, dots, dashes, '
-                                 'or None')
+        raise ValueError('alignment must be one of full, seed, or uniprot')
 
-            inserts = kwargs.get('inserts', 'upper').lower()
-            if(inserts not in FORMAT_OPTIONS['inserts']):
-                raise ValueError('inserts must be of type lower or upper')
-
-            order = kwargs.get('order', 'tree').lower()
-            if order not in FORMAT_OPTIONS['order']:
-                raise ValueError('order must be of type tree or alphabetical')
-
-            url = (prefix + 'family/' + acc + '/alignment/'
-                   + alignment + '/format?format=' + align_format +
-                   '&alnType=' + alignment + '&order=' + order[0] +
-                   '&case=' + inserts[0] + '&gaps=' + gaps + '&download=1')
+    url = (prefix + 'pfam/' + acc +
+           '/?annotation=alignment:' + alignment + '&download')
+    extension = '.sth'
 
     LOGGER.timeit('_pfam')
     timeout = kwargs.get('timeout', 60)
     response = None
     sleep = 2
-    try_error = 3
     while LOGGER.timing('_pfam') < timeout:
         try:
             response = requests.get(url, verify=False).content
@@ -418,32 +217,22 @@ def fetchPfamMSA(acc, alignment='full', compressed=False, **kwargs):
 
         sleep = 20 if int(sleep * 1.5) >= 20 else int(sleep * 1.5)
         LOGGER.sleep(int(sleep), '. Trying to reconnect...')
 
-    # response = openURL(url, timeout=int(kwargs.get('timeout', 60)))
     outname = kwargs.get('outname', None)
     if not outname:
-        outname = orig_acc
+        outname = acc
     folder = str(kwargs.get('folder', '.'))
     filepath = join(makePath(folder), outname + '_' + alignment + extension)
     if compressed:
         filepath = filepath + '.gz'
-        if url_flag:
-            f_out = open(filepath, 'wb')
-        else:
-            f_out = openFile(filepath, 'wb')
-        # f_out.write(response.read())
+        f_out = open(filepath, 'wb')
         f_out.write(response)
         f_out.close()
     else:
-        if url_flag:
-            gunzip(response, filepath)
-        else:
-            with open(filepath, 'wb') as f_out:
-                # f_out.write(response.read())
-                f_out.write(response)
+        gunzip(response, filepath)
 
     filepath = relpath(filepath)
     LOGGER.info('Pfam MSA for {0} is written as {1}.'
-                .format(orig_acc, filepath))
+                .format(acc, filepath))
 
     return filepath
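A sketch of the slimmed-down `fetchPfamMSA` interface (not part of the patch; it assumes the InterPro alignment endpoint serves a gzipped Stockholm file, as the URL construction above implies):

```python
from prody.database.pfam import fetchPfamMSA

# default 'full' alignment, unpacked to ./PF00822_full.sth
filepath = fetchPfamMSA('PF00822')

# 'seed' or 'uniprot' alignments; compressed=True keeps the .gz
seed_path = fetchPfamMSA('PF00822', 'seed', compressed=True)
```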
@@ -471,6 +260,8 @@ def parsePfamPDBs(query, data=[], **kwargs):
         The PFAM domain that ends closest to this will be selected.
     :type end: int
     """
+
+    only_parse = kwargs.pop('only_parse', False)
 
     start = kwargs.pop('start', 1)
     end = kwargs.pop('end', None)
@@ -478,6 +269,10 @@
 
     if len(query) > 4 and query.startswith('PF'):
         pfam_acc = query
     else:
+        if not isinstance(start, Integral) and not isinstance(end, Integral):
+            raise ValueError('Please provide an integer for start or end '
+                             'when using a UniProt ID or PDB ID.')
+
         pfam_matches = searchPfam(query, **kwargs)
         keys = list(pfam_matches.keys())
@@ -495,17 +290,13 @@
         start_diff = np.array(start_diff)
         pfam_acc = keys[np.where(abs(start_diff) == min(abs(start_diff)))[0][0]]
 
-    elif isinstance(end, Integral):
+    if isinstance(end, Integral):
         end_diff = []
         for i, key in enumerate(pfam_matches):
             end_diff.append(int(pfam_matches[key]['locations'][0]['end']) - end)
         end_diff = np.array(end_diff)
         pfam_acc = keys[np.where(abs(end_diff) == min(abs(end_diff)))[0][0]]
 
-    else:
-        raise ValueError('Please provide an integer for start or end '
-                         'when using a UniProt ID or PDB ID.')
-
     from ftplib import FTP
     from .uniprot import queryUniprot
@@ -552,10 +343,16 @@
     else:
         results = ags
 
+    if only_parse:
+        return [result for result in results if result is not None]
+
     LOGGER.progress('Extracting Pfam domains...', len(ags))
     comma_splitter = re.compile(r'\s*,\s*').split
     no_info = []
     for i, ag in enumerate(ags):
+        if ag is None:
+            continue
+
         LOGGER.update(i)
         data_dict = data_dicts[i]
         pfamRange = data_dict['UniprotResnumRange'].split('-')
@@ -575,7 +372,7 @@
                 pdbid = value['PDB']
             except:
                 continue
-            if pdbid != data_dict['PDB_ID']:
+            if pdbid.lower() != data_dict['PDB_ID'].lower():
                 continue
 
             pdbchains = value['chains']
@@ -635,5 +432,5 @@
         else:
             LOGGER.warn('data should be a list in order to get output')
 
-    return results
+    return [result for result in results if result is not None]
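A sketch of the `parsePfamPDBs` behaviors touched above (not part of the patch; it assumes the queried family has solved structures; `only_parse=True` returns the parsed structures without extracting domains, and `start`/`end` pick a domain for UniProt or PDB queries):

```python
from prody.database.pfam import parsePfamPDBs

selections = parsePfamPDBs('PF20446')               # Pfam accession
domain2 = parsePfamPDBs('P40682', start=418)        # second domain via start
whole = parsePfamPDBs('PF20446', only_parse=True)   # skip domain extraction
```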
chain B.""" + + a = searchPfam(self.queries[1]) + + self.assertIsInstance(a, dict, + 'searchPfam failed to return a dict instance') + + self.assertEqual(sorted(list(a.keys())), ['PF00060', 'PF00497', 'PF01094', 'PF10613'], + 'searchPfam failed to return the right domain family IDs for AMPAR') + + def testPdbIdChSingle(self): + """Test the outcome of a simple search scenario using a PDB ID + and chain ID to get the single domain protein TARP g8 from chain I.""" + + a = searchPfam(self.queries[2]) + + self.assertIsInstance(a, dict, + 'searchPfam failed to return a dict instance') + + self.assertEqual(sorted(list(a.keys())), + ['PF00822'], + 'searchPfam failed to return the right domain family IDs for TARP') + + @classmethod + def tearDownClass(self): + os.chdir('..') + shutil.rmtree(self.workdir) + + +class TestFetchPfamMSA(unittest.TestCase): + + @classmethod + def setUpClass(self): + self.query = 'PF00822' + + self.workdir = 'pfam_msa_tests' + if not os.path.exists(self.workdir): + os.mkdir(self.workdir) + os.chdir(self.workdir) + + def testDefault(self): + """Test the outcome of fetching the domain MSA for claudins + with default parameters.""" + + b = fetchPfamMSA(self.query) + + self.assertIsInstance(b, str, + 'fetchPfamMSA failed to return a str instance') + + self.assertEqual(b, 'PF00822_full.sth') + + self.assertTrue(os.path.exists(b)) + + + def testSeed(self): + """Test the outcome of fetching the domain MSA for claudins + with the alignment type argument set to seed""" + + b = fetchPfamMSA(self.query, "seed") + + self.assertIsInstance(b, str, + 'fetchPfamMSA failed to return a str instance') + + self.assertEqual(b, 'PF00822_seed.sth') + + self.assertTrue(os.path.exists(b)) + + def testFolder(self): + """Test the outcome of fetching the domain MSA for claudins + with keyword folder set to a folder that is made especially.""" + + folder = "new_folder" + os.mkdir(folder) + b = fetchPfamMSA(self.query, folder=folder) + + self.assertIsInstance(b, str, + 'fetchPfamMSA failed to return a str instance') + + self.assertEqual(b, 'new_folder/PF00822_full.sth') + + self.assertTrue(os.path.exists(b)) + + @classmethod + def tearDownClass(self): + os.chdir('..') + shutil.rmtree(self.workdir) + + +class TestParsePfamPDBs(unittest.TestCase): + + @classmethod + def setUpClass(self): + self.queries = ['PF20446', 'Q57ZF2', 'P40682'] + + self.workdir = 'pfam_pdb_tests' + if not os.path.exists(self.workdir): + os.mkdir(self.workdir) + os.chdir(self.workdir) + + def testPfamIdDefault(self): + """Test the outcome of parsing PDBs for a tiny family + of ABC class ATPase N-terminal domains (5 members) + with the Pfam ID and default parameters.""" + + b = parsePfamPDBs(self.queries[0]) + + self.assertIsInstance(b, list, + 'parsePfamPDBs failed to return a list instance') + + self.assertIsInstance(b[0], Selection, + 'parsePfamPDBs failed to return a list of Selection instances') + + self.assertEqual(len(b), 5, + 'parsePfamPDBs failed to return a list of length 5') + + + def testUniprotDefault(self): + """Test the outcome of parsing PDBs for a tiny family + of ABC class ATPase N-terminal domains (5 members) + with the Uniprot long ID and default parameters.""" + + b = parsePfamPDBs(self.queries[1]) + + self.assertIsInstance(b, list, + 'parsePfamPDBs failed to return a list instance') + + self.assertIsInstance(b[0], Selection, + 'parsePfamPDBs failed to return a list of Selection instances') + + self.assertEqual(len(b), 5, + 'parsePfamPDBs failed to return a list of length 5') + + + def 
+    def testMultiDomainDefault(self):
+        """Test the outcome of parsing PDBs using a V-type proton ATPase subunit S1,
+        which has two domains but few relatives. Default parameters should
+        return Selection objects containing the first domain."""
+
+        b = parsePfamPDBs(self.queries[2])
+
+        self.assertIsInstance(b, list,
+                              'parsePfamPDBs failed to return a list instance')
+
+        self.assertIsInstance(b[0], Selection,
+                              'parsePfamPDBs failed to return a list of Selection instances')
+
+        self.assertEqual(len(b), 7,
+                         'parsePfamPDBs failed to return a list of length 7')
+
+        self.assertEqual(b[0].getResnums()[0], 262,
+                         'parsePfamPDBs failed to return a first Selection with first resnum 262')
+
+    def testMultiDomainStart1(self):
+        """Test the outcome of parsing PDBs using a V-type proton ATPase subunit S1,
+        which has two domains but few relatives. Using start=1 should behave like
+        the default and return Selection objects containing the first domain."""
+
+        b = parsePfamPDBs(self.queries[2], start=1)
+
+        self.assertIsInstance(b, list,
+                              'parsePfamPDBs failed to return a list instance')
+
+        self.assertIsInstance(b[0], Selection,
+                              'parsePfamPDBs failed to return a list of Selection instances')
+
+        self.assertEqual(len(b), 7,
+                         'parsePfamPDBs failed to return a list of length 7')
+
+        self.assertEqual(b[0].getResnums()[0], 262,
+                         'parsePfamPDBs failed to return a first Selection with first resnum 262')
+
+    def testMultiDomainStart2(self):
+        """Test the outcome of parsing PDBs using a V-type proton ATPase subunit S1,
+        which has two domains but few relatives. Setting start to 418 should
+        return Selection objects containing the second domain."""
+
+        b = parsePfamPDBs(self.queries[2], start=418)
+
+        self.assertIsInstance(b, list,
+                              'parsePfamPDBs failed to return a list instance')
+
+        self.assertIsInstance(b[0], Selection,
+                              'parsePfamPDBs failed to return a list of Selection instances')
+
+        self.assertEqual(b[0].getResnums()[0], 418,
+                         'parsePfamPDBs failed to return a first Selection with first resnum 418')
+
+    @classmethod
+    def tearDownClass(self):
+        os.chdir('..')
+        shutil.rmtree(self.workdir)
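These tests hit live EBI services, so they are best run selectively; a minimal sketch using the standard unittest loader (the path is assumed from the diff above):

```python
import unittest

suite = unittest.defaultTestLoader.discover(
    'prody/tests/database', pattern='test_pfam.py')
unittest.TextTestRunner(verbosity=2).run(suite)
```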