diff --git a/sotodlib/hwp/__init__.py b/sotodlib/hwp/__init__.py index d78991ee9..7128b11f0 100644 --- a/sotodlib/hwp/__init__.py +++ b/sotodlib/hwp/__init__.py @@ -6,7 +6,7 @@ """ from .g3thwp import G3tHWP -from .hwp import (get_hwpss, subtract_hwpss, demod_tod) +from .hwp import (get_hwpss, subtract_hwpss, demod_tod, get_hwp_freq) from .sim_hwp import I_to_P_param from .sim_hwp import sim_hwpss from .sim_hwp import sim_hwpss_2f4f diff --git a/sotodlib/hwp/hwp.py b/sotodlib/hwp/hwp.py index 0e3cc6ea4..2363d3221 100644 --- a/sotodlib/hwp/hwp.py +++ b/sotodlib/hwp/hwp.py @@ -1,7 +1,7 @@ import numpy as np from scipy.optimize import curve_fit from sotodlib import core, tod_ops -from sotodlib.tod_ops import bin_signal, filters, apodize +from sotodlib.tod_ops import filters, apodize import logging logger = logging.getLogger(__name__) @@ -258,7 +258,7 @@ def get_binned_hwpss(aman, signal=None, hwp_angle=None, else: weight_for_signal = None - binning_dict = bin_signal(aman, bin_by=hwp_angle, range=[0, 2*np.pi], + binning_dict = tod_ops.bin_signal(aman, bin_by=hwp_angle, range=[0, 2*np.pi], bins=bins, signal=signal, flags=flags, weight_for_signal=weight_for_signal) bin_centers = binning_dict['bin_centers'] @@ -562,6 +562,20 @@ def subtract_hwpss(aman, signal='signal', hwpss_template_name='hwpss_model', if remove_template: aman.move(hwpss_template_name, None) +def get_hwp_freq(timestamps, hwp_angle): + """ + Calculate the frequency of HWP rotation. + + Parameters: + timestamps (array-like): An array of timestamps. + hwp_angle (array-like): An array of HWP angles in radian + + Returns: + float: The frequency of the HWP rotation in Hz. + """ + hwp_freq = (np.sum(np.abs(np.diff(np.unwrap(hwp_angle)))) / + (timestamps[-1] - timestamps[0])) / (2 * np.pi) + return hwp_freq def demod_tod(aman, signal=None, demod_mode=4, bpf_cfg=None, lpf_cfg=None): @@ -611,8 +625,7 @@ def demod_tod(aman, signal=None, demod_mode=4, raise TypeError("Signal must be None, str, or ndarray") # HWP speed in Hz - speed = (np.sum(np.abs(np.diff(np.unwrap(aman.hwp_angle)))) / - (aman.timestamps[-1] - aman.timestamps[0])) / (2 * np.pi) + speed = get_hwp_freq(timestamps=aman.timestamps, hwp_angle=aman.hwp_angle) if bpf_cfg is None: bpf_center = demod_mode * speed diff --git a/sotodlib/tod_ops/fft_ops.py b/sotodlib/tod_ops/fft_ops.py index 1dc06eeaa..70a6a8aa6 100644 --- a/sotodlib/tod_ops/fft_ops.py +++ b/sotodlib/tod_ops/fft_ops.py @@ -4,11 +4,11 @@ import numpy as np import pyfftw import so3g +from so3g.proj import Ranges, RangesMatrix from scipy.optimize import minimize from scipy.signal import welch -from sotodlib import core - -from . import detrend_tod +from sotodlib import core, hwp +from sotodlib.tod_ops import detrend_tod def _get_num_threads(): @@ -211,6 +211,7 @@ def calc_psd( prefer='center', freq_spacing=None, merge=False, + merge_suffix=None, overwrite=True, **kwargs ): @@ -231,7 +232,8 @@ def calc_psd( If None the default nperseg of ~1/50th the signal length is used. If an nperseg is explicitly passed then that will be used. merge (bool): if True merge results into axismanager. - overwrite (bool): if true will overwrite f, pxx axes. + merge_suffix (str, optional): Suffix to append to the Pxx field name in aman. Defaults to None (merged as Pxx). + overwrite (bool): if true will overwrite f, Pxx axes. **kwargs: keyword args to be passed to signal.welch(). Returns: @@ -269,14 +271,23 @@ def calc_psd( freqs, Pxx = welch(signal[:, start:stop], fs, **kwargs) if merge: - aman.merge( core.AxisManager(core.OffsetAxis("nusamps", len(freqs)))) + if 'nusamps' not in list(aman._axes.keys()): + aman.merge(core.AxisManager(core.OffsetAxis("nusamps", len(freqs)))) + aman.wrap("freqs", freqs, [(0,"nusamps")]) + else: + if len(freqs) != aman.nusamps.count: + raise ValueError('New freqs does not match the shape of nusamps\ + To avoid this, use the same value for nperseg') + + if merge_suffix is None: + Pxx_name = 'Pxx' + else: + Pxx_name = f'Pxx_{merge_suffix}' + if overwrite: - if "freqs" in aman._fields: - aman.move("freqs", None) - if "Pxx" in aman._fields: - aman.move("Pxx", None) - aman.wrap("freqs", freqs, [(0,"nusamps")]) - aman.wrap("Pxx", Pxx, [(0,"dets"),(1,"nusamps")]) + if Pxx_name in aman._fields: + aman.move(Pxx_name, None) + aman.wrap(Pxx_name, Pxx, [(0,"dets"), (1,"nusamps")]) return freqs, Pxx @@ -323,21 +334,177 @@ def calc_wn(aman, pxx=None, freqs=None, low_f=5, high_f=10): return wn -def noise_model(f, p): +def noise_model(f, params, **fixed_param): """ Noise model for power spectrum with white noise, and 1/f noise. + If any fixed param is handed, that parameter is fixed in the fit. + 'alpha' or 'wn' can be fixed. + params = [wn, fknee, alpha] """ - fknee, w, alpha = p[0], p[1], p[2] - return w * (1 + (fknee / f) ** alpha) + if 'wn' in fixed_param.keys(): + if len(params)==2: + wn = fixed_param['wn'] + fknee, alpha = params[0], params[1] + else: + raise ValueError('The number of fit parameters are invalid.') + return + elif 'alpha' in fixed_param.keys(): + if len(params)==2: + alpha = fixed_param['alpha'] + wn, fknee = params[0], params[1] + else: + raise ValueError('The number of fit parameters are invalid.') + return + elif len(fixed_param)==0: + if len(params)==3: + wn, fknee, alpha = params[0], params[1], params[2] + else: + raise ValueError('The number of fit parameters are invalid.') + return + else: + raise ValueError('"alpha" or "wn" can be a fixed parameter.') + return + return wn**2 * (1 + (fknee / f) ** alpha) -def neglnlike(params, x, y): - model = noise_model(x, params) - output = np.sum(np.log(model) + y / model) +def neglnlike(params, x, y, bin_size=1, **fixed_param): + model = noise_model(x, params, **fixed_param) + output = np.sum((np.log(model) + y / model)*bin_size) if not np.isfinite(output): return 1.0e30 return output +def get_psd_mask(aman, psd_mask=None, f=None, pxx=None, + mask_hwpss=True, hwp_freq=None, max_hwpss_mode=10, hwpss_width=((-0.4, 0.6), (-0.2, 0.2)), + mask_peak=False, peak_freq=None, peak_width=(-0.002, +0.002), + merge=True, overwrite=True +): + """ + Function to get masks for hwpss or single peak in PSD. + + Arguments + --------- + aman : AxisManager + Axis manager which has 'nusamps' axis. + psd_mask : numpy.ndarray or Ranges + Existing psd_mask to be updated. If None, a new mask is created. + f : nparray + Frequency of PSD of signal. If None, aman.freqs are used. + pxx : nparray + PSD of signal. If None, aman.Pxx are used. + mask_hwpss : bool + If True, hwpss are masked. Defaults to True. + hwp_freq : float + HWP frequency. If None, calculated based on aman.hwp_angle + max_hwpss_mode : int + Maximum hwpss mode to subtract. + hwpss_width : array-like + If given in float, + nf-hwpss will be masked like (n * hwp_freq - width/2) < f < (n * hwp_freq + width/2). + If given in array like [1.0, 0.4], + 1f hwpss is masked like (hwp_freq - 0.5) < f < (hwp_freq + 0.5) and + nf hwpss are masked like (n * hwp_freq - 0.2) < f < (n * hwp_freq + 0.2). + If given in array like [[-0.4, 0.6], [-0.2, 0.3]], + 1f hwpss is masked like (hwp_freq - 0.4) < f < (hwp_freq + 0.6) and + nf are masked like (n * hwp_freq - 0.2) < f < (n * hwp_freq + 0.3). + mask_peak : bool + If True, single peak is masked. + peak_freq : float + Center frequency of the mask. + peak_width : tuple + Range to mask signal. The default masked range will be + (peak_freq - 0.002) < f < (peak_freq + 0.002). + merge : bool + if True merge results into axismanager. + mode: str + if "replace", existing PSD mask is replaced to new mask. + If "add", new mask range is added to the existing one. + overwrite: bool + if true will overwrite aman.psd_mask. + + Returns + ------- + psd_mask (nusamps): Ranges array. If merge == True, "psd_mask" is added to the aman. + """ + if psd_mask is None: + psd_mask = np.zeros(aman.nusamps.count, dtype=bool) + elif isinstance(psd_mask, so3g.RangesInt32): + psd_mask = psd_mask.mask() + if f is None: + f = aman.freqs + if pxx is None: + pxx = aman.Pxx + if mask_hwpss: + hwp_freq = hwp.get_hwp_freq(aman.timestamps, aman.hwp_solution.hwp_angle) + psd_mask = psd_mask | get_mask_for_hwpss(f, hwp_freq, max_mode=max_hwpss_mode, width=hwpss_width) + if mask_peak: + psd_mask = psd_mask | get_mask_for_single_peak(f, peak_freq, peak_width=peak_width) + + psd_mask = Ranges.from_bitmask(psd_mask) + if merge: + if overwrite: + if "psd_mask" in aman: + aman.move("psd_mask", None) + aman.wrap("psd_mask", psd_mask, [(0,"nusamps")]) + return psd_mask + +def get_binned_psd( + aman, + f=None, + pxx=None, + unbinned_mode=3, + base=1.05, + merge=False, + overwrite=True, +): + """ + Function that masks hwpss in PSD. + + Arguments + --------- + aman : AxisManager + Axis manager which has samps axis aligned with signal. + f : nparray + Frequency of PSD of signal. + pxx : nparray + PSD of signal. + mask : bool + if True calculate binned psd with mask. + unbinned_mode : int + First Fourier modes up to this number are left un-binned. + base : float (> 1) + Base of the logspace bins. + merge : bool + if True merge results into axismanager. + overwrite: bool + if true will overwrite f, pxx axes. + + Returns + ------- + f_binned, pxx_binned, bin_size: binned frequency and PSD. + """ + if f is None: + f = aman.freqs + if pxx is None: + pxx = aman.Pxx + + f_bin, bin_size = log_binning(f, unbinned_mode=unbinned_mode, base=base, return_bin_size=True) + pxx_bin = log_binning(pxx, unbinned_mode=unbinned_mode, base=base, return_bin_size=False) + + if merge: + aman.merge(core.AxisManager(core.OffsetAxis("nusamps_bin", len(f_bin)))) + if overwrite: + if "freqs_bin" in aman._fields: + aman.move("freqs_bin", None) + if "Pxx_bin" in aman._fields: + aman.move("Pxx_bin", None) + if "bin_size" in aman._fields: + aman.move("bin_size", None) + aman.wrap("freqs_bin", f_bin, [(0,"nusamps_bin")]) + aman.wrap("Pxx_bin", pxx_bin, [(0,"dets"),(1,"nusamps_bin")]) + aman.wrap("bin_size", bin_size, [(0,"dets"),(1,"nusamps_bin")]) + return f_bin, pxx_bin, bin_size + def fit_noise_model( aman, @@ -345,19 +512,24 @@ def fit_noise_model( f=None, pxx=None, psdargs=None, - fwhite=(10, 100), - lowf=1, + fknee_est=1, + wn_est=4E-5, + alpha_est=3.4, merge_fit=False, + f_min=None, f_max=100, merge_name="noise_fit_stats", merge_psd=True, + mask=False, + fixed_param=None, + binning=False, + unbinned_mode=3, + base=1.05, ): """ - Fits noise model with white and 1/f noise to the PSD of signal. - This uses a MLE method that minimizes a log likelihood. This is - better for chi^2 distributed data like the PSD. - - Reference: http://keatonb.github.io/archivers/powerspectrumfits + Fits noise model with white and 1/f noise to the PSD of binned signal. + This uses a least square method since the distrubition of binned points + is close to the standard distribution. Args ---- @@ -374,14 +546,20 @@ def fit_noise_model( Default is None which calculates f, pxx from signal. psdargs : dict Dictionary of optional argument for ``scipy.signal.welch`` - fwhite : tuple - Low and high frequency used to estimate white noise for initial - guess passed to ``scipy.signal.curve_fit``. - lowf : tuple - Frequency below which estimate of 1/f noise index and knee are estimated - for initial guess passed to ``scipy.signal.curve_fit``. + fknee_est : float or nparray + Initial estimation value of knee frequency. If it is given in array, + initial values are applied for each detector. + wn_est : float or nparray + Initial estimation value of white noise. If it is given in array, + initial values are applied for each detector. + alpha_est : float or nparray + Initial estimation value of alpha. If it is given in array, + initial values are applied for each detector. merge_fit : bool Merges fit and fit statistics into input axis manager. + f_min : float + Minimum frequency to include in the fitting. + Default is None which selects f_min as the second index of f. f_max : float Maximum frequency to include in the fitting. This is particularly important for lowpass filtered data such as that post demodulation @@ -389,7 +567,17 @@ def fit_noise_model( merge_name : bool If ``merge_fit`` is True then addes into axis manager with merge_name. merge_psd : bool - If ``merg_psd`` is True then adds fres and Pxx to the axis manager. + If ``merge_psd`` is True then adds fres and Pxx to the axis manager. + mask : bool + If ``mask`` is True then PSD is masked with ``aman.psd_mask``. + fixed_param : str + This accepts 'wn' or 'alpha' or None. If 'wn' ('alpha') is given, + white noise level (alpha) is fixed to the wn_est (alpha_est). + unbinned_mode : int + First Fourier modes up to this number are left un-binned. + base : float (> 1) + Base of the logspace bins. + Returns ------- noise_fit_stats : AxisManager @@ -413,35 +601,97 @@ def fit_noise_model( merge=merge_psd, **psdargs, ) - eix = np.argmin(np.abs(f - f_max)) - f = f[1:eix] - pxx = pxx[:, 1:eix] + if mask: + if 'psd_mask' in aman: + mask = ~aman.psd_mask.mask() + f = f[mask] + pxx = pxx[:, mask] + else: + print('"psd_mask" is not in aman. Masking is skipped.') + eix = np.argmin(np.abs(f - f_max)) + if f_min is None: + six = 1 + else: + six = np.argmin(np.abs(f - f_min)) + f = f[six:eix] + pxx = pxx[:, six:eix] + bin_size = 1 + # binning + if binning == True: + f, pxx, bin_size = get_binned_psd(aman, f=f, pxx=pxx, unbinned_mode=unbinned_mode, + base=base, merge=False) fitout = np.zeros((aman.dets.count, 3)) # This is equal to np.sqrt(np.diag(cov)) when doing curve_fit covout = np.zeros((aman.dets.count, 3, 3)) - for i in range(aman.dets.count): + if isinstance(wn_est, (int, float)): + wn_est = np.full(aman.dets.count, wn_est) + elif len(wn_est)!=aman.dets.count: + print('Size of wn_est must be equal to aman.dets.count or a single value.') + return + if isinstance(fknee_est, (int, float)): + fknee_est = np.full(aman.dets.count, fknee_est) + elif len(fknee_est)!=aman.dets.count: + print('Size of fknee_est must be equal to aman.dets.count or a single value.') + return + if isinstance(alpha_est, (int, float)): + alpha_est = np.full(aman.dets.count, alpha_est) + elif len(alpha_est)!=aman.dets.count: + print('Size of alpha_est must be equal to aman.dets.count or a single value.') + return + if fixed_param == None: + initial_params = np.array([wn_est, fknee_est, alpha_est]) + if fixed_param == "wn": + initial_params = np.array([fknee_est, alpha_est]) + fixed = wn_est + if fixed_param == "alpha": + initial_params = np.array([wn_est, fknee_est]) + fixed = alpha_est + + for i in range(len(pxx)): p = pxx[i] - wnest = np.median(p[((f > fwhite[0]) & (f < fwhite[1]))]) - pfit = np.polyfit(np.log10(f[f < lowf]), np.log10(p[f < lowf]), 1) - fidx = np.argmin(np.abs(10 ** np.polyval(pfit, np.log10(f)) - wnest)) - p0 = [f[fidx], wnest, -pfit[0]] - res = minimize(neglnlike, p0, args=(f, p), method="Nelder-Mead") + p0 = initial_params.T[i] + _fixed = {} + if fixed_param != None: + _fixed = {fixed_param: fixed[i]} + res = minimize(lambda params: neglnlike(params, f, p, bin_size=bin_size, **_fixed), + p0, method="Nelder-Mead") try: - Hfun = ndt.Hessian(lambda params: neglnlike(params, f, p), full_output=True) + Hfun = ndt.Hessian(lambda params: neglnlike(params, f, p, bin_size=bin_size, **_fixed), full_output=True) hessian_ndt, _ = Hfun(res["x"]) # Inverse of the hessian is an estimator of the covariance matrix # sqrt of the diagonals gives you the standard errors. - covout[i] = np.linalg.inv(hessian_ndt) + covout_i = np.linalg.inv(hessian_ndt) except np.linalg.LinAlgError: - covout[i] = np.full((3, 3), np.nan) - fitout[i] = res.x + print( + f"Cannot calculate Hessian for detector {aman.dets.vals[i]} skipping. (LinAlgError)" + ) + covout_i = np.full((len(p0), len(p0)), np.nan) + except IndexError: + print( + f"Cannot calculate Hessian for detector {aman.dets.vals[i]} skipping. (IndexError)" + ) + covout_i = np.full((len(p0), len(p0)), np.nan) + fitout_i = res.x + if fixed_param == "wn": + covout_i = np.insert(covout_i, 0, 0, axis=0) + covout_i = np.insert(covout_i, 0, 0, axis=1) + covout_i[0][0] = np.nan + fitout_i = np.insert(fitout_i, 0, wn_est[i]) + elif fixed_param == "alpha": + covout_i = np.insert(covout_i, 2, 0, axis=0) + covout_i = np.insert(covout_i, 2, 0, axis=1) + covout_i[2][2] = np.nan + fitout_i = np.insert(fitout_i, 2, alpha_est[i]) + covout[i] = covout_i + fitout[i] = fitout_i + + noise_model_coeffs = ["white_noise", "fknee", "alpha"] - noise_model_coeffs = ["fknee", "white_noise", "alpha"] noise_fit_stats = core.AxisManager( aman.dets, core.LabelAxis( - name="noise_model_coeffs", vals=np.array(noise_model_coeffs, dtype=" peak_freq + peak_width[0])&(f < peak_freq + peak_width[1]) + return mask + +def log_binning(psd, unbinned_mode=3, base=1.05, mask=None, + return_bin_size=False, drop_nan=False): + """ + Function to bin PSD or frequency. First several Fourier modes are left un-binned. + Fourier modes higher than that are averaged into logspace bins. + + Parameters + ---------- + psd : numpy.ndarray + PSD (or frequency) to be binned. Can be a 1D or 2D array. + unbinned_mode : int, optional + First Fourier modes up to this number are left un-binned. Defaults to 3. + base : float, optional + Base of the logspace bins. Must be greater than 1. Defaults to 1.05. + mask : numpy.ndarray, optional + Mask for psd. If all values in a bin are masked, the value becomes np.nan. + Should be a 1D array. + return_bin_size : bool, optional + If True, the number of data points in the bins are returned. Defaults to False. + drop_nan : bool, optional + If True, drop the indices where psd is NaN. Defaults to False. + + Returns + ------- + binned_psd : numpy.ndarray + The binned PSD. If the input is 2D, the output will also be 2D with the same number of rows. + bin_size : numpy.ndarray, optional + The number of data points in each bin, only returned if return_bin_size is True. + """ + if base <= 1: + raise ValueError("base must be greater than 1") + + is_1d = psd.ndim == 1 + + # Ensure psd is at least 2D for consistent processing + psd = np.atleast_2d(psd) + num_signals, num_samples = psd.shape + + if mask is not None: + # Ensure mask is at least 2D and has the same shape as psd + mask = np.atleast_2d(mask) + if mask.shape[1] != num_samples: + raise ValueError("Mask must have the same number of columns as psd") + psd = np.ma.masked_array(psd, mask=np.tile(mask, (num_signals, 1))) + + # Initialize the binned PSD and optionally the bin sizes + binned_psd = np.zeros((num_signals, unbinned_mode + 1)) + binned_psd[:, :unbinned_mode + 1] = psd[:, :unbinned_mode + 1] + bin_size = np.ones((num_signals, unbinned_mode + 1)) if return_bin_size else None + + # Determine the number of bins and their indices + N = int(np.ceil(np.emath.logn(base, num_samples - unbinned_mode))) + binning_idx = np.unique(np.logspace(base, N, N, base=base, dtype=int) + unbinned_mode - 1) + + # Bin the PSD values for each signal + new_binned_psd = [] + new_bin_size = [] + for start, end in zip(binning_idx[:-1], binning_idx[1:]): + bin_mean = np.nanmean(psd[:, start:end], axis=1) + new_binned_psd.append(bin_mean) + if return_bin_size: + new_bin_size.append(end - start) + + # Convert lists to numpy arrays and concatenate with initial values + new_binned_psd = np.array(new_binned_psd).T # Transpose to match dimensions + binned_psd = np.hstack([binned_psd, new_binned_psd]) + if return_bin_size: + new_bin_size = np.array(new_bin_size) + bin_size = np.hstack([bin_size, np.tile(new_bin_size, (num_signals, 1))]) + + if drop_nan: + valid_indices = ~np.isnan(binned_psd).any(axis=0) + binned_psd = binned_psd[:, valid_indices] + if return_bin_size: + bin_size = bin_size[:, valid_indices] + + if is_1d: + binned_psd = binned_psd.flatten() + if return_bin_size: + bin_size = bin_size.flatten() + + if return_bin_size: + return binned_psd, bin_size + return binned_psd + def build_hpf_params_dict( filter_name, @@ -526,4 +948,3 @@ def build_hpf_params_dict( filter_params = params_dict return filter_params - diff --git a/tests/test_tod_ops.py b/tests/test_tod_ops.py index c2bd93754..af8870b9c 100644 --- a/tests/test_tod_ops.py +++ b/tests/test_tod_ops.py @@ -9,7 +9,9 @@ import numpy as np import pylab as pl import scipy.signal - +from numpy.fft import rfftfreq,irfft +from numpy.fft import fftfreq,rfft +from scipy import interpolate from numpy.testing import assert_array_equal, assert_allclose from sotodlib import core, tod_ops, sim_flags @@ -298,11 +300,47 @@ def test_jumpfinder(self): class FFTTest(unittest.TestCase): def test_psd(self): - tod = get_tod("white") - f, Pxx = tod_ops.fft_ops.calc_psd(tod, nperseg=256) - self.assertEqual(len(f), 129) # nperseg/2 + 1 - f, Pxx = tod_ops.fft_ops.calc_psd(tod, freq_spacing=.1) - self.assertEqual(np.round(np.median(np.diff(f)), 1), .1) + fs = 200. + dets = core.LabelAxis('dets', [f'det{di:003}' for di in range(20)]) + nsamps = 200*3600 + + aman = core.AxisManager(dets) + ndets = aman.dets.count + + white_noise_amp_array_input = 50 + np.random.randn(ndets) #W/sqrt{Hz} + fknee_array_input = 1 + 0.1*np.random.randn(ndets) + alpha_array_input = 3 + 0.2*np.random.randn(ndets) + + freqs = rfftfreq(nsamps, d=1/fs) + params = [white_noise_amp_array_input[:, np.newaxis], + fknee_array_input[:, np.newaxis], + alpha_array_input[:, np.newaxis]] + pxx_input = tod_ops.fft_ops.noise_model(freqs, params) + + pxx_input[:, 0] = 0 + nusamps_input = core.OffsetAxis('nusamps_input', len(freqs)) + + T = nsamps/fs + ft_amps = np.sqrt(pxx_input * T * fs**2 / 2) + + ft_phases = np.random.uniform(0, 2*np.pi, size=ft_amps.shape) + ft_coefs = ft_amps * np.exp(1.0j*ft_phases) + realized_noise = irfft(ft_coefs) + timestamps = 1700000000 + np.arange(0, realized_noise.shape[1])/fs + aman.wrap('timestamps', timestamps, [(0, core.OffsetAxis('samps', len(timestamps)))]) + aman.wrap('signal', realized_noise, [(0, 'dets'), (1, 'samps')]) + + tod_ops.detrend_tod(aman) + freqs_output, Pxx_output = tod_ops.fft_ops.calc_psd(aman, nperseg=200*100) + fit_result = tod_ops.fft_ops.fit_noise_model(aman, wn_est=50, fknee_est=1.0, alpha_est=3.3, f_min=0.05, f_max=5, + binning=True, psdargs={'nperseg':200*1000}) + wnl_fit = fit_result.fit[:, 0] + fk_fit = fit_result.fit[:, 1] + alpha_fit = fit_result.fit[:, 2] + + self.assertTrue(np.abs(np.median(white_noise_amp_array_input - wnl_fit)) < 1) + self.assertTrue(np.abs(np.median(fknee_array_input - fk_fit)) < 0.1) + self.assertTrue(np.abs(np.median(alpha_array_input - alpha_fit)) < 0.1) if __name__ == '__main__': unittest.main()