-
Notifications
You must be signed in to change notification settings - Fork 0
/
Confounder_Correction_Classes.py
486 lines (368 loc) · 20.1 KB
/
Confounder_Correction_Classes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
# -*- coding: utf-8 -*-
"""confounder_correction_classes_updated_with_modmean.ipynb
Automatically generated by Colaboratory.
"""
#Drive
#from google.colab import drive
#drive.mount('/content/drive')
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
import neurocombat_modified_fun # Put this function in same path or import from specific location
from neurocombat_modified_fun import neuroCombat_estimate, neuroCombat_transform
from sklearn.preprocessing import StandardScaler
# 1) STANDARD SCALER CLASS: to use with dictionaries {data, covariates}
class StandardScalerDict(BaseEstimator, TransformerMixin):
    """
    StandardScaler compatible with the dictionary data format {data, covariates}.

    Parameters
    ----------
    ref_batch : int or None
        Reference site/batch ID (as in M-ComBat): when given, the
        standardization statistics are estimated on that batch only.
        None estimates them on the whole dataset.
    std_data : bool
        Standardize "data".
    std_cov : bool
        Standardize "covariates".
    output_dict : bool
        If True, output a {data, covariates} dictionary; otherwise only
        the (standardized) data array.

    Example
    -------
    std = StandardScalerDict(std_data=True, std_cov=False, output_dict=True, ref_batch=5)
    std.fit(data_dict)
    stand_data_dict = std.transform(data_dict)
    """
    def __init__(self, ref_batch, std_data, std_cov, output_dict):
        self.std_data = std_data
        self.std_cov = std_cov
        self.output_dict = output_dict
        self.ref_batch = ref_batch

    def check_data(self, X):
        """Wrap a bare array into the {data} dict format.

        When X is not a dict there are no covariates available, so covariate
        standardization is disabled and the output is forced to array-only.
        """
        if type(X) is not dict:
            data = X.copy()
            X = {}
            X["data"] = data
            print('Data was not type dictionary. The output will be set to numpy() data only')
            self.output_dict = False
            self.std_cov = None  # covariates unavailable -> neither True nor False
        return X

    def fit(self, X, y=None):
        """Estimate standardization statistics (on the reference batch if given)."""
        X = self.check_data(X)
        print('SS: Shape of the whole dataset {}'.format(X['data'].shape))
        batch_col = ['batch']
        if self.ref_batch is not None:
            X_fit = {}
            print('SS: Selecting only the reference data')
            # NOTE(review): positional indexing with DataFrame index labels --
            # assumes covariates carry a default RangeIndex aligned with rows.
            ref_batch_index = X["covariates"][X["covariates"][batch_col[0]] == self.ref_batch].index
            X_fit["data"] = X["data"][ref_batch_index, :].copy()
            print('SS: Shape of reference dataset {}'.format(X_fit['data'].shape))
            if self.std_data == True:
                self.scaler_data_ = StandardScaler().fit(X_fit["data"])
            if self.std_cov == True:
                X_fit["covariates"] = X["covariates"].iloc[ref_batch_index, :].copy()
                self.scaler_cov_ = StandardScaler().fit(X_fit["covariates"])
        else:
            if self.std_data == True:
                self.scaler_data_ = StandardScaler().fit(X["data"])
            if self.std_cov == True:
                self.scaler_cov_ = StandardScaler().fit(X["covariates"])
        return self

    def transform(self, X):
        """Apply the fitted standardization; return dict or array per output_dict."""
        X = self.check_data(X)
        print('SS: Transforming dataset shape {}'.format(X['data'].shape))
        if self.std_data == True:
            X_std = self.scaler_data_.transform(X["data"])
        else:
            X_std = X["data"]
        if self.std_cov == True:
            COV_std = self.scaler_cov_.transform(X["covariates"])
        elif self.std_cov == False:
            COV_std = X["covariates"]
        else:
            # std_cov is None: input had no covariates (see check_data), so
            # there is nothing to pass through. Fixes a NameError in the
            # original, where COV_std could be read while unassigned.
            COV_std = None
        if self.output_dict == True:
            output = {"data": X_std, "covariates": COV_std}
        else:
            output = X_std
        return output
