-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Lan Le
committed
Aug 27, 2024
1 parent
12bff6f
commit 2ee2616
Showing
7 changed files
with
360 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
import tempfile # noqa: E402 | ||
|
||
from chem_spectra.lib.composer.base import BaseComposer # noqa: E402 | ||
|
||
|
||
TEXT_SPECTRUM_ORIG = '$$ === CHEMSPECTRA SPECTRUM ORIG ===\n' | ||
TEXT_MS_DATA_TABLE = '##DATA TABLE= (XY..XY), PEAKS\n' # '##XYDATA= (X++(Y..Y))\n' # noqa | ||
|
||
class LCMSComposer(BaseComposer): | ||
def __init__(self, core): | ||
super().__init__(core) | ||
self.title = core.fname | ||
self.meta = self.__compose() | ||
|
||
def __gen_headers_spectrum_orig(self): | ||
return [ | ||
'\n', | ||
TEXT_SPECTRUM_ORIG, | ||
'##TITLE={}\n'.format(self.title), | ||
'##JCAMP-DX=5.00\n', | ||
'##DATA TYPE={}\n'.format('LC/MS'), | ||
'##DATA CLASS= NTUPLES\n', | ||
'##ORIGIN=\n', | ||
'##OWNER=\n', | ||
'##SPECTROMETER/DATA SYSTEM=\n', | ||
# '##.SPECTROMETER TYPE={}\n'.format(self.core.dic.get('SPECTROMETER TYPE', '')), # TRAP # noqa: E501 | ||
# '##.INLET={}\n'.format(self.core.dic.get('INLET', '')), # GC | ||
# '##.IONIZATION MODE={}\n'.format(self.core.dic.get('IONIZATION MODE', '')), # EI+ # noqa: E501 | ||
'##$CSCATEGORY=SPECTRUM\n', | ||
# '##$CSSCANAUTOTARGET={}\n'.format(self.core.auto_scan), | ||
# '##$CSSCANEDITTARGET={}\n'.format( | ||
# self.core.edit_scan or self.core.auto_scan | ||
# ), | ||
# '##$CSSCANCOUNT={}\n'.format(len(self.core.datatables)), | ||
# '##$CSTHRESHOLD={}\n'.format(self.core.thres / 100), | ||
] | ||
|
||
def __gen_ntuples_begin(self): | ||
return ['##NTUPLES={}\n'.format('MASS SPECTRUM')] | ||
|
||
def __gen_ntuples_end(self): | ||
return ['##END NTUPLES={}\n'.format('MASS SPECTRUM')] | ||
|
||
def __gen_config(self): | ||
return [ | ||
'##VAR_NAME= MASS, INTENSITY, RETENTION TIME\n', | ||
'##SYMBOL= X, Y, T\n', | ||
'##VAR_TYPE= INDEPENDENT, DEPENDENT, INDEPENDENT\n', | ||
'##VAR_FORM= AFFN, AFFN, AFFN\n', | ||
'##VAR_DIM= , , 3\n', | ||
'##UNITS= M/Z, RELATIVE ABUNDANCE, SECONDS\n', | ||
'##FIRST= , , 1\n', | ||
# '##LAST= , , {}\n'.format(len(self.core.datatables)), | ||
] | ||
|
||
def __gen_ms_spectra(self): | ||
msspcs = [] | ||
ms_tempfile = tempfile.TemporaryFile() | ||
for time, value in self.core.data.items(): | ||
xs, ys = value['mz'], value['intensities'] | ||
msspc = [ | ||
'##PAGE={}\n'.format(time), | ||
'##NPOINTS={}\n'.format(len(value['mz'])), | ||
'##DATA TABLE= (XY..XY), PEAKS\n', | ||
] | ||
for idx in range(len(xs)): | ||
my_content = '{}, {};\n'.format(xs[idx], ys[idx]) | ||
msspc += my_content | ||
file_content = ''.join(msspc) | ||
ms_tempfile.write(file_content.encode('utf-8')) | ||
|
||
ms_tempfile.seek(0) | ||
lines = ms_tempfile.readlines() | ||
decoded_lines = [line.decode('utf-8').strip() for line in lines] | ||
msspcs = '\n'.join(decoded_lines) | ||
ms_tempfile.close() | ||
return msspcs | ||
|
||
def __compose(self): | ||
meta = [] | ||
meta.extend(self.__gen_headers_spectrum_orig()) | ||
|
||
meta.extend(self.__gen_ntuples_begin()) | ||
meta.extend(self.__gen_config()) | ||
meta.extend(self.__gen_ms_spectra()) | ||
meta.extend(self.__gen_ntuples_end()) | ||
|
||
# meta.extend(self.generate_original_metadata()) | ||
|
||
meta.extend(self.gen_ending()) | ||
return meta | ||
|
||
# def __prism(self, spc): | ||
# blues_x, blues_y, greys_x, greys_y = [], [], [], [] | ||
# thres = 0 | ||
# if spc.shape[0] > 0: # RESOLVE_VSMBNAN2 | ||
# thres = spc[:, 1].max() * (self.core.thres / 100) | ||
|
||
# for pt in spc: | ||
# x, y = pt[0], pt[1] | ||
# if y >= thres: | ||
# blues_x.append(x) | ||
# blues_y.append(y) | ||
# else: | ||
# greys_x.append(x) | ||
# greys_y.append(y) | ||
# return blues_x, blues_y, greys_x, greys_y | ||
|
||
# def prism_peaks(self): | ||
# idx = (self.core.edit_scan or self.core.auto_scan) - 1 | ||
# spc = self.core.spectra[idx] | ||
# return self.__prism(spc) + tuple([idx+1]) | ||
|
||
def tf_img(self): | ||
# plt.rcParams['figure.figsize'] = [16, 9] | ||
# plt.rcParams['font.size'] = 14 | ||
# # PLOT data | ||
# blues_x, blues_y, greys_x, greys_y, _ = self.prism_peaks() | ||
# plt.bar(greys_x, greys_y, width=0, edgecolor='#dddddd') | ||
# plt.bar(blues_x, blues_y, width=0, edgecolor='#1f77b4') | ||
|
||
# # PLOT label | ||
# plt.xlabel('X (m/z)', fontsize=18) | ||
# plt.ylabel('Y (Relative Abundance)', fontsize=18) | ||
# plt.grid(False) | ||
|
||
# # Save | ||
# tf = tempfile.NamedTemporaryFile(suffix='.png') | ||
# plt.savefig(tf, format='png') | ||
# tf.seek(0) | ||
# plt.clf() | ||
# plt.cla() | ||
# return tf | ||
return None | ||
|
||
def tf_csv(self): | ||
return None | ||
|
||
def generate_nmrium(self): | ||
return None |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
import os | ||
import pandas as pd | ||
from chem_spectra.lib.converter.share import parse_params | ||
|
||
class LCMSBaseConverter: | ||
def __init__(self, target_dir, params=False, fname=''): | ||
self.params = parse_params(params) | ||
self.typ = None | ||
self.fname = fname | ||
if target_dir is None: | ||
self.data = None | ||
else: | ||
self.data = self.__read(target_dir, fname) | ||
|
||
def __read(self, target_dir, fname): | ||
spectra_file_path = os.path.join(target_dir, 'MZ_Spectra.csv') | ||
data_frame = pd.read_csv(spectra_file_path, index_col='time', header=0) | ||
grouped_df = data_frame.groupby('time').agg(list) | ||
grouped_dict = {time: {'mz': group['mz'], 'intensities': group['intensities']} for time, group in grouped_df.iterrows()} | ||
return grouped_dict | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import json | ||
import pytest | ||
import tempfile | ||
import zipfile | ||
from chem_spectra.lib.converter.lcms.base import LCMSBaseConverter | ||
from chem_spectra.lib.composer.lcms import LCMSComposer | ||
|
||
source = './tests/fixtures/source/lcms/lcms.zip' | ||
|
||
@pytest.fixture | ||
def zip_file(): | ||
return source | ||
|
||
def test_init_lcms_composer_failed(): | ||
with pytest.raises(Exception) as error: | ||
_ = LCMSComposer(None) | ||
|
||
assert error is not None | ||
|
||
def test_init_lcms_composer_success(zip_file): | ||
with tempfile.TemporaryDirectory() as td: | ||
with zipfile.ZipFile(zip_file, 'r') as z: | ||
z.extractall(td) | ||
|
||
lcms_converter = LCMSBaseConverter(td) | ||
lcms_composer = LCMSComposer(core=lcms_converter) | ||
|
||
assert lcms_composer is not None | ||
assert lcms_composer.core == lcms_converter | ||
|
||
# def test_ms_composer_original_metadata(jcamp_file): | ||
# base_converter = JcampBaseConverter(jcamp_file) | ||
# ms_converter = JcampMSConverter(base=base_converter) | ||
# ms_composer = MSComposer(core=ms_converter) | ||
|
||
# assert ms_composer is not None | ||
# assert '$$ === CHEMSPECTRA ORIGINAL METADATA ===\n' in ms_composer.meta |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
from chem_spectra.lib.converter.lcms.base import LCMSBaseConverter as LCMSConveter | ||
import tempfile | ||
import zipfile | ||
import mimetypes | ||
import base64 | ||
|
||
target_dir = './tests/fixtures/source/lcms/lcms.zip' | ||
|
||
def assertFileType(file, mimeStr): | ||
assert mimetypes.guess_type(file.name)[0] == mimeStr | ||
|
||
def assertJcampContent(jcamp, field): | ||
assertFileType(jcamp, 'chemical/x-jcamp-dx') | ||
jcamp_content = str(jcamp.read()) | ||
assert field in jcamp_content | ||
|
||
def isBase64(encodeString): | ||
plainStr = base64.b64decode(encodeString) | ||
encodedStr = base64.b64encode(plainStr).decode("utf-8") | ||
assert encodedStr == encodeString | ||
|
||
def test_lcms_converter_failed(): | ||
converter = LCMSConveter(None) | ||
assert converter.data is None | ||
|
||
def test_bagit_convert_to_jcamp(): | ||
with tempfile.TemporaryDirectory() as td: | ||
with zipfile.ZipFile(target_dir, 'r') as z: | ||
z.extractall(td) | ||
|
||
converter = LCMSConveter(td, fname='lcms') | ||
assert converter.data is not None | ||
# assert len(converter.data) == 1 | ||
|
||
# def test_bagit_convert_to_jcamp_cv_layout(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(cv_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# jcamp = converter.data[0] | ||
# assertJcampContent(jcamp, '##DATA TYPE=CYCLIC VOLTAMMETRY') | ||
|
||
# def test_bagit_convert_to_jcamp_aif_layout(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(aif_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# jcamp = converter.data[0] | ||
# assertJcampContent(jcamp, '##DATA TYPE=SORPTION-DESORPTION MEASUREMENT') | ||
|
||
# def test_bagit_convert_to_jcamp_emissions_layout(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(emissions_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# jcamp = converter.data[0] | ||
# assertJcampContent(jcamp, '##DATA TYPE=Emissions') | ||
|
||
# def test_bagit_convert_to_jcamp_dls_acf_layout(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(dls_acf_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# jcamp = converter.data[0] | ||
# assertJcampContent(jcamp, '##DATA TYPE=DLS ACF') | ||
|
||
# def test_bagit_convert_to_jcamp_dls_intensity_layout(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(dls_intensity_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# jcamp = converter.data[0] | ||
# assertJcampContent(jcamp, '##DATA TYPE=DLS intensity') | ||
|
||
# def test_bagit_convert_to_images(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(cv_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# assert converter.images is not None | ||
# assert len(converter.images) == 3 | ||
# pngImage = converter.images[0] | ||
# assertFileType(pngImage, 'image/png') | ||
|
||
# def test_bagit_convert_to_csv(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(cv_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# assert converter.list_csv is not None | ||
# assert len(converter.list_csv) == 3 | ||
# csvFile = converter.list_csv[0] | ||
# assertFileType(csvFile, 'text/csv') | ||
|
||
# def test_get_base64_data_failed(): | ||
# converter = BagItConveter(None) | ||
# data = converter.get_base64_data() | ||
# assert data is None | ||
|
||
# def test_get_base64_data_succeed(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(cv_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# list_base64 = converter.get_base64_data() | ||
# assert len(list_base64) == 3 | ||
# for base64Str in list_base64: | ||
# isBase64(base64Str) | ||
|
||
# def test_get_combined_image_failed(): | ||
# converter = BagItConveter(None) | ||
# combined_image = converter.combined_image | ||
# assert combined_image is None | ||
|
||
# def test_get_combined_image(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(cv_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# combined_image = converter.combined_image | ||
# assertFileType(combined_image, 'image/png') | ||
|
||
# def test_bagit_has_one_file_no_combined_image(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(dls_acf_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# assert converter.combined_image is None | ||
|
||
# def test_bagit_convert_to_jcamp_dsc_layout(): | ||
# with tempfile.TemporaryDirectory() as td: | ||
# with zipfile.ZipFile(dsc_layout_path, 'r') as z: | ||
# z.extractall(td) | ||
|
||
# converter = BagItConveter(td) | ||
# jcamp = converter.data[0] | ||
# assertJcampContent(jcamp, '##DATA TYPE=DIFFERENTIAL SCANNING CALORIMETRY') |