From 3ab8c35e8cd38adcd8da36f3e059df8ca2062bb3 Mon Sep 17 00:00:00 2001 From: Thomas N Lawson Date: Tue, 23 Jun 2020 16:15:26 +0100 Subject: [PATCH] Time limit signal (#15) * testing time limits * futher checks for time * added signal * rm unwanted files * fixed indention bug * fixes * close signal * added more checks for test --- msnpy/__init__.py | 2 +- msnpy/__main__.py | 70 ++-- msnpy/annotation.py | 765 ++++++++++++++++++++------------------- tests/test_annotation.py | 41 ++- 4 files changed, 461 insertions(+), 417 deletions(-) diff --git a/msnpy/__init__.py b/msnpy/__init__.py index 1118545..bb8aaf7 100644 --- a/msnpy/__init__.py +++ b/msnpy/__init__.py @@ -22,5 +22,5 @@ __author__ = 'Ralf Weber (r.j.weber@bham.ac.uk)' __credits__ = 'Ralf Weber (r.j.weber@bham.ac.uk)' -__version__ = '1.0.0a5' +__version__ = '1.0.0a6' __license__ = 'GPLv3' diff --git a/msnpy/__main__.py b/msnpy/__main__.py index b848879..4cb7e94 100644 --- a/msnpy/__main__.py +++ b/msnpy/__main__.py @@ -226,8 +226,8 @@ def main(): # pragma: no cover action='store_true', required=False, help="Filter the spectral tree annotations.") - parser_ast.add_argument('-t', '--time_limit', - default=None, type=float, required=False, + parser_ast.add_argument('-t', '--time-limit', + default=None, type=int, required=False, help="Time limit (seconds) for each tree to be processed for annotation") ################################# @@ -399,32 +399,48 @@ def main(): # pragma: no cover save_trees(spectral_trees, args.output, format="json") if args.step == "convert-spectral-trees": + print('converting trees to dimspy peaklists') - if args.input_type=='json': - non_merged_pls, merged_pls, ms1_precursor_pl = tree2peaklist(tree_pth=args.input, - out_pth=args.output, - name=args.name, - adjust_mz=args.adjust_mz, - merge=args.merge, - ppm=args.ppm) - if args.msp: - print('Converting dimspy peaklists to MSP files') - if non_merged_pls: - peaklist2msp(non_merged_pls, - os.path.join(args.output, '{}_non_merged.msp'.format(args.name)), - msp_type=args.msp_type, - polarity=args.polarity) - if merged_pls: - peaklist2msp(merged_pls, - os.path.join(args.output, '{}_merged.msp'.format(args.name)), - msp_type=args.msp_type, - polarity=args.polarity) - if ms1_precursor_pl: - peaklist2msp(ms1_precursor_pl, - os.path.join(args.output, '{}_ms1_precursors.msp'.format(args.name)), - msp_type=args.msp_type, - polarity=args.polarity, - include_ms1=True) + if args.input_type == 'json': + + if os.stat(args.input).st_size == 0: + def touch(path): + with open(path, 'a'): + os.utime(path, None) + # Needed to keep Galaxy workflows from failing in Galaxy + touch(os.path.join(args.output, '{}_non_merged_pls.hdf5'.format(args.name))) + touch(os.path.join(args.output, '{}_merged_pls.hdf5'.format(args.name))) + touch(os.path.join(args.output, '{}_ms1_precursors_pl.hdf5'.format(args.name))) + touch(os.path.join(args.output, '{}_non_merged.msp'.format(args.name))) + touch(os.path.join(args.output, '{}_merged.msp'.format(args.name))) + touch(os.path.join(args.output, '{}_ms1_precursors.msp'.format(args.name))) + else: + non_merged_pls, merged_pls, ms1_precursor_pl = tree2peaklist(tree_pth=args.input, + out_pth=args.output, + name=args.name, + adjust_mz=args.adjust_mz, + merge=args.merge, + ppm=args.ppm) + + if args.msp: + print('Converting dimspy peaklists to MSP files') + if non_merged_pls: + peaklist2msp(non_merged_pls, + os.path.join(args.output, '{}_non_merged.msp'.format(args.name)), + msp_type=args.msp_type, + polarity=args.polarity) + if merged_pls: + peaklist2msp(merged_pls, + os.path.join(args.output, '{}_merged.msp'.format(args.name)), + msp_type=args.msp_type, + polarity=args.polarity) + if ms1_precursor_pl: + peaklist2msp(ms1_precursor_pl, + os.path.join(args.output, '{}_ms1_precursors.msp'.format(args.name)), + msp_type=args.msp_type, + polarity=args.polarity, + include_ms1=True) + else: pls = hdf5_portal.load_peaklists_from_hdf5(args.input) peaklist2msp(pls, diff --git a/msnpy/annotation.py b/msnpy/annotation.py index 8a987cb..c31d168 100644 --- a/msnpy/annotation.py +++ b/msnpy/annotation.py @@ -18,26 +18,24 @@ # You should have received a copy of the GNU General Public License # along with MSnPy. If not, see . # - - +import signal import collections import sqlite3 import time from typing import Sequence - import networkx as nx import pandas as pd import requests - from .processing import mz_tol, mz_pair_diff_tol -def is_time_left(start_time, time_limit): - if time_limit and start_time and time.time() - start_time >= time_limit: - return False - else: - return True +def signal_handler(signum, frame): + raise TimeoutException() +class TimeoutException(Exception): + def __init__(self, *args, **kwargs): + pass + class ApiMfdb: def __init__(self, url="https://mfdb.bham.ac.uk"): @@ -94,144 +92,151 @@ def select_mf(self, min_tol: float, max_tol: float, adducts: dict = None, rules: def annotate_mf(spectral_trees: Sequence[nx.classes.ordered.OrderedDiGraph], db_out: str, ppm: float, adducts: dict = {"[M+H]+": 1.0072764}, rules: bool = True, mf_db: str = "http://mfdb.bham.ac.uk", - prefix_inp: str = "", time_limit: float = None): - - db = ApiMfdb(url=mf_db) - - conn = sqlite3.connect(db_out) - cursor = conn.cursor() + prefix_inp: str = "", time_limit: int = ''): for G in spectral_trees: - start_time = time.time() + signal.signal(signal.SIGALRM, signal_handler) + if time_limit: + signal.alarm(int(time_limit)) # Time Limit + try: + annotate_mf_single(G, db_out, ppm, adducts, rules, mf_db, prefix_inp) + except TimeoutException as e: + print("Time out ", e) - print("Annotate precursors and fragments for Group {}".format(G.graph["id"])) - if prefix_inp == "": - prefix = "_{}".format(G.graph["id"]) - else: - prefix = "_{}_{}".format(G.graph["id"], prefix_inp) - - cursor.execute("""DROP TABLE IF EXISTS MF{}""".format(prefix)) - cursor.execute(""" - CREATE TABLE MF{} ( - MZ_ID TEXT, - MF_ID INTEGER, - C INTEGER, - H INTEGER, - N INTEGER, - O INTEGER, - P INTEGER, - S INTEGER, - DBE INTEGER, - LEWIS INTEGER, - SENIOR INTEGER, - HC INTEGER, - NOPSC INTEGER, - ADDUCT TEXT, - MASS FLOAT, - MZ FLOAT, - PPM_ERROR FLOAT, - PRECURSOR INTEGER, - MSLEVEL TEXT, - FLAG INTEGER, - PRIMARY KEY (MZ_ID, MF_ID) - )""".format(prefix)) + if time_limit: + signal.alarm(0) - mf_id = 1 - nodes_done = [] - rows = [] - for edge in G.edges(data=True): - if not is_time_left(start_time, time_limit): - break - - if edge[2]['mzdiff'] > 0.5 and edge[2]['type'] == "e": + return spectral_trees - for node_id in [edge[0], edge[1]]: - node = G.nodes[node_id] +def annotate_mf_single(G, db_out, ppm: float, adducts: dict = {"[M+H]+": 1.0072764}, rules: bool = True, + mf_db: str = "http://mfdb.bham.ac.uk", prefix_inp: str = ""): + conn = sqlite3.connect(db_out) + cursor = conn.cursor() - if node_id not in nodes_done: + db = ApiMfdb(url=mf_db) - nodes_done.append(node_id) - min_tol, max_tol = mz_tol(node["mz"], ppm) # ppm tol + print("Annotate precursors and fragments for Group {}".format(G.graph["id"])) + if prefix_inp == "": + prefix = "_{}".format(G.graph["id"]) + else: + prefix = "_{}_{}".format(G.graph["id"], prefix_inp) - records_mf = db.select_mf(min_tol, max_tol, adducts, rules) - # print(min_tol, max_tol, adducts, rules, len(records_mf)) - if len(records_mf) > 0: - for mf in records_mf: - ppm_error = round((node["mz"] - mf["mass"]) / (mf["mass"] * 0.000001), 2) - values = ( + cursor.execute("""DROP TABLE IF EXISTS MF{}""".format(prefix)) + cursor.execute(""" + CREATE TABLE MF{} ( + MZ_ID TEXT, + MF_ID INTEGER, + C INTEGER, + H INTEGER, + N INTEGER, + O INTEGER, + P INTEGER, + S INTEGER, + DBE INTEGER, + LEWIS INTEGER, + SENIOR INTEGER, + HC INTEGER, + NOPSC INTEGER, + ADDUCT TEXT, + MASS FLOAT, + MZ FLOAT, + PPM_ERROR FLOAT, + PRECURSOR INTEGER, + MSLEVEL TEXT, + FLAG INTEGER, + PRIMARY KEY (MZ_ID, MF_ID) + )""".format(prefix)) + + mf_id = 1 + nodes_done = [] + rows = [] + for edge in G.edges(data=True): + + if edge[2]['mzdiff'] > 0.5 and edge[2]['type'] == "e": + + for node_id in [edge[0], edge[1]]: + + node = G.nodes[node_id] + + if node_id not in nodes_done: + + nodes_done.append(node_id) + min_tol, max_tol = mz_tol(node["mz"], ppm) # ppm tol + + records_mf = db.select_mf(min_tol, max_tol, adducts, rules) + # print(min_tol, max_tol, adducts, rules, len(records_mf)) + if len(records_mf) > 0: + for mf in records_mf: + ppm_error = round((node["mz"] - mf["mass"]) / (mf["mass"] * 0.000001), 2) + values = ( node_id, mf_id, mf["atoms"]["C"], mf["atoms"]["H"], mf["atoms"]["N"], mf["atoms"]["O"], - mf["atoms"]["P"], mf["atoms"]["S"], mf["adduct"], mf["mass"], node["mz"], ppm_error, int(node["precursor"]), + mf["atoms"]["P"], mf["atoms"]["S"], mf["adduct"], mf["mass"], node["mz"], ppm_error, + int(node["precursor"]), node["mslevel"], mf["DBE"], mf["LEWIS"], mf["SENIOR"], mf["HC"], mf["NOPSC"], 0,) - rows.append(values) - mf_id += 1 - else: - values = (node_id, None, None, None, None, None, None, None, None, None, node["mz"], None, - int(node["precursor"]), node["mslevel"], None, None, None, None, None, None,) rows.append(values) + mf_id += 1 + else: + values = (node_id, None, None, None, None, None, None, None, None, None, node["mz"], None, + int(node["precursor"]), node["mslevel"], None, None, None, None, None, None,) + rows.append(values) - node_i = G.nodes[edge[0]] - node_j = G.nodes[edge[1]] + node_i = G.nodes[edge[0]] + node_j = G.nodes[edge[1]] - min_tol, max_tol = mz_pair_diff_tol(node_j["mz"], node_i["mz"], ppm, "ppm") + min_tol, max_tol = mz_pair_diff_tol(node_j["mz"], node_i["mz"], ppm, "ppm") - records_mf = db.select_mf(min_tol, max_tol, {None: 0.0}, False) + records_mf = db.select_mf(min_tol, max_tol, {None: 0.0}, False) - if len(records_mf) > 0: - for mf in records_mf: - ppm_error = round((mf["mass"] - edge[2]['mzdiff']) / (mf["mass"] * 0.000001), 2) - values = ("{}__{}".format(edge[0], edge[1]), mf_id, - mf["atoms"]["C"], mf["atoms"]["H"], - mf["atoms"]["N"], mf["atoms"]["O"], - mf["atoms"]["P"], mf["atoms"]["S"], - None, mf["mass"], edge[2]['mzdiff'], ppm_error, - None, - "{}_{}".format(node_i["mslevel"], node_j["mslevel"]), - mf["DBE"], mf["LEWIS"], mf["SENIOR"], mf["HC"], mf["NOPSC"], 0) - rows.append(values) - mf_id += 1 - else: - values = ( + if len(records_mf) > 0: + for mf in records_mf: + ppm_error = round((mf["mass"] - edge[2]['mzdiff']) / (mf["mass"] * 0.000001), 2) + values = ("{}__{}".format(edge[0], edge[1]), mf_id, + mf["atoms"]["C"], mf["atoms"]["H"], + mf["atoms"]["N"], mf["atoms"]["O"], + mf["atoms"]["P"], mf["atoms"]["S"], + None, mf["mass"], edge[2]['mzdiff'], ppm_error, + None, + "{}_{}".format(node_i["mslevel"], node_j["mslevel"]), + mf["DBE"], mf["LEWIS"], mf["SENIOR"], mf["HC"], mf["NOPSC"], 0) + rows.append(values) + mf_id += 1 + else: + values = ( "{}__{}".format(edge[0], edge[1]), None, None, None, None, None, None, None, None, None, None, edge[2]['mzdiff'], None, "{}_{}".format(node_i["mslevel"], node_j["mslevel"]), None, None, None, None, None, 0,) - rows.append(values) - #################################### - # else: - # values = ("{}__{}".format(edge[0], edge[1]), None, None, None, None, None, None, None, None, None, None, - # edge[2]['mzdiff'], None, "{}_{}".format(node_i["mslevel"], node_j["mslevel"]), - # None, None, None, None, None, 0,) - # rows.append(values) - - if not is_time_left(start_time, time_limit): - print("Could not complete - time out for group {}".format(G.graph["id"])) - continue - - cursor.executemany(""" - INSERT INTO MF{} - (MZ_ID, MF_ID, C, H, N, O, P, S, ADDUCT, MASS, MZ, PPM_ERROR, PRECURSOR, MSLEVEL, DBE, LEWIS, SENIOR, HC, NOPSC, flag) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """.format(prefix), rows) - - cursor.execute(""" - CREATE INDEX CHNOPS_ADDUCT_{}_IDX - ON MF{} (MZ_ID, PRECURSOR, ADDUCT, C, H, N, O, P, S, FLAG);""".format(prefix, prefix)) - cursor.execute(""" - CREATE INDEX MZ_ID_{}_IDX - ON MF{} (MZ_ID);""".format(prefix, prefix)) - cursor.execute(""" - CREATE INDEX MF_MSL_P_F_{}_IDX - ON MF{} (MSLEVEL, PRECURSOR, FLAG)""".format(prefix, prefix)) - cursor.execute(""" - CREATE INDEX MF_FLAG_{}_IDX - ON MF{} (FLAG)""".format(prefix, prefix)) - conn.commit() + rows.append(values) + #################################### + # else: + # values = ("{}__{}".format(edge[0], edge[1]), None, None, None, None, None, None, None, None, None, None, + # edge[2]['mzdiff'], None, "{}_{}".format(node_i["mslevel"], node_j["mslevel"]), + # None, None, None, None, None, 0,) + # rows.append(values) + + cursor.executemany(""" + INSERT INTO MF{} + (MZ_ID, MF_ID, C, H, N, O, P, S, ADDUCT, MASS, MZ, PPM_ERROR, PRECURSOR, MSLEVEL, DBE, LEWIS, SENIOR, HC, NOPSC, flag) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """.format(prefix), rows) - return spectral_trees + cursor.execute(""" + CREATE INDEX CHNOPS_ADDUCT_{}_IDX + ON MF{} (MZ_ID, PRECURSOR, ADDUCT, C, H, N, O, P, S, FLAG);""".format(prefix, prefix)) + cursor.execute(""" + CREATE INDEX MZ_ID_{}_IDX + ON MF{} (MZ_ID);""".format(prefix, prefix)) + cursor.execute(""" + CREATE INDEX MF_MSL_P_F_{}_IDX + ON MF{} (MSLEVEL, PRECURSOR, FLAG)""".format(prefix, prefix)) + cursor.execute(""" + CREATE INDEX MF_FLAG_{}_IDX + ON MF{} (FLAG)""".format(prefix, prefix)) + conn.commit() + conn.close() -def mf_tree(G: nx.classes.ordered.OrderedDiGraph, path_db: str, max_mslevel: int, prefix: str, time_limit: float = None, - start_time=None): +def mf_tree(G: nx.classes.ordered.OrderedDiGraph, path_db: str, max_mslevel: int, prefix: str): conn = sqlite3.connect(path_db) cursor = conn.cursor() @@ -273,9 +278,6 @@ def mf_tree(G: nx.classes.ordered.OrderedDiGraph, path_db: str, max_mslevel: int record = tuple(filter(lambda x: x is not None, record)) for i in range(0, len(record), 6): - if not is_time_left(start_time, time_limit): - print('Time limit reached - no annotated tree for graph {}'.format(G.graph["id"])) - return False ids = record[i:i + 6] cursor.execute(""" @@ -317,317 +319,318 @@ def mf_tree(G: nx.classes.ordered.OrderedDiGraph, path_db: str, max_mslevel: int return mft -def filter_mf(trees: Sequence[nx.classes.ordered.OrderedDiGraph], path_db: str, time_limit: float = None): +def filter_mf(trees: Sequence[nx.classes.ordered.OrderedDiGraph], path_db: str, time_limit: int = ''): #http://www.sqlstyle.guide/ print('filter mf') + annotated_trees = [] + + for G in trees: + signal.signal(signal.SIGALRM, signal_handler) + if time_limit: + signal.alarm(time_limit) + + try: + filtered_tree = filter_mf_single_tree(G, path_db) + if filtered_tree: + annotated_trees.extend(filtered_tree) + except TimeoutException as e: + print("Time out ", e) + + if time_limit: + signal.alarm(0) + + return annotated_trees +def filter_mf_single_tree(G, path_db): conn = sqlite3.connect(path_db) cursor = conn.cursor() - atoms = ["C", "H", "N", "O", "P", "S"] + start_time = time.time() - annotated_trees = [] - for G in trees: - start_time = time.time() + prefix = "_{}".format(G.graph["id"]) + print("Filtering group {}".format(G.graph["id"])) - prefix = "_{}".format(G.graph["id"]) - print("Filtering group {}".format(G.graph["id"])) + ################################################################ + # TABLES: EDGES AND MZ_PREC_FRAG + ################################################################ + cursor.execute("""DROP TABLE IF EXISTS EDGES{}""".format(prefix)) + cursor.execute(""" + CREATE TABLE EDGES{} ( + MF_ID_PREC INTERGER, + MF_ID_NL INTERGER, + MF_ID_FRAG INTERGER, + MF_ID_PREC_MASS FLOAT, + MF_ID_NL_MASS FLOAT, + MF_ID_FRAG_MASS FLOAT, + MZ_ID_PREC TEXT, + MZ_ID_NL TEXT, + MZ_ID_FRAG TEXT, + MF_ID_PREC_FLAG INTEGER, + MF_ID_NL_FLAG INTEGER, + MF_ID_FRAG_FLAG INTERGER, + PRIMARY KEY(MF_ID_PREC, MF_ID_NL, MF_ID_FRAG) + )""".format(prefix)) - ################################################################ - # TABLES: EDGES AND MZ_PREC_FRAG - ################################################################ - cursor.execute("""DROP TABLE IF EXISTS EDGES{}""".format(prefix)) - cursor.execute(""" - CREATE TABLE EDGES{} ( - MF_ID_PREC INTERGER, - MF_ID_NL INTERGER, - MF_ID_FRAG INTERGER, - MF_ID_PREC_MASS FLOAT, - MF_ID_NL_MASS FLOAT, - MF_ID_FRAG_MASS FLOAT, - MZ_ID_PREC TEXT, - MZ_ID_NL TEXT, - MZ_ID_FRAG TEXT, - MF_ID_PREC_FLAG INTEGER, - MF_ID_NL_FLAG INTEGER, - MF_ID_FRAG_FLAG INTERGER, - PRIMARY KEY(MF_ID_PREC, MF_ID_NL, MF_ID_FRAG) - )""".format(prefix)) + cursor.execute(""" + CREATE INDEX EDGES_IDS{}_IDX + ON EDGES{} (MF_ID_PREC, MF_ID_NL, MF_ID_FRAG); + """.format(prefix, prefix)) - cursor.execute(""" - CREATE INDEX EDGES_IDS{}_IDX - ON EDGES{} (MF_ID_PREC, MF_ID_NL, MF_ID_FRAG); - """.format(prefix, prefix)) + cursor.execute(""" + CREATE INDEX EDGES_FLAGS{}_IDX + ON EDGES{} (MF_ID_PREC_FLAG, MF_ID_NL_FLAG, MF_ID_FRAG_FLAG) + """.format(prefix, prefix)) - cursor.execute(""" - CREATE INDEX EDGES_FLAGS{}_IDX - ON EDGES{} (MF_ID_PREC_FLAG, MF_ID_NL_FLAG, MF_ID_FRAG_FLAG) - """.format(prefix, prefix)) + cursor.execute("""DROP TABLE IF EXISTS MZ_PREC_FRAG{}""".format(prefix)) + + cursor.execute(""" + CREATE TABLE MZ_PREC_FRAG{} ( + MZ_ID_PREC INTERGER, + MZ_ID_FRAG INTERGER, + PRIMARY KEY (MZ_ID_PREC, MZ_ID_FRAG) + )""".format(prefix)) + conn.commit() + + cursor.execute(""" + SELECT COUNT(MF_ID) + FROM MF{} + WHERE FLAG >= 0 + """.format(prefix)) + + improvement = [cursor.fetchone()[0]] + + if G.number_of_nodes() == 0: + return '' + + # Insert all inital edges + for edge in [edge for edge in G.edges(data=True) if edge[2]["type"] == "e" and edge[2]["mzdiff"] > 0.5]: + + # Set IDs to search sqlite database + mz_id_prec, mz_id_frag, mz_id_nl = edge[0], edge[1], "{}__{}".format(edge[0], edge[1]) + + sub_queries = ['MF1.MZ_ID = "{}" AND MF2.MZ_ID = "{}" AND MF3.MZ_ID = "{}" ' + 'AND MF1.PRECURSOR = 1 AND MF1.ADDUCT = MF3.ADDUCT'.format(mz_id_prec, mz_id_nl, mz_id_frag), + 'MF1.FLAG >= 0 AND MF2.FLAG >= 0 AND MF3.FLAG >= 0'] - cursor.execute("""DROP TABLE IF EXISTS MZ_PREC_FRAG{}""".format(prefix)) + # Number of C in the fragment MF should be smaller or equal to the number of C in the precursor MF + # Number of C in the fragment MF plus the number of C in the neutral mass should be equal to the number of C in the precursor MF + # Apply for N, O, P and S. + for atom in atoms: + sub_queries.append("MF3.{} <= MF1.{} AND MF3.{} + MF2.{} = MF1.{}".format(atom, atom, atom, atom, atom)) + + # APPLY CONSTRAINS & INSERT EDGES cursor.execute(""" - CREATE TABLE MZ_PREC_FRAG{} ( - MZ_ID_PREC INTERGER, - MZ_ID_FRAG INTERGER, - PRIMARY KEY (MZ_ID_PREC, MZ_ID_FRAG) - )""".format(prefix)) - conn.commit() + INSERT INTO MZ_PREC_FRAG{} (MZ_ID_PREC, MZ_ID_FRAG) + VALUES ('{}','{}')""".format(prefix, mz_id_prec, mz_id_frag)) cursor.execute(""" - SELECT COUNT(MF_ID) - FROM MF{} - WHERE FLAG >= 0 - """.format(prefix)) + INSERT INTO EDGES{} + SELECT MF1.MF_ID, MF2.MF_ID, MF3.MF_ID, MF1.MASS, MF2.MASS, MF3.MASS, MF1.MZ_ID, MF2.MZ_ID, MF3.MZ_ID, 0, 0, 0 + FROM MF{} as MF1, MF{} as MF2, MF{} as MF3 + WHERE {} + """.format(prefix, prefix, prefix, prefix, " AND ".join(map(str, sub_queries)))) + conn.commit() + + cursor.execute(""" + SELECT MAX(CAST(mslevel as int)) + FROM MF{} + WHERE mslevel + NOT LIKE '%\_%' + """.format(prefix)) + max_mslevel = cursor.fetchone()[0] - improvement = [cursor.fetchone()[0]] + if max_mslevel is None: max_mslevel = 1 - if G.number_of_nodes() == 0: - continue + for loop in range(1, max_mslevel + 1): - # Insert all inital edges + # loop through neutral losses (i.e. type is e) for edge in [edge for edge in G.edges(data=True) if edge[2]["type"] == "e" and edge[2]["mzdiff"] > 0.5]: - if not is_time_left(start_time, time_limit): - break - # Set IDs to search sqlite database + # Define constrains as above mz_id_prec, mz_id_frag, mz_id_nl = edge[0], edge[1], "{}__{}".format(edge[0], edge[1]) sub_queries = ['MF1.MZ_ID = "{}" AND MF2.MZ_ID = "{}" AND MF3.MZ_ID = "{}" ' 'AND MF1.PRECURSOR = 1 AND MF1.ADDUCT = MF3.ADDUCT'.format(mz_id_prec, mz_id_nl, mz_id_frag), 'MF1.FLAG >= 0 AND MF2.FLAG >= 0 AND MF3.FLAG >= 0'] - # Number of C in the fragment MF should be smaller or equal to the number of C in the precursor MF - # Number of C in the fragment MF plus the number of C in the neutral mass should be equal to the number of C in the precursor MF - # Apply for N, O, P and S. for atom in atoms: - if not is_time_left(start_time, time_limit): - break sub_queries.append("MF3.{} <= MF1.{} AND MF3.{} + MF2.{} = MF1.{}".format(atom, atom, atom, atom, atom)) - # APPLY CONSTRAINS & INSERT EDGES + # Apply constrains & Update edges cursor.execute(""" - INSERT INTO MZ_PREC_FRAG{} (MZ_ID_PREC, MZ_ID_FRAG) - VALUES ('{}','{}')""".format(prefix, mz_id_prec, mz_id_frag)) + UPDATE EDGES{} + SET MF_ID_PREC_FLAG = {}, MF_ID_NL_FLAG = {}, MF_ID_FRAG_FLAG = {} + WHERE + EXISTS( + SELECT MF1.MF_ID, MF2.MF_ID, MF3.MF_ID + FROM MF{} AS MF1, MF{} as MF2, MF{} AS MF3 + WHERE {} + AND EDGES{}.MF_ID_PREC = MF1.MF_ID + AND EDGES{}.MF_ID_NL = MF2.MF_ID + AND EDGES{}.MF_ID_FRAG = MF3.MF_ID + ) + """.format(prefix, loop, loop, loop, prefix, prefix, prefix, " AND ".join(map(str, sub_queries)), prefix, + prefix, prefix)) # AND MF1.FLAG >= 1 AND MF1.FLAG > 1 AND MF1.FLAG >= 1; + conn.commit() - cursor.execute(""" - INSERT INTO EDGES{} - SELECT MF1.MF_ID, MF2.MF_ID, MF3.MF_ID, MF1.MASS, MF2.MASS, MF3.MASS, MF1.MZ_ID, MF2.MZ_ID, MF3.MZ_ID, 0, 0, 0 - FROM MF{} as MF1, MF{} as MF2, MF{} as MF3 - WHERE {} - """.format(prefix, prefix, prefix, prefix," AND ".join(map(str, sub_queries)))) conn.commit() + # Update flag MF table based on the flag in table edges and the number of loops + values = (loop, loop, loop, loop,) cursor.execute(""" - SELECT MAX(CAST(mslevel as int)) - FROM MF{} - WHERE mslevel - NOT LIKE '%\_%' - """.format(prefix)) - max_mslevel = cursor.fetchone()[0] + UPDATE MF{} + SET FLAG = ? + WHERE MF_ID + IN ( + SELECT MF_ID_PREC + FROM EDGES{} + WHERE MF_ID_PREC_FLAG = ? + ) + OR MF_ID + IN ( + SELECT MF_ID_NL + FROM EDGES{} + WHERE MF_ID_NL_FLAG = ? + ) + OR MF_ID + IN ( + SELECT MF_ID_FRAG + FROM EDGES{} + WHERE MF_ID_PREC_FLAG = ? + ) + """.format(prefix, prefix, prefix, prefix), values) + + ########################################################################### + # 1a. Select all fragment MF that do not occur as a valid precursor or are part of a 'loose' edge + # Example: C9H6NO Not valid for all MF rules, see http://link.springer.com/article/10.1007/s11419-016-0319-8 + # Example: C9H6NO Not valid for all MF rules, see http://metabolomics.jp/wiki/Ojima:KOX00444n + + # pos_5-methoxy-3-indoleacetic_acid__run_1 + + # No annotation for lower MS level fragments + # pos_DL_3_indolelactic_acid__run_1 + # e.g.m/z 170.06 > 142.07 > 115.05 > 67.70 (electrical noise peak) + + # Count number of edges related to a fragment + # Should have a minimum of two edges if not a MS level 1 precursor - if max_mslevel is None: max_mslevel = 1 - - for loop in range(1, max_mslevel + 1): - if not is_time_left(start_time, time_limit): - break - - # loop through neutral losses (i.e. type is e) - for edge in [edge for edge in G.edges(data=True) if edge[2]["type"] == "e" and edge[2]["mzdiff"] > 0.5]: - if not is_time_left(start_time, time_limit): - break - - # Define constrains as above - mz_id_prec, mz_id_frag, mz_id_nl = edge[0], edge[1], "{}__{}".format(edge[0], edge[1]) - - sub_queries = ['MF1.MZ_ID = "{}" AND MF2.MZ_ID = "{}" AND MF3.MZ_ID = "{}" ' - 'AND MF1.PRECURSOR = 1 AND MF1.ADDUCT = MF3.ADDUCT'.format(mz_id_prec, mz_id_nl, mz_id_frag), - 'MF1.FLAG >= 0 AND MF2.FLAG >= 0 AND MF3.FLAG >= 0'] + cursor.execute(""" + SELECT MF_ID, MZ_ID, count(*) as C + FROM ( + SELECT DISTINCT E1.MF_ID_PREC AS MF_ID, E1.MZ_ID_PREC AS MZ_ID + FROM EDGES{} AS E1 + WHERE E1.MF_ID_PREC_FLAG = ? AND E1.MF_ID_FRAG_FLAG = ? + UNION ALL + SELECT DISTINCT E2.MF_ID_FRAG AS MF_ID, E2.MZ_ID_FRAG AS MZ_ID + FROM EDGES{} AS E2 + WHERE E2.MF_ID_PREC_FLAG = ? AND E2.MF_ID_FRAG_FLAG = ? + ) + WHERE MF_ID NOT + IN ( + SELECT MF_ID + FROM MF{} + WHERE MSLEVEL = 1 + ) + AND MZ_ID + IN ( + SELECT MZ_ID_PREC + FROM MZ_PREC_FRAG{} + ) + GROUP BY MF_ID + HAVING C < 2 + """.format(prefix, prefix, prefix, prefix), (loop, loop, loop, loop,)) + records = cursor.fetchall() + for record in records: - for atom in atoms: - if not is_time_left(start_time, time_limit): - break - sub_queries.append("MF3.{} <= MF1.{} AND MF3.{} + MF2.{} = MF1.{}".format(atom, atom, atom, atom, atom)) + # Set the flag for the fragment/precursor MF to a minus value (i.e. -flag) + # If all related fragment masses (if exists) have no MF annotation - # Apply constrains & Update edges - cursor.execute(""" - UPDATE EDGES{} - SET MF_ID_PREC_FLAG = {}, MF_ID_NL_FLAG = {}, MF_ID_FRAG_FLAG = {} - WHERE - EXISTS( - SELECT MF1.MF_ID, MF2.MF_ID, MF3.MF_ID - FROM MF{} AS MF1, MF{} as MF2, MF{} AS MF3 - WHERE {} - AND EDGES{}.MF_ID_PREC = MF1.MF_ID - AND EDGES{}.MF_ID_NL = MF2.MF_ID - AND EDGES{}.MF_ID_FRAG = MF3.MF_ID - ) - """.format(prefix, loop, loop, loop, prefix, prefix, prefix, " AND ".join(map(str, sub_queries)), prefix, prefix, prefix)) # AND MF1.FLAG >= 1 AND MF1.FLAG > 1 AND MF1.FLAG >= 1; - conn.commit() - - conn.commit() - - # Update flag MF table based on the flag in table edges and the number of loops - values = (loop, loop, loop, loop,) + values = (-loop, record[0], loop,) cursor.execute(""" UPDATE MF{} SET FLAG = ? - WHERE MF_ID - IN ( - SELECT MF_ID_PREC - FROM EDGES{} - WHERE MF_ID_PREC_FLAG = ? - ) - OR MF_ID - IN ( - SELECT MF_ID_NL - FROM EDGES{} - WHERE MF_ID_NL_FLAG = ? - ) - OR MF_ID - IN ( - SELECT MF_ID_FRAG - FROM EDGES{} - WHERE MF_ID_PREC_FLAG = ? - ) - """.format(prefix, prefix, prefix, prefix), values) - - ########################################################################### - # 1a. Select all fragment MF that do not occur as a valid precursor or are part of a 'loose' edge - # Example: C9H6NO Not valid for all MF rules, see http://link.springer.com/article/10.1007/s11419-016-0319-8 - # Example: C9H6NO Not valid for all MF rules, see http://metabolomics.jp/wiki/Ojima:KOX00444n - - # pos_5-methoxy-3-indoleacetic_acid__run_1 - - # No annotation for lower MS level fragments - # pos_DL_3_indolelactic_acid__run_1 - # e.g.m/z 170.06 > 142.07 > 115.05 > 67.70 (electrical noise peak) - - # Count number of edges related to a fragment - # Should have a minimum of two edges if not a MS level 1 precursor - - cursor.execute(""" - SELECT MF_ID, MZ_ID, count(*) as C - FROM ( - SELECT DISTINCT E1.MF_ID_PREC AS MF_ID, E1.MZ_ID_PREC AS MZ_ID - FROM EDGES{} AS E1 - WHERE E1.MF_ID_PREC_FLAG = ? AND E1.MF_ID_FRAG_FLAG = ? - UNION ALL - SELECT DISTINCT E2.MF_ID_FRAG AS MF_ID, E2.MZ_ID_FRAG AS MZ_ID - FROM EDGES{} AS E2 - WHERE E2.MF_ID_PREC_FLAG = ? AND E2.MF_ID_FRAG_FLAG = ? - ) - WHERE MF_ID NOT - IN ( - SELECT MF_ID - FROM MF{} - WHERE MSLEVEL = 1 - ) - AND MZ_ID - IN ( - SELECT MZ_ID_PREC - FROM MZ_PREC_FRAG{} - ) - GROUP BY MF_ID - HAVING C < 2 - """.format(prefix, prefix, prefix, prefix), (loop, loop, loop, loop, )) - records = cursor.fetchall() - for record in records: - if not is_time_left(start_time, time_limit): - break + WHERE MF{}.MF_ID = ? AND MF{}.PRECURSOR = 1 AND MF{}.FLAG = ? + """.format(prefix, prefix, prefix, prefix, prefix), values) - # Set the flag for the fragment/precursor MF to a minus value (i.e. -flag) - # If all related fragment masses (if exists) have no MF annotation + conn.commit() + # UPDATE COLUMN FLAG IN TABLE EDGES FOR PRECURSOR, NEUTRAL LOSS AND FRAGMENT BASED ON MF TABLE - values = (-loop, record[0], loop, ) - cursor.execute(""" - UPDATE MF{} - SET FLAG = ? - WHERE MF{}.MF_ID = ? AND MF{}.PRECURSOR = 1 AND MF{}.FLAG = ? - """.format(prefix, prefix, prefix, prefix, prefix), values) + cursor.execute(""" + UPDATE EDGES{} + SET MF_ID_FRAG_FLAG = (SELECT FLAG FROM MF{} WHERE MF_ID = MF_ID_FRAG) + WHERE + EXISTS ( + SELECT FLAG + FROM MF{} + WHERE MF_ID = MF_ID_FRAG + ); + """.format(prefix, prefix, prefix)) - conn.commit() - # UPDATE COLUMN FLAG IN TABLE EDGES FOR PRECURSOR, NEUTRAL LOSS AND FRAGMENT BASED ON MF TABLE - if not is_time_left(start_time, time_limit): - break + elapsed_time = time.time() - start_time - cursor.execute(""" - UPDATE EDGES{} - SET MF_ID_FRAG_FLAG = (SELECT FLAG FROM MF{} WHERE MF_ID = MF_ID_FRAG) - WHERE - EXISTS ( - SELECT FLAG - FROM MF{} - WHERE MF_ID = MF_ID_FRAG - ); - """.format(prefix, prefix, prefix)) + cursor.execute(""" + UPDATE EDGES{} + SET MF_ID_NL_FLAG = ( + SELECT FLAG + FROM MF{} + WHERE MF_ID = MF_ID_NL + ) + WHERE + EXISTS ( + SELECT FLAG + FROM MF{} + WHERE MF_ID = MF_ID_NL + ); + """.format(prefix, prefix, prefix)) - elapsed_time = time.time() - start_time + cursor.execute(""" + UPDATE EDGES{} + SET MF_ID_PREC_FLAG = ( + SELECT FLAG + FROM MF{} + WHERE MF_ID = MF_ID_PREC + ) + WHERE + EXISTS( + SELECT FLAG + FROM MF{} + WHERE MF_ID = MF_ID_PREC + ); + """.format(prefix, prefix, prefix)) + conn.commit() + ################################################################################################### - cursor.execute(""" - UPDATE EDGES{} - SET MF_ID_NL_FLAG = ( - SELECT FLAG - FROM MF{} - WHERE MF_ID = MF_ID_NL - ) - WHERE - EXISTS ( - SELECT FLAG - FROM MF{} - WHERE MF_ID = MF_ID_NL - ); - """.format(prefix, prefix, prefix)) + values = (-loop, loop - 1,) + cursor.execute(""" + UPDATE MF{} + SET FLAG = ? + WHERE FLAG = ?""".format(prefix), values) + conn.commit() + cursor.execute(""" + SELECT COUNT(MF_ID) + FROM MF{} + WHERE FLAG >= 0 + """.format(prefix)) + improvement.append(cursor.fetchone()[0]) - cursor.execute(""" - UPDATE EDGES{} - SET MF_ID_PREC_FLAG = ( - SELECT FLAG - FROM MF{} - WHERE MF_ID = MF_ID_PREC - ) - WHERE - EXISTS( - SELECT FLAG - FROM MF{} - WHERE MF_ID = MF_ID_PREC - ); - """.format(prefix, prefix, prefix)) - conn.commit() - ################################################################################################### - - values = (-loop, loop - 1, ) - cursor.execute(""" - UPDATE MF{} - SET FLAG = ? - WHERE FLAG = ?""".format(prefix), values) - conn.commit() + print("MF AFTER CONSTRAINS: ", ", ".join(map(str, improvement))) - cursor.execute(""" - SELECT COUNT(MF_ID) - FROM MF{} - WHERE FLAG >= 0 - """.format(prefix)) - improvement.append(cursor.fetchone()[0]) - - if not is_time_left(start_time, time_limit): - print('Time limit reached - no annotated tree for graph {}'.format(G.graph["id"])) - continue + mf_tree_out = mf_tree(G, path_db, max_mslevel, prefix) - print("MF AFTER CONSTRAINS: ", ", ".join(map(str, improvement))) + conn.close() - mf_tree_out = mf_tree(G, path_db, max_mslevel, prefix, time_limit, start_time) + if mf_tree_out: + return mf_tree_out + else: + return '' - if mf_tree_out: - annotated_trees.extend(mf_tree_out) - conn.close() - return annotated_trees def print_formula(atom_counts: dict): diff --git a/tests/test_annotation.py b/tests/test_annotation.py index ce7d258..9b03dcb 100644 --- a/tests/test_annotation.py +++ b/tests/test_annotation.py @@ -49,13 +49,14 @@ def setUpClass(cls): # zip_ref.close() filename_01 = to_test_data("pos_21-hydroxyprogesterone_subset.mzML") - groups = group_scans(to_test_results(filename_01), 2, min_replicates=1, + cls.groups = group_scans(to_test_results(filename_01), 2, min_replicates=1, max_injection_time=0.0, merge_ms1=False) - pls = process_scans(to_test_results(filename_01), groups=groups, function_noise="median", snr_thres=3.0, + cls.pls = process_scans(to_test_results(filename_01), groups=cls.groups, function_noise="median", snr_thres=3.0, ppm=5.0, min_fraction=0.5, rsd_thres=30.0, normalise=True, report=to_test_results("processing_01.txt"), block_size=5000, ncpus=None) - cls.trees = create_spectral_trees(groups, pls) + + cls.trees = create_spectral_trees(cls.groups, cls.pls) adducts = {"[M+H]+": 1.0072764} cls.db_single_adduct = to_test_results("test_mf_single_adduct.sqlite") @@ -146,16 +147,40 @@ def test_filter_mf(self): def test_time_limit_annotate(self): - ann_time = annotate_mf(spectral_trees=self.trees, db_out=self.db_single_adduct, ppm=10.0, - adducts={"[M+H]+": 1.0072764}, rules=True, - mf_db="http://mfdb.bham.ac.uk", time_limit=0.001) + trees = create_spectral_trees(self.groups, self.pls) + db_out = to_test_results("test_time_limit_annotate.sqlite") + ann_time = annotate_mf(spectral_trees=trees, db_out=db_out, + ppm=10.0, adducts={"[M+H]+": 1.0072764}, rules=True, + mf_db="http://mfdb.bham.ac.uk", time_limit=1) # check no annotation performed and we output un-annotated trees + self.assertEqual(len(ann_time), 1) + self.assertDictEqual(ann_time[0].nodes["331.2271_0_7"], {'mz': 331.22706604003906, 'intensity': 205494704.0, + 'header': 'FTMS + c NSI Full ms [327.22-333.22]', + 'mslevel': 1, 'precursor': True}) - def test_time_limit_mf_filter(self): - filter_ann_time = filter_mf(self.trees, self.db_single_adduct_filtered, time_limit=0.001) + conn = connect(db_out) + cursor = conn.cursor() + cursor.execute("select * from MF_1") + records = cursor.fetchall() + + self.assertEqual(records, []) + def test_time_limit_mf_filter(self): + trees = create_spectral_trees(self.groups, self.pls) + db_out = to_test_results("test_time_limit_filter.sqlite") + ann_time = annotate_mf(spectral_trees=trees, db_out=db_out, ppm=10.0, + adducts={"[M+H]+": 1.0072764}, rules=True, + mf_db="http://mfdb.bham.ac.uk") + filter_ann_time = filter_mf(ann_time, db_out, time_limit=1) # check no annotation performed and we output un-annotated trees + self.assertEqual(len(filter_ann_time), 6) + + self.assertEqual(filter_ann_time[0].number_of_nodes(), 9) + self.assertEqual(filter_ann_time[0].number_of_edges(), 8) + + self.assertEqual(filter_ann_time[5].number_of_nodes(), 37) + self.assertEqual(filter_ann_time[5].number_of_edges(), 36) def test_mf_tree(self):