-
Notifications
You must be signed in to change notification settings - Fork 1
/
CCSCalibrator.py
103 lines (86 loc) · 3.65 KB
/
CCSCalibrator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import pandas as pd
import numpy as np
from utils import is_in_tolerance
__N2__ = 28.01340
__He__ = 4.002602
"""
calibrate CCS from calibrants for ion mobility data (IMS and SLIM)
"""
class CCSCalibrator:
"""Calibrator to estimate CCS values"""
def __init__(self, calibrant_file, neutral_mass=28.01340):
self.neutral_mass = neutral_mass
self.calibrant_file = calibrant_file
self.calibrate_fn = None
self.calibrated_features = None
self.calibrants = self.ready()
def polyfit(self, x, y, deg=1):
"""curve fitting by the polynomial function to get the calibration function
"""
poly = np.poly1d(np.polyfit(x, y, deg))
self.calibrate_fn = poly
# r-squared (double-checked with sklearn.metrics.r2_score)
# fit values, and mean
yhat = poly(x)
ybar = np.sum(y) / len(y)
ssreg = np.sum((yhat - ybar) ** 2)
sstot = np.sum((y - ybar) ** 2)
r = ssreg / sstot
print("r2: {}".format(r))
return poly, r
def powerfit(self, x, y, t0, deg=1):
"""curve fitting by the linearized power function to get the calibration function
"""
lnx = np.log(x-t0)
lny = np.log(y)
poly = np.poly1d(np.polyfit(lnx, lny, deg))
self.calibrate_fn = poly
# r-squared
# fit values, and mean
yhat = poly(lnx)
ybar = np.sum(lny) / len(lny)
ssreg = np.sum((yhat - ybar) ** 2)
sstot = np.sum((lny - ybar) ** 2)
r = ssreg / sstot
print("r2: {}".format(r))
return poly, r
def ready(self):
"""rread calibrants and add reduced mass and ccs values based on the drift gas
"""
calibrants = pd.read_csv(self.calibrant_file, sep='\t')
calibrants['reduced_mass'] = (calibrants['m/z'] * calibrants.z * self.neutral_mass) / (
self.neutral_mass + calibrants['m/z'] * calibrants.z)
calibrants['reduced_ccs'] = calibrants.CCS / (calibrants.z / (calibrants.reduced_mass ** 0.5))
return calibrants
def find_calibrant_features(self, features, ppm=20, ionization='pos'):
"""find the features"""
selected = pd.DataFrame()
if self.calibrants.shape[0] > 0:
for row in self.calibrants[self.calibrants.Ionization.str.upper() == ionization.upper()].iterrows():
mz = row[1]['m/z']
reduced_ccs = row[1]['reduced_ccs']
ccs = row[1]['CCS']
_ff = features[is_in_tolerance(features.mz, mz, ppm) & (features.z == abs(row[1]['z']))]
_ff = _ff.assign(calibrants_mz=mz, ccs=ccs, reduced_ccs=reduced_ccs)
if selected.shape[0] == 0:
selected = _ff
else:
selected = selected.append(_ff)
if selected.empty:
return None
# TODO: now simply select the most intensive peaks
temp = selected.sort_values(by='intensity_org').drop_duplicates(subset=['calibrants_mz'], keep='last')
return temp
else:
return None
def calibrate(self, features, calibrate_fn):
"""calibrate CCS
"""
features['reduced_mass'] = (features['m/z'] * features.z * self.neutral_mass) / (
self.neutral_mass + features['m/z'] * features.z)
features['reduced_ccs'] = calibrate_fn(features.dt)
features['calibrated_ccs'] = features.reduced_ccs * (features.z / (features.reduced_mass ** 0.5))
self.calibrated_features = features
return features
def export(self, features, fout, export_fn):
export_fn(features, fout)