#2) COMBAT HARMONIZATION CLASS: integration of the neuroCombat function into a Python class
# Limitations:
# Not yet compatible with RandomizedSearchCV/GridSearchCV from keras
# Cannot choose a CV method different from hold-out
# It can harmonize globals like Total Intracranial Volume (TIV) (which are included in the covariates) if feat_of_no_interest is given,
# at the same step in which it harmonizes the features of interest, BUT only using the fit_transform method: calling fit() and then transform() harmonizes TIV twice
class ComBatHarmonization(BaseEstimator, TransformerMixin):
    """
    Multi-site neuroCombat harmonization compatible with sklearn Pipelines.

    Parameters
    ----------
    cv_method : None
        Under development; only None is supported for now.
    ref_batch : int or None
        ID of the reference batch/site for M-ComBat, or None for standard ComBat.
    regression_fit : bool
        If truthy, transform() returns {'data': harmonized, 'covariates': df};
        otherwise only the harmonized data array.
    feat_detail : dict
        Dictionary of dictionaries describing the features to harmonize:
        first level is the feature-group name, second level the column/row
        ids in the data plus the categorical and continuous biological
        covariates to include in the ComBat model, e.g.
        {'cortical_thickness': {'id': [...], 'categorical': ['Gender'], 'continuous': ['Age']},
         'volumes': {'id': [...], 'categorical': ['Gender'], 'continuous': ['Age', 'TIV']}}
    feat_of_no_interest : dict or None
        Advised not to use. Describes covariates of no interest (e.g. TIV)
        that must themselves be harmonized before harmonizing the features
        of interest, because they are derived from the image (unlike age/sex).

    Example
    -------
    combat_model = ComBatHarmonization(cv_method=None, ref_batch=6, regression_fit=False,
                                       feat_detail=features_dictionary, feat_of_no_interest=None)
    training_harmonized = combat_model.fit(training_dict)
    test_harmonized = combat_model.transform(test_dict)
    """
    def __init__(self, cv_method, ref_batch, regression_fit, feat_detail, feat_of_no_interest):
        """ Creates a new ComBat """
        # Attribute that defines the CV method
        # Dictionary: cv_strategy or holdout_strategy
        self.cv_method=cv_method # for the moment only None can be given
        self.ref_batch=ref_batch # reference batch/site if there is one, OR None
        self.regression_fit=regression_fit # output data as dictionary {data, covariates}
        self.feat_detail=feat_detail # information about features of interest (brain features: CT, volumes etc) to be harmonized
        self.feat_of_no_interest=feat_of_no_interest # information about features of NO interest to be harmonized (globals: TIV, total surface area, etc)

    def extract_data(self,X):
        """
        Extract data and covariates from X.

        X is a dictionary containing the data (samples x features) and the
        covariates (samples x covariates).
        """
        # NOTE(review): module-level globals -- covars_fit_sorted is later read
        # directly by check_feat_no_int_harmonization; state leaks across
        # instances and calls. Confirm this is intentional before refactoring.
        global X_data, covars_fit_sorted
        if type(X) is dict: # data should be given as {'data': array_of_data ; 'covariates': dataframe_of_covariates}
            X_data=X['data'].copy()
            covars_fit_sorted=X['covariates']
        elif self.cv_method: # For now only supports None -> next implementation to work with sklearn GridSearchCV and other CV objects
            X_data=X.copy()
            index=list(X_data.index.values)
            X_data=X_data.to_numpy()
            covars_fit_sorted=self.cv_method['covariates'].iloc[index,:]
        return X_data, covars_fit_sorted

    def check_feat_no_int_harmonization(self,X, covariates, batch=None): # the function to first harmonize features-of-no-interest which are covariates
        """Harmonize covariates of no interest (e.g. TIV) before the features of interest.

        At fit time it only estimates ComBat parameters; at transform time it
        applies them and writes the harmonized covariate back into
        `covariates` in place. Returns X unchanged.
        """
        if self.feat_of_no_interest: # Were given as input? Is there covariates to harmonize before harmonizing brain features of interest?
            if hasattr(self, 'n_features_'): # If was already fitted
                print('Applying estimations') # this step is applying combat parameters already estimated
                cov_id=self.feat_of_no_interest['covariate']['id']
                # NOTE(review): reads the module global covars_fit_sorted, not the
                # `covariates` parameter -- they coincide only because extract_data
                # sets the global just before this is called. TODO confirm.
                cov_to_harm=covars_fit_sorted[[cov_id]].to_numpy()
                feat_concat=self.feat_of_no_interest['feat_concat']
                # Concatenate the selected features with the covariate so both are
                # transformed by the same ComBat model (ComBat expects features x samples).
                X_new=np.concatenate((X[:,feat_concat].copy(),cov_to_harm),axis=1)
                categorical_cols=self.feat_of_no_interest['covariate']['categorical']
                continuous_cols=self.feat_of_no_interest['covariate']['continuous']
                batch_col=['batch']
                X_feat_harm=neuroCombat_transform(dat=np.transpose(X_new), covars=covariates, batch_col=batch_col,
                                                  cat_cols=categorical_cols, num_cols=continuous_cols,
                                                  estimates=self.combat_estimations_[cov_id])["data"]
                # The covariate columns were appended last; slice them back out.
                cov_harm=np.transpose(X_feat_harm)[:,-cov_to_harm.shape[1]:]
                covariates.loc[:,(cov_id)]=cov_harm # Covariates are changed 'inplace'
            else: # Not fitted
                print('Fitting the regressor')
                self.combat_estimations_={} # initilize new attribute
                cov_id=self.feat_of_no_interest['covariate']['id'] # extract id of covariate to harmonize
                # NOTE(review): same global-vs-parameter read as above.
                cov_to_harm=covars_fit_sorted[[cov_id]].to_numpy().copy()
                feat_concat=self.feat_of_no_interest['feat_concat'] # extract id features to harmonize
                X_new=np.concatenate((X[:,feat_concat].copy(),cov_to_harm),axis=1) # concat
                categorical_cols=self.feat_of_no_interest['covariate']['categorical']
                continuous_cols=self.feat_of_no_interest['covariate']['continuous']
                batch_col=['batch']
                if self.ref_batch is not None: # if a reference batch was given apply M-ComBat
                    cov_feat_combat=neuroCombat_estimate(dat=np.transpose(X_new),covars=covariates,
                                                         batch_col=batch_col, categorical_cols=categorical_cols,
                                                         continuous_cols=continuous_cols, ref_batch=self.ref_batch)
                    self.combat_estimations_[cov_id]=cov_feat_combat["estimates"]
                else: # else apply original ComBat
                    cov_feat_combat=neuroCombat_estimate(dat=np.transpose(X_new),covars=covariates,
                                                         batch_col=batch_col, categorical_cols=categorical_cols,
                                                         continuous_cols=continuous_cols)
                    self.combat_estimations_[cov_id]=cov_feat_combat["estimates"]
                # At fit time the harmonized values are intentionally NOT written
                # back (see the class-level limitation about fit+transform).
                #X_feat_harm=cov_feat_combat["data"]
                #cov_harm=np.transpose(X_feat_harm)[:,-cov_to_harm.shape[1]:]
        return X

    def check_feat_harmonization(self,X,covariates, batch=None):
        """Estimate (unfitted) or apply (fitted) ComBat per feature group.

        Returns the harmonized data (samples x all features, groups
        concatenated in feat_detail order) when already fitted, otherwise
        an empty list (fit only stores estimates).
        """
        output=[]
        if self.feat_detail:
            if hasattr(self, 'n_features_'): # if it was fitted
                # NOTE(review): bare string below is a no-op -- presumably a
                # leftover debug message missing its print().
                ('it was fitted, it enter in transform')
                X_harm=[]
                for key_1,val_1 in self.feat_detail.items():
                    id=self.feat_detail[key_1]['id']
                    X_feat=X[:,id].copy()
                    categorical_cols=self.feat_detail[key_1]['categorical']
                    continuous_cols=self.feat_detail[key_1]['continuous']
                    batch_col=['batch']
                    #dat,covars, batch_col, cat_cols, num_cols,estimates
                    X_feat_harm=neuroCombat_transform(dat=np.transpose(X_feat), covars=covariates, batch_col=batch_col,
                                                      cat_cols=categorical_cols,num_cols=continuous_cols,
                                                      estimates=self.combat_estimations_[key_1])["data"]
                    X_harm.append(np.transpose(X_feat_harm)) #list with (samples x feat_i) with final len = feat_of_int
                output=np.concatenate(X_harm, axis=1) # harm data (samples x feat_all)
            else: #if was not fitted
                batch_col=['batch']
                self.combat_estimations_={}
                for key_1,val_1 in self.feat_detail.items():
                    id=self.feat_detail[key_1]['id']
                    X_feat=X[:,id].copy()
                    categorical_cols=self.feat_detail[key_1]['categorical']
                    continuous_cols=self.feat_detail[key_1]['continuous']
                    if self.ref_batch is not None:
                        combat=neuroCombat_estimate(dat=np.transpose(X_feat),covars=covariates,
                                                    batch_col=batch_col,
                                                    categorical_cols=categorical_cols,
                                                    continuous_cols=continuous_cols, ref_batch=self.ref_batch)
                        self.combat_estimations_[key_1]=combat["estimates"]
                    else:
                        combat=neuroCombat_estimate(dat=np.transpose(X_feat),covars=covariates,
                                                    batch_col=batch_col,
                                                    categorical_cols=categorical_cols,
                                                    continuous_cols=continuous_cols)
                        self.combat_estimations_[key_1]=combat["estimates"]
        return output # If it was already fitted, returns harm data, otherwise empty

    def fit(self, X, y=None):
        """Estimate ComBat parameters; does not return harmonized data."""
        X, covars_fit_sorted=self.extract_data(X)
        X = check_array(X, accept_sparse=True)
        if self.feat_of_no_interest: # To harmonize TIV or other globals in covariates
            # NOTE(review): 'ouput' is a typo for 'output'; harmless because the
            # return value of fit-side harmonization is unused.
            ouput=self.check_feat_no_int_harmonization(X, covars_fit_sorted)
        if self.feat_detail:
            output=self.check_feat_harmonization(X,covars_fit_sorted)
        self.n_features_ = X.shape[1] # For the check_is_fitted method
        return self

    def transform(self, X):
        """Apply the stored ComBat estimates and return the harmonized data."""
        # Check is fit had been called
        check_is_fitted(self)
        X, covars_fit_sorted=self.extract_data(X)
        #batch_trans_sorted=covars_fit_sorted[['batch']] # We only need the batch
        X = check_array(X, accept_sparse=True)
        if self.feat_of_no_interest:
            output=self.check_feat_no_int_harmonization(X, covars_fit_sorted)
            #The output should be X -> the input data because the covariates are changed inplace
        if self.feat_detail:
            output=self.check_feat_harmonization(X, covars_fit_sorted)
            # The ouput should be the X_harm
        if self.regression_fit==1:
            output={'data': output, 'covariates': covars_fit_sorted}
        return output
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
import statsmodels.api as sm
from scipy import stats
#GIVE COVARIATES IN INPUT WITH DICTIONARY
"""
X_feat are not standardize here, why? because we fit the regression for
feature type separatly: Ycort= Bage + B_sex / Y_vol= B_age + B_sex + B_TIV
But we standardize before and after when defining the pipeline: is it necessary before?
To do: Include diagnosis in covariates to estimate Betas but don't remove it
Include the partial correlation option for it?
"""
class BiocovariatesRegression(BaseEstimator, TransformerMixin):
    """
    Remove biological-covariate effects from features via per-group OLS.

    For each feature group in feat_detail, fits (at fit time) an OLS model
    feature ~ const + categorical + continuous covariates, stores the betas
    and the modeled mean, and (at transform time) subtracts the modeled mean
    from the data. With ref_batch, estimation uses only the reference batch.

    Parameters
    ----------
    ref_batch : int or None
        Batch/site ID used to fit the regression, or None to fit on all data.
    cv_method : None
        Only None supported; same placeholder as in ComBatHarmonization.
    feat_detail : dict
        Per-feature-group column ids and covariate names (see
        ComBatHarmonization.feat_detail).
    output_dict : bool
        If True, transform() returns {data, covariates}; otherwise only data.
    """
    def __init__(self, ref_batch, cv_method, feat_detail, output_dict):
        self.feat_detail=feat_detail
        self.cv_method=cv_method
        self.ref_batch=ref_batch
        self.output_dict=output_dict

    def extract_data(self,X):
        """
        Extract data and covariates from X.

        X is a dictionary containing the data (samples x features) and the
        covariates (samples x covariates).
        """
        # NOTE(review): module-level globals shared with ComBatHarmonization's
        # extract_data -- state leaks across instances/calls.
        global X_data, covars_fit_sorted
        if type(X) is dict:
            X_data=X['data'].copy()
            covars_fit_sorted=X['covariates']
        elif self.cv_method:
            X_data=X.copy()
            index=list(X_data.index.values)
            X_data=X_data.to_numpy()
            covars_fit_sorted=self.cv_method['covariates'].iloc[index,:]
        return X_data, covars_fit_sorted

    def fit(self,X, y=None):
        """Fit per-group OLS betas (on the reference batch if ref_batch is set)."""
        X, covars_fit_sorted=self.extract_data(X)
        X = check_array(X, accept_sparse=True)
        batch_col=['batch']
        print('LR: Shape of the whole dataset {}'.format(X.shape))
        # In case self.ref_batch exists we want to fit the LR only
        # on the reference batch
        if self.ref_batch is not None:
            # NOTE(review): positional indexing with DataFrame index labels --
            # assumes covariates carry a default RangeIndex. TODO confirm.
            ref_batch_index=covars_fit_sorted[covars_fit_sorted[batch_col[0]]==self.ref_batch].index
            X_ref=X[ref_batch_index,:].copy()
            covars_fit_sorted_ref=covars_fit_sorted[covars_fit_sorted[batch_col[0]]==self.ref_batch].copy()
            print('LR:Fitting on reference dataset of shape {}'.format(X_ref.shape))
            # scaler_ is fitted but its use in transform is commented out below;
            # kept presumably for experimentation.
            self.scaler_ = StandardScaler().fit(X_ref)
            #X=self.scaler_.transform(X)
            self.beta_estimations_={}
            self.mod_mean_={}
            for key_1,val_1 in self.feat_detail.items():
                print(key_1)
                id=self.feat_detail[key_1]['id'] #cortical/volumes etc
                X_feat=X_ref[:,id].copy()
                categorical_cols=self.feat_detail[key_1]['categorical']
                continuous_cols=self.feat_detail[key_1]['continuous']
                biocov=categorical_cols + continuous_cols
                C= sm.add_constant(covars_fit_sorted_ref[biocov]) #If we want to add a constant to our model
                est = sm.OLS(X_feat,C)
                estimates=est.fit()
                self.beta_estimations_[key_1] = estimates.params.to_numpy() #.loc[biocov]
                # Modeled mean of the fitting set: C @ betas (samples x features).
                self.mod_mean_[key_1]=np.dot(C.to_numpy(), self.beta_estimations_[key_1])
        else:
            self.scaler_ = StandardScaler().fit(X)
            #X=self.scaler_.transform(X)
            self.beta_estimations_={}
            self.mod_mean_={}
            for key_1,val_1 in self.feat_detail.items():
                print(key_1)
                id=self.feat_detail[key_1]['id'] #cortical/volumes etc
                X_feat=X[:,id].copy()
                categorical_cols=self.feat_detail[key_1]['categorical']
                continuous_cols=self.feat_detail[key_1]['continuous']
                biocov=categorical_cols + continuous_cols
                C= sm.add_constant(covars_fit_sorted[biocov]) #If we want to add a constant to our model
                est = sm.OLS(X_feat,C)
                estimates=est.fit()
                self.beta_estimations_[key_1] = estimates.params.to_numpy() #.loc[biocov]
                self.mod_mean_[key_1]=np.dot(C.to_numpy(), self.beta_estimations_[key_1])
        return self

    def transform(self, X):
        """Subtract the modeled covariate mean from each feature group.

        Detects "transforming the fitting set" by exact equality between the
        freshly computed modeled mean and the one stored at fit time; any
        other input is treated as new data and corrected with the fitting
        set's average modeled mean.
        """
        # Check is fit had been called
        check_is_fitted(self)
        X, covars_fit_sorted=self.extract_data(X)
        X = check_array(X, accept_sparse=True)
        batch_col=['batch']
        print('LR: Transforming dataset shape {}'.format(X.shape))
        #X=self.scaler_.transform(X)
        X_corr_partial=[]
        for key_1,val_1 in self.feat_detail.items():
            id=self.feat_detail[key_1]['id'] #cortical/volumes etc
            X_feat=X[:,id].copy()
            categorical_cols=self.feat_detail[key_1]['categorical']
            continuous_cols=self.feat_detail[key_1]['continuous']
            biocov=categorical_cols + continuous_cols
            C= sm.add_constant(covars_fit_sorted[biocov], has_constant='add') #I need a way to have covariates here for either train or test
            new_mod_mean=np.dot(C.to_numpy(), self.beta_estimations_[key_1]) # current mod_mean
            # ref-LR
            if self.ref_batch is not None:
                print('Reference LR was used')
                ref_batch_index=covars_fit_sorted[covars_fit_sorted[batch_col[0]]==self.ref_batch].index
                if new_mod_mean[ref_batch_index,:].shape==self.mod_mean_[key_1].shape:
                    # NOTE(review): exact float equality used to detect "same set
                    # as fitted" -- fragile if covariates are recomputed.
                    if np.all(new_mod_mean[ref_batch_index,:]==self.mod_mean_[key_1]):
                        #if the mod_mean_from_ref is the same as the one saved previously it means we are transforming ref
                        print('Correcting the data used for estimation and other sets')
                        X_corr=np.zeros((new_mod_mean.shape[0], new_mod_mean.shape[1]))
                        X_ref_corr=X_feat[ref_batch_index,:] - self.mod_mean_[key_1] # correcting the reference original data
                        mask = np.ones(len(X_feat), dtype=bool)
                        mask[ref_batch_index] = False
                        X_other=X_feat[mask] # selecting the other data not reference
                        # Non-reference samples are corrected with the reference
                        # set's average modeled mean (one row, repeated).
                        mod_mean_from_ref=np.repeat(self.mod_mean_[key_1].mean(axis=0).reshape(1,-1),X_other.shape[0], axis=0)
                        X_other_corr= X_other - mod_mean_from_ref # correcting the other data with mod_mean_ref.mean()
                        X_corr[ref_batch_index,:]=X_ref_corr
                        X_corr[mask,:]=X_other_corr
                        print(X_corr)
                        X_corr_partial.append(X_corr)
                    else: # doesn't include original reference data used in estimation
                        print('Correcting new data')
                        mod_mean_from_ref=np.repeat(self.mod_mean_[key_1].mean(axis=0).reshape(1,-1),X_feat.shape[0], axis=0)
                        X_corr_partial.append(X_feat - mod_mean_from_ref)
                else: # if not it means we are transforming a test set and so we have to use the mean_mod_mean
                    print('Correcting new data')
                    mod_mean_from_ref=np.repeat(self.mod_mean_[key_1].mean(axis=0).reshape(1,-1),X_feat.shape[0], axis=0)
                    X_corr_partial.append(X_feat - mod_mean_from_ref)
            else: # Normal LR - no reference was used for estimations
                print('Normal LR was used')
                if np.all(new_mod_mean==self.mod_mean_[key_1]): # We are in the case of transforming the training set
                    print('Correcting the training set with its betas and covariates')
                    X_corr_partial.append(X_feat - np.dot(C.to_numpy(),self.beta_estimations_[key_1]))
                else: # We are in the case of transforming the test set
                    print('Correcting new data with training set mod_mean.mean()')
                    mod_mean_from_training=np.repeat(self.mod_mean_[key_1].mean(axis=0).reshape(1,-1),X_feat.shape[0], axis=0)
                    X_corr_partial.append(X_feat - mod_mean_from_training)
        X_corr=np.concatenate(X_corr_partial, axis=1)
        if self.output_dict==True:
            output={"data":X_corr, "covariates": covars_fit_sorted}
        else:
            output=X_corr
        return output