diff --git a/python/lvmdrp/core/fit_profile.py b/python/lvmdrp/core/fit_profile.py index d760d9c8..93b719a7 100644 --- a/python/lvmdrp/core/fit_profile.py +++ b/python/lvmdrp/core/fit_profile.py @@ -7,11 +7,19 @@ import numpy import bottleneck as bn from scipy import interpolate, optimize, special +from astropy.modeling.models import Voigt1D, Lorentz1D +from scipy.special import wofz +def Voigt(x, x_0=0, amplitude_L=1, sigma_L=0.5, sigma_G=0.001): + if sigma_L < 0: + return x * numpy.nan + return amplitude_L * numpy.real(wofz((x - x_0 + 1j*sigma_L)/sigma_G /numpy.sqrt(2))) / sigma_G /numpy.sqrt(2*numpy.pi) fact = numpy.sqrt(2.0 * numpy.pi) +amp_fwhm = 2*numpy.sqrt(2*numpy.log(2)) + class SpectralResolution(object): def __init__(self, res=None): @@ -739,6 +747,203 @@ def _guess_par(self, x, y): def __init__(self, par): fit_profile1D.__init__(self, par, self._profile, self._guess_par) +class Voigts(fit_profile1D): + """ + A class to fit data with the Voigt function. + + ... + + Methods + ------- + _profile(x): + Returns a list of all the parameters: centroids, amplitudes, fwhm gaussians and fwhm lorentzians + """ + def _profile(self, x): + ''' + Fit data with the Voigt profile + + Parameters + ---------- + x : float + the point where the fit is going to be evaluated + + Returns + ------- + bn.nansum(y, axis=0) : list + a list of the parameters for all the x points + ''' + ncomp = len(self._par) // 4 + y = numpy.zeros((ncomp, len(x)), dtype=numpy.float32) + for i in range(ncomp): + y[i] = Voigt(x, amplitude_L=self._par[i],x_0=self._par[i+ncomp], + sigma_G=self._par[i+2*ncomp], sigma_L=self._par[i+3*ncomp]) + return bn.nansum(y, axis=0) + + def __init__(self, par): + ''' + Constructs the necessary attributes for the Voigt object. + + Parameters + ---------- + par : list + list of inicial parameters for the Voigt profile + + Returns + ------- + None + ''' + fit_profile1D.__init__(self, par, self._profile) + +class Lorentzs(fit_profile1D): + ''' + A class to fit data with the Lorentz function. + + ... + + Methods + ------- + _profile(x): + Returns a list of all the parameters: centroids, amplitudes and fwhm lorentzians + ''' + def _profile(self, x): + ''' + Fit data with the Lorentz profile + + Parameters + ---------- + x : float + the point where the fit is going to be evaluated + + Returns + ------- + bn.nansum(y, axis=0) : list + a list of the parameters for all the x points + ''' + ncomp = len(self._par) // 3 + y = numpy.zeros((ncomp, len(x)), dtype=numpy.float32) + for i in range(ncomp): + v = Lorentz1D(amplitude=self._par[i],x_0=self._par[i+ncomp], fwhm=self._par[i+2*ncomp]*amp_fwhm) + y[i] = v(x) + return bn.nansum(y, axis=0) + + def __init__(self, par): + ''' + Constructs the necessary attributes for the Lorentz object. + + Parameters + ---------- + par : list + list of inicial parameters for the Lorentz profile + + Returns + ------- + None + ''' + fit_profile1D.__init__(self, par, self._profile) + +class Voigts_width(fit_profile1D): + ''' + A class to fit the fwhms with the Voigt function. It keeps the centroids constant + + ... + + Methods + ------- + _profile(x): + Returns a list of all the parameters: centroids, amplitudes, fwhm gaussians and fwhm lorentzians + ''' + def _profile(self, x): + ''' + Fit data with the Voigt profile + + Parameters + ---------- + x : float + the point where the fit is going to be evaluate + + Returns + ------- + bn.nansum(y, axis=0) : list + a list of the parameters for all the x points. The centroids still constant + ''' + ncomp = len(self._par) // 4 + y = numpy.zeros((ncomp, len(x)), dtype=numpy.float32) + ncomp = len(self._args) + for i in range(ncomp): + self._par[i+2*ncomp] *= amp_fwhm + self._par[i+3*ncomp] *= amp_fwhm + v = Voigt1D(amplitude_L=self._par[i], x_0=self._arg[i+ncomp], fwhm_G=self._par[i+2*ncomp], fwhm_L=self._par[i+3*ncomp]) + y[i] = v(x) + return bn.nansum(y, axis=0) + + def __init__(self, par, args): + ''' + Constructs the necessary attributes for the Voigt object. + + Parameters + ---------- + par : list + list of inicial parameters for the Voigt profile that are going to be fitted + args : list + list of inicial parameters for the Voigt profile that keep constant + + Returns + ------- + None + ''' + fit_profile1D.__init__(self, par, self._profile, args=args) + +class Voigts_flux(fit_profile1D): + ''' + A class to fit the amplitudes with the Voigt function. It keeps the centroids and fwhms constant + + ... + + Methods + ------- + _profile(x): + Returns a list of all the parameters: centroids, amplitudes, fwhm gaussians and fwhm lorentzians + ''' + def _profile(self, x): + ''' + Fit data with the Voigt profile + + Parameters + ---------- + x : float + the point where the fit is going to be evaluate + + Returns + ------- + bn.nansum(y, axis=0) : list + a list of the parameters for all the x points. The centroids and fwhms still constant + ''' + ncomp = len(self._par) // 4 + y = numpy.zeros((ncomp, len(x)), dtype=numpy.float32) + ncomp = len(self._par) + for i in range(ncomp): + self._par[i+2*ncomp] *= amp_fwhm + self._par[i+3*ncomp] *= amp_fwhm + v = Voigt1D(amplitude_L=self._par[i], x_0=self._arg[i+ncomp], fwhm_G=self._arg[i+2*ncomp], fwhm_L=self._arg[i+3*ncomp]) + y[i] = v(x) + return bn.nansum(y, axis=0) + + def __init__(self, par, args): + ''' + Constructs the necessary attributes for the Voigt object. + + Parameters + ---------- + par : list + list of inicial parameters for the Voigt profile that are going to be fitted + args : list + list of inicial parameters for the Voigt profile that keep constant + + Returns + ------- + None + ''' + fit_profile1D.__init__(self, par, self._profile, args=args) class Gaussians(fit_profile1D): def _profile(self, x): @@ -752,7 +957,7 @@ def __init__(self, par): fit_profile1D.__init__(self, par, self._profile) -class Gaussians_width(fit_profile1D): +class Gaussians_width(fit_profile1D): #el orden de los parametros esta malo y sigma esta mal definido def _profile(self, x): y = numpy.zeros(len(x)) ncomp = len(self._args) diff --git a/python/lvmdrp/core/spectrum1d.py b/python/lvmdrp/core/spectrum1d.py index e14830f6..11042b76 100644 --- a/python/lvmdrp/core/spectrum1d.py +++ b/python/lvmdrp/core/spectrum1d.py @@ -2551,6 +2551,30 @@ def fitMultiGauss(self, centres, init_fwhm): gauss_multi = fit_profile.Gaussians(par) gauss_multi.fit(self._wave[select], self._data[select], sigma=error[select]) return gauss_multi, gauss_multi.getPar() + + def fitMultiVoigt(self, centres, init_fwhm_G, init_fwhm_L): + select = numpy.zeros(self._dim, dtype = "bool") + flux_in = numpy.zeros(len(centres), dtype = numpy.float32) + sig_in_G = numpy.ones_like(flux_in) * init_fwhm_G / 2.354 + sig_in_L = numpy.ones_like(flux_in) * init_fwhm_L / 2.354 + cent = numpy.zeros(len(centres), dtype = numpy.float32) + if self._error is not None: + error = self._error + else: + error = numpy.ones_like(self._dim, dtype = numpy.float32) + for i in range(len(centres)): + init_fwhm = max(init_fwhm_G, init_fwhm_L) + select_line = numpy.logical_and( + self._wave > centres[i] - 2 * init_fwhm, + self._wave < centres[i] + 2 * init_fwhm, + ) + flux_in[i] = numpy.sum(self._data[select_line]) + select = numpy.logical_or(select, select_line) + cent[i] = centres[i] + par = numpy.concatenate([flux_in, cent, sig_in_G, sig_in_L]) + voigt_multi = fit_profile.Voigts(par) + voigt_multi.fit(self._wave[select], self._data[select], sigma=error[select], maxfev = 1000000) + return voigt_multi, voigt_multi.getPar() def fitParFile( self, par, err_sim=0, ftol=1e-8, xtol=1e-8, method="leastsq", parallel="auto" diff --git a/python/lvmdrp/functions/imageMethod.py b/python/lvmdrp/functions/imageMethod.py index 587cb049..e244d62f 100644 --- a/python/lvmdrp/functions/imageMethod.py +++ b/python/lvmdrp/functions/imageMethod.py @@ -24,10 +24,13 @@ from typing import List, Tuple +from lvmdrp.core import fit_profile as fp + from lvmdrp import log from lvmdrp.utils.decorators import skip_on_missing_input_path, drop_missing_input_paths, skip_if_drpqual_flags from lvmdrp.utils.bitmask import QualityFlag from lvmdrp.core.fit_profile import Gaussians +from lvmdrp.core.fit_profile import Voigts from lvmdrp.core.fiberrows import FiberRows, _read_fiber_ypix from lvmdrp.core.image import ( Image, @@ -4320,6 +4323,7 @@ def trace_fibers( fig.supxlabel("Y (pixel)") colors = plt.cm.coolwarm(numpy.linspace(0, 1, len(columns))) + #colors = plt.cm.Spectral(numpy.linspace(0, 1, len(columns))) idx = numpy.argsort(columns) for i in idx: icolumn = columns[i] @@ -4329,10 +4333,9 @@ def trace_fibers( img_slice = img.getSlice(icolumn, axis="y") img_slice._data[(img_slice._mask)|(joint_mod<=0)] = numpy.nan - # weights = img_slice._data / bn.nansum(img_slice._data) + weights = img_slice._data / bn.nansum(img_slice._data) * 500 residuals = (joint_mod - img_slice._data) / img_slice._data * 100 - # residuals *= weights - ax.plot(img_slice._pixels, residuals, ".", ms=2, mew=0, mfc=colors[i]) + ax.scatter(img_slice._pixels, residuals, s=weights, lw=0, color=colors[i]) ax.set_ylim(-50, 50) save_fig( fig, @@ -4343,3 +4346,667 @@ def trace_fibers( ) return centroids, trace_cent, trace_amp, trace_fwhm + +def trace_Voigt_fibers( + in_image: str, + out_trace_amp: str, + out_trace_cent: str, + out_trace_fwhm_G: str, + out_trace_fwhm_L: str, + out_trace_cent_guess: str = None, + correct_ref: bool = False, + median_box: tuple = (1, 10), + coadd: int = 5, + method: str = "voigt", + guess_fwhm_G: float = 3.0, + guess_fwhm_L: float = 3.0, + counts_threshold: float = 0.5, + max_diff: int = 1.5, + ncolumns: int | Tuple[int] = 18, + nblocks: int = 18, + iblocks: list = [], + fwhm_G_limits: Tuple[float] = (1.0, 3.5), + fwhm_L_limits: Tuple[float] = (1.0, 3.5), + fit_poly: bool = False, + poly_deg: int | Tuple[int] = 6, + interpolate_missing: bool = True, + display_plots: bool = True +) -> Tuple[TraceMask, TraceMask, TraceMask]: + ''' + Parameters + ---------- + in_image : str + path to input image + out_trace_amp : str + path to output amplitude trace + out_trace_cent : str + path to output centroid trace + out_trace_fwhm_G : str + path to output FWHM gaussian trace + out_trace_fwhm_L : str + path to output FWHM lorentz trace + out_trace_cent_guess : str, optional + path to output centroid guess trace, by default None + correct_ref : bool, optional + whether to correct reference fiber positions, by default False + median_box : tuple, optional + median box to use for cleaning cosmic rays, by default (1, 10) + coadd : int, optional + number of pixels to coadd along dispersion axis, by default 5 + method : str, optional + method to use for tracing, by default "voigt" + guess_fwhm_G : float, optional + guess FWHM gaussian of fiber profiles, by default 3.0 + guess_fwhm_L : float, optional + guess FWHM lorentzian of fiber profiles, by default 3.0 + counts_threshold : float, optional + threshold to use for fiber detection, by default 0.5 + max_diff : int, optional + maximum difference between consecutive fiber positions, by default 1.5 + ncolumns : int or 2-tuple, optional + number of columns to use for tracing, by default 18 + nblocks : int, optional + number of blocks to use for tracing, by default 18 + iblocks : list, optional + list of blocks to trace, by default [] + fwhm_G_limits: tuple, optional + limits to use for FWHM fitting, by default (1.0, 3.5) + fwhm_L_limits: tuple, optional + limits to use for FWHM fitting, by default (1.0, 3.5) + fit_poly : bool, optional + whether to fit a polynomial to the dispersion solution, by default False (interpolate in X axis) + poly_deg : int or 3-tuple, optional + degree of polynomial(s) to use when fitting amplitude, centroid FWHM gaussian and FWHM lorentzian, by default 8 + intrpolate_missing : bool, optional + whether to interpolate bad/missing fibers, by default True + display_plots : bool, optional + whether to show plots on display or not, by default True + + Returns + ------- + centroids : TraceMask + fiber centroids + flux : TraceMask + fiber flux + fwhm_G : TraceMask + fiber FWHM gaussian + fwhm_L : TraceMask + fiber FWHM lorentzian + + Raises + ------ + ValueError + invalid polynomial degree + ValueError + invalid number of columns + """ + ''' + + # parse polynomial degrees + if isinstance(poly_deg, (list, tuple)) and len(poly_deg) == 4: # si poly_def es una lista o tupla y si tiene len = 3 + deg_amp, deg_cent, deg_fwhm_G, deg_fwhm_L = poly_deg + elif isinstance(poly_deg, int): # si es un entero + deg_amp = deg_cent = deg_fwhm_G = deg_fwhm_L= poly_deg + else: + raise ValueError(f"invalid polynomial degree: {poly_deg}") + + if isinstance(ncolumns, (list, tuple)) and len(ncolumns) == 2: + ncolumns_cent, ncolumns_full = ncolumns + elif isinstance(ncolumns, int): + ncolumns_cent = ncolumns_full = ncolumns + else: + raise ValueError(f"invalid number of columns: {ncolumns}") + + # load continuum image from file + log.info(f"using flat image {os.path.basename(in_image)} for tracing") + img = loadImage(in_image) + img.setData(data=numpy.nan_to_num(img._data), error=numpy.nan_to_num(img._error)) + + # extract usefull metadata from the image + channel = img._header["CCD"][0] + unit = img._header["BUNIT"] + + # read slitmap extension + slitmap = img.getSlitmap() + slitmap = slitmap[slitmap["spectrographid"] == int(img._header["CCD"][1])] + + # perform median filtering along the dispersion axis to clean cosmic rays + median_box = tuple(map(lambda x: max(x, 1), median_box)) + if median_box != (1, 1): + log.info(f"performing median filtering with box {median_box} pixels") + img = img.replaceMaskMedian(*median_box) + img = img.medianImg(median_box) + + # coadd images along the dispersion axis to increase the S/N of the peaks + if coadd != 0: + log.info(f"coadding {coadd} pixels along the dispersion axis") + coadd_kernel = numpy.ones((1, coadd), dtype="uint8") + img = img.convolveImg(coadd_kernel) + counts_threshold = counts_threshold * coadd + + # extract guess positions from fibermap + ref_cent = slitmap[f"ypix_{channel}"].data + # correct reference fiber positions + profile = img.getSlice(LVM_REFERENCE_COLUMN, axis="y") + pixels = profile._pixels + if correct_ref: + pixels = numpy.arange(pixels.size) + guess_heights = numpy.ones_like(ref_cent) * numpy.nanmax(profile._data) + ref_profile = _spec_from_lines(ref_cent, sigma=1.2, wavelength=pixels, heights=guess_heights) # independiente del modelo de ajuste + log.info(f"correcting guess positions for column {LVM_REFERENCE_COLUMN}") + cc, bhat, mhat = _cross_match( + ref_spec=ref_profile, + obs_spec=profile._data, + stretch_factors=numpy.linspace(0.7,1.3,5000), + shift_range=[-100, 100]) + log.info(f"stretch factor: {mhat:.3f}, shift: {bhat:.3f}") + ref_cent = ref_cent * mhat + bhat + # set mask + fibers_status = slitmap["fibstatus"] + bad_fibers = (fibers_status == 1) | (profile._data[ref_cent.round().astype(int)] < counts_threshold) + good_fibers = numpy.where(numpy.logical_not(bad_fibers))[0] + + # create empty traces mask for the image + fibers = ref_cent.size + dim = img.getDim() + centroids = TraceMask() + centroids.createEmpty(data_dim=(fibers, dim[1]), mask_dim=(fibers, dim[1])) + centroids.setFibers(fibers) + centroids._good_fibers = good_fibers + centroids.setHeader(img._header.copy()) + centroids._header["IMAGETYP"] = "trace_centroid" + + # initialize flux and FWHMs traces + trace_cent = copy(centroids) + trace_amp = copy(centroids) + trace_amp._header["IMAGETYP"] = "trace_amplitude" + trace_fwhm_G = copy(centroids) + trace_fwhm_G._header["IMAGETYP"] = "trace_fwhm_G" + trace_fwhm_L = copy(centroids) + trace_fwhm_L._header["IMAGETYP"] = "trace_fwhm_L" + + # set positions of fibers along reference column + centroids.setSlice(LVM_REFERENCE_COLUMN, axis="y", data=ref_cent, mask=numpy.zeros_like(ref_cent, dtype="bool")) + + # select columns to measure centroids + step = img._dim[1] // ncolumns_cent + columns = numpy.concatenate((numpy.arange(LVM_REFERENCE_COLUMN, 0, -step), numpy.arange(LVM_REFERENCE_COLUMN, img._dim[1], step))) + log.info(f"tracing centroids in {len(columns)-1} columns: {','.join(map(str, numpy.unique(columns)))}") + + # trace centroids in each column + mod_columns, residuals = [], [] + iterator = tqdm(enumerate(columns), total=len(columns), desc="tracing centroids", unit="column", ascii=True) + for i, icolumn in iterator: + # extract column profile + img_slice = img.getSlice(icolumn, axis="y") + + # get fiber positions along previous column + if icolumn == LVM_REFERENCE_COLUMN: + # trace reference column first or skip if already traced + if i == 0: + cent_guess, _, mask_guess = centroids.getSlice(LVM_REFERENCE_COLUMN, axis="y") + else: + continue + else: + cent_guess, _, mask_guess = centroids.getSlice(columns[i-1], axis="y") + + # update masked fibers + mask_guess |= numpy.isnan(cent_guess) + + # fix masked fibers from last iteration + cent_guess[mask_guess] = copy(ref_cent)[mask_guess] + # cast fiber positions to integers + cent_guess = cent_guess.round().astype("int16") + + # measure fiber positions + + cen_slice, msk_slice = img_slice.measurePeaks(cent_guess, method, init_sigma = guess_fwhm_G / 2.354, threshold=counts_threshold, max_diff=max_diff) + + centroids.setSlice(icolumn, axis="y", data=cen_slice, mask=msk_slice) + + # smooth all trace by a polynomial + log.info(f"fitting centroid guess trace with {deg_cent}-deg polynomial") + centroids.fit_polynomial(deg_cent, poly_kind="poly") + # set bad fibers in trace mask + centroids._mask[bad_fibers] = True + + # linearly interpolate coefficients at masked fibers + log.info(f"interpolating coefficients at {bad_fibers.sum()} masked fibers") + centroids.interpolate_coeffs() + + # select columns to fit for amplitudes, centroids and FWHMs per fiber block + step = img._dim[1] // ncolumns_full + columns = numpy.concatenate((numpy.arange(LVM_REFERENCE_COLUMN, 0, -step), numpy.arange(LVM_REFERENCE_COLUMN+step, img._dim[1], step))) + log.info(f"tracing fibers in {len(columns)} columns: {','.join(map(str, columns))}") + + # fit peaks, centroids and FWHMs in each column + for i, icolumn in enumerate(columns): + log.info(f"tracing column {icolumn} ({i+1}/{len(columns)})") + # get slice of data and trace + cen_slice, _, msk_slice = centroids.getSlice(icolumn, axis="y") + img_slice = img.getSlice(icolumn, axis="y") + + # define fiber blocks + if iblocks and isinstance(iblocks, (list, tuple, numpy.ndarray)): + cen_blocks = numpy.split(cen_slice, LVM_NBLOCKS) + cen_blocks = numpy.asarray(cen_blocks)[iblocks] + msk_blocks = numpy.split(msk_slice, LVM_NBLOCKS) + msk_blocks = numpy.asarray(msk_blocks)[iblocks] + else: + cen_blocks = numpy.split(cen_slice, nblocks) + msk_blocks = numpy.split(msk_slice, nblocks) + + # fit each block + par_blocks = [] + for j, (cen_block, msk_block) in enumerate(zip(cen_blocks, msk_blocks)): + # apply flux threshold + cen_idx = cen_block.round().astype("int16") + msk_block |= (img_slice._data[cen_idx] < counts_threshold) + + # mask bad fibers + cen_block = cen_block[~msk_block] + # initialize parameters with the full block size + par_block = numpy.ones(4 * msk_block.size) * numpy.nan + par_mask = numpy.tile(msk_block, 4) + + # skip block if all fibers are masked + if msk_block.sum() > 0.5 * msk_block.size: + log.info(f"skipping fiber block {j+1}/{nblocks} (most fibers masked)") + else: + # fit voigt models to each fiber profile + log.info(f"fitting fiber block {j+1}/{nblocks} ({cen_block.size}/{msk_block.size} good fibers)") + _, par_block[~par_mask] = img_slice.fitMultiVoigt(cen_block, init_fwhm_G=guess_fwhm_G, init_fwhm_L = guess_fwhm_L) + + par_blocks.append(par_block) + + # combine all parameters in a single array + par_joint = numpy.asarray([numpy.split(par_block, 4) for par_block in par_blocks]) + par_joint = par_joint.transpose(1, 0, 2).reshape(4, -1) + # define joint gaussian model + mod_joint = Voigts(par=par_joint.ravel()) + + # store joint model + mod_columns.append(mod_joint) + + # get parameters of joint model + amp_slice = par_joint[0] + cent_slice = par_joint[1] + fwhm_G_slice = par_joint[2] * 2.354 + fwhm_L_slice = par_joint[3] * 2.354 + + # mask fibers with invalid values + amp_off = (amp_slice <= counts_threshold) + log.info(f"masking {amp_off.sum()} samples with amplitude < {counts_threshold} {unit}") + cent_off = numpy.abs(1 - cent_slice / numpy.concatenate(cen_blocks)) > 0.01 + log.info(f"masking {cent_off.sum()} samples with centroids refined by > 1 %") + fwhm_G_off = (fwhm_G_slice < fwhm_G_limits[0]) | (fwhm_G_slice > fwhm_G_limits[1]) + log.info(f"masking {fwhm_G_off.sum()} samples with FWHM_G outside {fwhm_G_limits} pixels") + fwhm_L_off = (fwhm_L_slice < fwhm_L_limits[0]) | (fwhm_L_slice > fwhm_L_limits[1]) + log.info(f"masking {fwhm_L_off.sum()} samples with FWHM_L outside {fwhm_L_limits} pixels") + amp_mask = numpy.isnan(amp_slice) | amp_off | cent_off | fwhm_G_off | fwhm_L_off + cent_mask = numpy.isnan(cent_slice) | amp_off | cent_off | fwhm_G_off | fwhm_L_off + fwhm_G_mask = numpy.isnan(fwhm_G_slice) | amp_off | cent_off | fwhm_G_off | fwhm_L_off + fwhm_L_mask = numpy.isnan(fwhm_L_slice) | amp_off | cent_off | fwhm_G_off | fwhm_L_off + + if amp_slice.size != trace_amp._data.shape[0]: + dummy_amp = numpy.split(numpy.zeros(trace_amp._data.shape[0]), LVM_NBLOCKS) + dummy_cent = numpy.split(numpy.zeros(trace_cent._data.shape[0]), LVM_NBLOCKS) + dummy_fwhm_G = numpy.split(numpy.zeros(trace_fwhm_G._data.shape[0]), LVM_NBLOCKS) + dummy_fwhm_L = numpy.split(numpy.zeros(trace_fwhm_L._data.shape[0]), LVM_NBLOCKS) + dummy_amp_mask = numpy.split(numpy.ones(trace_amp._data.shape[0], dtype=bool), LVM_NBLOCKS) + dummy_cent_mask = numpy.split(numpy.ones(trace_cent._data.shape[0], dtype=bool), LVM_NBLOCKS) + dummy_fwhm_G_mask = numpy.split(numpy.ones(trace_fwhm_G._data.shape[0], dtype=bool), LVM_NBLOCKS) + dummy_fwhm_L_mask = numpy.split(numpy.ones(trace_fwhm_L._data.shape[0], dtype=bool), LVM_NBLOCKS) + + amp_split = numpy.split(amp_slice, len(iblocks)) + cent_split = numpy.split(cent_slice, len(iblocks)) + fwhm_G_split = numpy.split(fwhm_G_slice, len(iblocks)) + fwhm_L_split = numpy.split(fwhm_L_slice, len(iblocks)) + amp_mask_split = numpy.split(amp_mask, len(iblocks)) + cent_mask_split = numpy.split(cent_mask, len(iblocks)) + fwhm_G_mask_split = numpy.split(fwhm_G_mask, len(iblocks)) + fwhm_L_mask_split = numpy.split(fwhm_L_mask, len(iblocks)) + for j, iblock in enumerate(iblocks): + dummy_amp[iblock] = amp_split[j] + dummy_cent[iblock] = cent_split[j] + dummy_fwhm_G[iblock] = fwhm_G_split[j] + dummy_fwhm_L[iblock] = fwhm_L_split[j] + dummy_amp_mask[iblock] = amp_mask_split[j] + dummy_cent_mask[iblock] = cent_mask_split[j] + dummy_fwhm_G_mask[iblock] = fwhm_G_mask_split[j] + dummy_fwhm_L_mask[iblock] = fwhm_L_mask_split[j] + + # update traces + trace_amp.setSlice(icolumn, axis="y", data=numpy.concatenate(dummy_amp), mask=numpy.concatenate(dummy_amp_mask)) + trace_cent.setSlice(icolumn, axis="y", data=numpy.concatenate(dummy_cent), mask=numpy.concatenate(dummy_cent_mask)) + trace_fwhm_G.setSlice(icolumn, axis="y", data=numpy.concatenate(dummy_fwhm_G), mask=numpy.concatenate(dummy_fwhm_G_mask)) + trace_fwhm_L.setSlice(icolumn, axis="y", data=numpy.concatenate(dummy_fwhm_L), mask=numpy.concatenate(dummy_fwhm_L_mask)) + trace_amp._good_fibers = numpy.arange(trace_amp._fibers)[~numpy.all(trace_amp._mask, axis=1)] + trace_cent._good_fibers = numpy.arange(trace_cent._fibers)[~numpy.all(trace_cent._mask, axis=1)] + trace_fwhm_G._good_fibers = numpy.arange(trace_fwhm_G._fibers)[~numpy.all(trace_fwhm_G._mask, axis=1)] + trace_fwhm_L._good_fibers = numpy.arange(trace_fwhm_L._fibers)[~numpy.all(trace_fwhm_L._mask, axis=1)] + else: + # update traces + trace_amp.setSlice(icolumn, axis="y", data=amp_slice, mask=amp_mask) + trace_cent.setSlice(icolumn, axis="y", data=cent_slice, mask=cent_mask) + trace_fwhm_G.setSlice(icolumn, axis="y", data=fwhm_G_slice, mask=fwhm_G_mask) + trace_fwhm_L.setSlice(icolumn, axis="y", data=fwhm_L_slice, mask=fwhm_L_mask) + + # compute residuals + integral_mod = numpy.trapz(mod_joint(img_slice._pixels), img_slice._pixels) or numpy.nan + integral_dat = numpy.trapz(img_slice._data, img_slice._pixels) + residuals.append((integral_dat - integral_mod) / integral_dat * 100) + + # compute fitted model stats + chisq_red = bn.nansum((mod_joint(img_slice._pixels) - img_slice._data)[~img_slice._mask]**2 / img_slice._error[~img_slice._mask]**2) / (img._dim[0] - 1 - 3) + log.info(f"joint model {chisq_red = :.2f}") + if amp_mask.all() or cent_mask.all() or fwhm_G_mask.all() or fwhm_L_mask.all(): + continue + min_amp, max_amp, median_amp = bn.nanmin(amp_slice[~amp_mask]), bn.nanmax(amp_slice[~amp_mask]), bn.nanmedian(amp_slice[~amp_mask]) + min_cent, max_cent, median_cent = bn.nanmin(cent_slice[~cent_mask]), bn.nanmax(cent_slice[~cent_mask]), bn.nanmedian(cent_slice[~cent_mask]) + min_fwhm_G, max_fwhm_G, median_fwhm_G = bn.nanmin(fwhm_G_slice[~fwhm_G_mask]), bn.nanmax(fwhm_G_slice[~fwhm_G_mask]), bn.nanmedian(fwhm_G_slice[~fwhm_G_mask]) + min_fwhm_L, max_fwhm_L, median_fwhm_L = bn.nanmin(fwhm_L_slice[~fwhm_L_mask]), bn.nanmax(fwhm_L_slice[~fwhm_L_mask]), bn.nanmedian(fwhm_L_slice[~fwhm_L_mask]) + log.info(f"joint model amplitudes: {min_amp = :.2f}, {max_amp = :.2f}, {median_amp = :.2f}") + log.info(f"joint model centroids: {min_cent = :.2f}, {max_cent = :.2f}, {median_cent = :.2f}") + log.info(f"joint model FWHM_Gs: {min_fwhm_G = :.2f}, {max_fwhm_G = :.2f}, {median_fwhm_G = :.2f}") + log.info(f"joint model FWHM_Ls: {min_fwhm_L = :.2f}, {max_fwhm_L = :.2f}, {median_fwhm_L = :.2f}") + + # smooth all trace by a polynomial + if fit_poly: + log.info(f"fitting peak trace with {deg_amp}-deg polynomial") + trace_amp.fit_polynomial(deg_amp, poly_kind="poly") + log.info(f"fitting centroid trace with {deg_cent}-deg polynomial") + trace_cent.fit_polynomial(deg_cent, poly_kind="poly") + log.info(f"fitting FWHM_G trace with {deg_fwhm_G}-deg polynomial") + trace_fwhm_G.fit_polynomial(deg_fwhm_G, poly_kind="poly") + log.info(f"fitting FWHM_L trace with {deg_fwhm_L}-deg polynomial") + trace_fwhm_L.fit_polynomial(deg_fwhm_L, poly_kind="poly") + # set bad fibers in trace mask + trace_amp._mask[bad_fibers] = True + trace_cent._mask[bad_fibers] = True + trace_fwhm_G._mask[bad_fibers] = True + trace_fwhm_L._mask[bad_fibers] = True + + # linearly interpolate coefficients at masked fibers + if interpolate_missing: + log.info(f"interpolating coefficients at {bad_fibers.sum()} masked fibers") + trace_amp.interpolate_coeffs() + trace_cent.interpolate_coeffs() + trace_fwhm_G.interpolate_coeffs() + trace_fwhm_L.interpolate_coeffs() + else: + # interpolate traces along X axis to fill in missing data + log.info("interpolating traces along X axis to fill in missing data") + trace_amp.interpolate_data(axis="X") + trace_cent.interpolate_data(axis="X") + trace_fwhm_G.interpolate_data(axis="X") + trace_fwhm_L.interpolate_data(axis="X") + # set bad fibers in trace mask + trace_amp._mask[bad_fibers] = True + trace_cent._mask[bad_fibers] = True + trace_fwhm_G._mask[bad_fibers] = True + trace_fwhm_L._mask[bad_fibers] = True + + if interpolate_missing: + log.info("interpolating bad fibers") + trace_amp.interpolate_data(axis="Y") + trace_cent.interpolate_data(axis="Y") + trace_fwhm_G.interpolate_data(axis="Y") + trace_fwhm_L.interpolate_data(axis="Y") + + # write output traces + log.info(f"writing amplitude trace to '{os.path.basename(out_trace_amp)}'") + trace_amp.writeFitsData(out_trace_amp) + log.info(f"writing centroid trace to '{os.path.basename(out_trace_cent)}'") + trace_cent.writeFitsData(out_trace_cent) + log.info(f"writing FWHM_G trace to '{os.path.basename(out_trace_fwhm_G)}'") + trace_fwhm_G.writeFitsData(out_trace_fwhm_G) + log.info(f"writing FWHM_L trace to '{os.path.basename(out_trace_fwhm_L)}'") + trace_fwhm_L.writeFitsData(out_trace_fwhm_L) + if out_trace_cent_guess is not None: + log.info(f"writing guess centroids trace to '{os.path.basename(out_trace_cent_guess)}'") + centroids.writeFitsData(out_trace_cent_guess) + + # plot results + log.info("plotting results") + camera = img._header["CCD"] + # residuals + fig, ax = create_subplots(to_display=display_plots, nrows=1, ncols=1, figsize=(15,7)) + fig.suptitle(f"Residuals of joint model for {camera = }") + ax.plot(columns, residuals, "o", color="tab:red", ms=10) + ax.axhline(0, color="0.2", ls="--", lw=1) + ax.grid(ls="--", color="0.9", lw=0.5, zorder=0) + ax.set_xlabel("X (pixel)") + ax.set_ylabel("residuals (%)") + save_fig( + fig, + product_path=out_trace_amp, + to_display=display_plots, + figure_path="qa", + label="residuals_int_columns" + ) + + # profile models vs data + fig, ax = create_subplots(to_display=display_plots, figsize=(15,7)) + fig.suptitle(f"Profile fitting residuals for {camera = }") + fig.supylabel("residuals (%)") + fig.supxlabel("Y (pixel)") + + colors = plt.cm.coolwarm(numpy.linspace(0, 1, len(columns))) + idx = numpy.argsort(columns) + for i in idx: + icolumn = columns[i] + + joint_mod = mod_columns[i](img_slice._pixels) + + img_slice = img.getSlice(icolumn, axis="y") + img_slice._data[(img_slice._mask)|(joint_mod<=0)] = numpy.nan + + weights = img_slice._data / bn.nansum(img_slice._data) * 500 + residuals = (joint_mod - img_slice._data) / img_slice._data * 100 + ax.scatter(img_slice._pixels, residuals, s=weights, lw=0, color=colors[i]) + ax.set_ylim(-50, 50) + save_fig( + fig, + product_path=out_trace_amp, + to_display=display_plots, + figure_path="qa", + label="residuals_columns" + ) + return centroids, trace_cent, trace_amp, trace_fwhm_G, trace_fwhm_L + + +def image_gauss(image_path, amp_path, cent_path, fwhm_G_path, out_model, out_ratio): + ''' + This function make a model and a ratio image from parameters already calculate + + Parameters + ---------- + image_path : str + path to input image + amp_path : str + path to amplitud trace + cent_path : str + path to centroid trace + fwhm_G_path : str + path to fwhm gaussian trace + out_model : str + path for the model image + out_ratio : str + path for the ratio image + + Returns + ---------- + model_image : lvmdrp.core.image.Image + A recreation of the observational image + ratio_image : lvmdrp.core.image.Image + A reason between the observational and the model image + ''' + + if os.path.isfile(out_model) and os.path.isfile(out_ratio): + model_image = Image() # imagen vacio + model_image.loadFitsData(out_model) + + model_ratio = Image() # imagen vacio + model_ratio.loadFitsData(out_ratio) + + return model_ratio, model_image + + median_box = (1, 10) + coadd = 20 + + img = Image() # imagen vacio + img.loadFitsData(image_path) + + img.setData(data=numpy.nan_to_num(img._data), error=numpy.nan_to_num(img._error)) + + # perform median filtering along the dispersion axis to clean cosmic rays + median_box = tuple(map(lambda x: max(x, 1), median_box)) + + if median_box != (1, 1): + img = img.replaceMaskMedian(*median_box) # Replace bad pixels with the median value of pixel in a rectangular filter window + img = img.medianImg(median_box) # return median filtered image with the given kernel size + + # coadd images along the dispersion axis to increase the S/N of the peaks + if coadd != 0: + coadd_kernel = numpy.ones((1, coadd), dtype="uint8") #se especifica el axis + img = img.convolveImg(coadd_kernel) # Return a collapsed cut as a spectrum object along one axis + + + amp = TraceMask() + centroid = TraceMask() + fwhm_G = TraceMask() + # guarda en el objeto centroid el contenido del archivo + amp.loadFitsData(amp_path) + centroid.loadFitsData(cent_path) + fwhm_G.loadFitsData(fwhm_G_path) + + ncolumns = centroid._data.shape[1] + y_pixels = numpy.arange(img._data.shape[0]) + + gauss_list = [] + + for column in range(ncolumns): + c = centroid._data[:, column] + a = amp._data[:, column] + f_g = fwhm_G._data[:, column] + param = numpy.concatenate([a, c, f_g/2.354]) + gauss = fp.Gaussians(param) + + # evaluar Modelo + ev = gauss(y_pixels) + gauss_list.append(ev) + + gauss_array = numpy.array(gauss_list).T + + model_image = copy(img) + model_image._data = gauss_array # instancia de image + + model_ratio = model_image / img + + model_image.writeFitsData(out_model) + + model_ratio.writeFitsData(out_ratio) + + return model_ratio, model_image + + +def image_fitting_voigt(img_path, amp_path, cent_path, fwhm_G_path, fwhm_L_path, out_model, out_ratio): + ''' + This function make a model and a ratio image from parameters already calculate + + Parameters + ---------- + image_path : str + path to input image + amp_path : str + path to amplitud trace + cent_path : str + path to centroid trace + fwhm_G_path : str + path to fwhm gaussian trace + fwhm_L_path : str + path to fwhm lorentzian trace + out_model : str + path for the model image + out_ratio : str + path for the ratio image + + Returns + ---------- + model_image : lvmdrp.core.image.Image + A recreation of the observational image + ratio_image : lvmdrp.core.image.Image + A reason between the observational and the model image + ''' + + if os.path.isfile(out_model) and os.path.isfile(out_ratio): + model_image = Image() # imagen vacio + model_image.loadFitsData(out_model) + + model_ratio = Image() # imagen vacio + model_ratio.loadFitsData(out_ratio) + + return model_ratio, model_image + + median_box = (1, 10) + coadd = 20 + + img = Image() # imagen vacio + img.loadFitsData(img_path) + + img.setData(data=numpy.nan_to_num(img._data), error=numpy.nan_to_num(img._error)) + + # perform median filtering along the dispersion axis to clean cosmic rays + median_box = tuple(map(lambda x: max(x, 1), median_box)) + + if median_box != (1, 1): + img = img.replaceMaskMedian(*median_box) # Replace bad pixels with the median value of pixel in a rectangular filter window + img = img.medianImg(median_box) # return median filtered image with the given kernel size + + # coadd images along the dispersion axis to increase the S/N of the peaks + if coadd != 0: + coadd_kernel = numpy.ones((1, coadd), dtype="uint8") #se especifica el axis + img = img.convolveImg(coadd_kernel) # Return a collapsed cut as a spectrum object along one axis. + + amp = TraceMask() + centroid = TraceMask() + fwhm_G = TraceMask() + fwhm_L = TraceMask() + + #guarda en el objeto centroid el contenido del archivo + + amp.loadFitsData(amp_path) + centroid.loadFitsData(cent_path) + fwhm_G.loadFitsData(fwhm_G_path) + fwhm_L.loadFitsData(fwhm_L_path) + + ncolumns = centroid._data.shape[1] + y_pixels = numpy.arange(img._data.shape[0]) + + voigt_list = [] + + for column in range(ncolumns): + c = centroid._data[:, column] + a = amp._data[:, column] + f_g = fwhm_G._data[:, column] + f_l = fwhm_L._data[:, column] + param = numpy.concatenate([a, c, f_g/2.354, f_l/2.354]) + voigts = fp.Voigts(param) + + # Evaluar Modelo + ev = voigts(y_pixels) + voigt_list.append(ev) + + voigt_array = numpy.array(voigt_list).T + + model_image = copy(img) + model_image._data = voigt_array # instancia de image + + model_ratio = model_image / img + + model_image.writeFitsData(out_model) + + model_ratio.writeFitsData(out_ratio) + + return model_ratio, model_image \ No newline at end of file