-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New functions for fft_ops #869
base: master
Are you sure you want to change the base?
Changes from 3 commits
7969bed
5e88252
3fc19ae
307ab16
3c96965
9575bf9
81fbde8
5bbde3d
8116be2
f5673c8
912ac2e
97d2127
63b9608
c946071
65f4125
f138b70
15a83a9
d3eaabd
b57326e
57f5c24
3d02df5
91c4381
f10e0f0
db40980
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -269,12 +269,13 @@ def calc_psd( | |
|
||
freqs, Pxx = welch(signal[:, start:stop], fs, **kwargs) | ||
if merge: | ||
aman.merge( core.AxisManager(core.OffsetAxis("nusamps", len(freqs)))) | ||
if overwrite: | ||
|
||
if "freqs" in aman._fields: | ||
aman.move("freqs", None) | ||
if "Pxx" in aman._fields: | ||
aman.move("Pxx", None) | ||
aman.merge( core.AxisManager(core.OffsetAxis("nusamps", len(freqs)))) | ||
aman.wrap("freqs", freqs, [(0,"nusamps")]) | ||
aman.wrap("Pxx", Pxx, [(0,"dets"),(1,"nusamps")]) | ||
return freqs, Pxx | ||
|
@@ -338,6 +339,142 @@ def neglnlike(params, x, y): | |
return 1.0e30 | ||
return output | ||
|
||
def calc_masked_psd( | ||
aman, | ||
f=None, | ||
pxx=None, | ||
hwpss=False, | ||
hwp_freq=None, | ||
f_max=100, | ||
width_for_1f=(-0.5, +0.7), | ||
width_for_Nf=(-0.3, +0.3), | ||
peak=False, | ||
peak_freq=None, | ||
peak_width=(-0.002, +0.002), | ||
merge=False, | ||
overwrite=True, | ||
): | ||
""" | ||
Function that masks hwpss or single peak in PSD. | ||
|
||
Arguments | ||
--------- | ||
aman : AxisManager | ||
Axis manager which has samps axis aligned with signal. | ||
f : nparray | ||
Frequency of PSD of signal. | ||
pxx : nparray | ||
PSD of signal. | ||
hwpss : bool | ||
If True, hwpss are masked. | ||
hwp_freq : float | ||
HWP frequency. | ||
f_max : float | ||
Maximum frequency to include in the hwpss masking. | ||
width_for_1f : tuple | ||
Range to mask 1f hwpss. The default masked range will be | ||
(hwp_freq - 0.4) < f < (hwp_freq + 0.6). | ||
Usually 1f hwpss distrubites wider than other hwpss. | ||
width_for_Nf : tuple | ||
Range to mask Nf hwpss. The default masked range will be | ||
(hwp_freq*N - 0.2) < f < (hwp_freq*N + 0.2). | ||
peak : bool | ||
If True, single peak is masked. | ||
peak_freq : float | ||
Center frequency of the mask. | ||
peak_width : tuple | ||
Range to mask signal. The default masked range will be | ||
(peak_freq - 0.002) < f < (peak_freq + 0.002). | ||
merge : bool | ||
if True merge results into axismanager. | ||
overwrite: bool | ||
if true will overwrite f, pxx axes. You cannot overwrite with data | ||
longer than before. | ||
|
||
Returns | ||
------- | ||
p_mask: MaskedArray of the input PSD. p_mask.data returns the PSD before masking, | ||
while p_mask.mask returns the bool array of the mask. | ||
""" | ||
if f is None or pxx is None: | ||
f = aman.freqs | ||
pxx = aman.Pxx | ||
if hwpss: | ||
pxx_masked = [] | ||
if hwp_freq is None: | ||
hwp_freq = np.median(aman['hwp_solution']['raw_approx_hwp_freq_1'][1]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use the code I suggested above. |
||
mask_idx = hwpss_mask(f, hwp_freq, f_max=f_max, width_for_1f=width_for_1f, width_for_Nf=width_for_Nf) | ||
f = f[mask_idx] | ||
pxx = pxx[:, mask_idx] | ||
if peak: | ||
mask_idx = peak_mask(f, peak_freq, peak_width=peak_width) | ||
f = f[mask_idx] | ||
pxx = pxx[:, mask_idx] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not good to discard noise spectrum. |
||
if merge: | ||
aman.merge( core.AxisManager(core.OffsetAxis("nusamps", len(f)))) | ||
if overwrite: | ||
if "freqs" in aman._fields: | ||
aman.move("freqs", None) | ||
if "Pxx" in aman._fields: | ||
aman.move("Pxx", None) | ||
aman.wrap("freqs", f, [(0,"nusamps")]) | ||
aman.wrap("Pxx", pxx, [(0,"dets"),(1,"nusamps")]) | ||
return f, pxx | ||
|
||
def calc_binned_psd( | ||
aman, | ||
f=None, | ||
pxx=None, | ||
unbinned_mode=10, | ||
base=2, | ||
merge=False, | ||
overwrite=True, | ||
): | ||
""" | ||
Function that masks hwpss in PSD. | ||
|
||
Arguments | ||
--------- | ||
aman : AxisManager | ||
Axis manager which has samps axis aligned with signal. | ||
f : nparray | ||
Frequency of PSD of signal. | ||
pxx : nparray | ||
PSD of signal. | ||
hwpss : bool | ||
If True, hwpss are masked. | ||
unbinned_mode : int | ||
First Fourier modes up to this number are left un-binned. | ||
base : float (> 1) | ||
Base of the logspace bins. | ||
merge : bool | ||
if True merge results into axismanager. | ||
overwrite: bool | ||
if true will overwrite f, pxx axes. | ||
|
||
Returns | ||
------- | ||
p_mask: MaskedArray of the input PSD. p_mask.data returns the PSD before masking, | ||
while p_mask.mask returns the bool array of the mask. | ||
""" | ||
if f is None or pxx is None: | ||
f = aman.freqs | ||
pxx = aman.Pxx | ||
f_binned = binning_psd(f, unbinned_mode=unbinned_mode, base=base) | ||
pxx_binned = [] | ||
for i in range(aman.dets.count): | ||
pxx_binned.append(binning_psd(pxx[i], unbinned_mode=unbinned_mode, base=base)) | ||
pxx_binned = np.array(pxx_binned) | ||
if merge: | ||
aman.merge( core.AxisManager(core.OffsetAxis("nusamps_bin", len(f_binned)))) | ||
if overwrite: | ||
if "freqs_bin" in aman._fields: | ||
aman.move("freqs_bin", None) | ||
if "Pxx_bin" in aman._fields: | ||
aman.move("Pxx_bin", None) | ||
aman.wrap("freqs_bin", f_binned, [(0,"nusamps_bin")]) | ||
aman.wrap("Pxx_bin", pxx_binned, [(0,"dets"),(1,"nusamps_bin")]) | ||
return f_binned, pxx_binned | ||
|
||
def fit_noise_model( | ||
aman, | ||
|
@@ -348,6 +485,7 @@ def fit_noise_model( | |
fwhite=(10, 100), | ||
lowf=1, | ||
merge_fit=False, | ||
f_min=None, | ||
f_max=100, | ||
merge_name="noise_fit_stats", | ||
merge_psd=True, | ||
|
@@ -382,6 +520,9 @@ def fit_noise_model( | |
for initial guess passed to ``scipy.signal.curve_fit``. | ||
merge_fit : bool | ||
Merges fit and fit statistics into input axis manager. | ||
f_min : float | ||
Minimum frequency to include in the fitting. | ||
Default is None which selects f_min as the second index of f. | ||
f_max : float | ||
Maximum frequency to include in the fitting. This is particularly | ||
important for lowpass filtered data such as that post demodulation | ||
|
@@ -414,9 +555,12 @@ def fit_noise_model( | |
**psdargs, | ||
) | ||
eix = np.argmin(np.abs(f - f_max)) | ||
f = f[1:eix] | ||
pxx = pxx[:, 1:eix] | ||
|
||
if f_min is None: | ||
six = 1 | ||
else: | ||
six = np.argmin(np.abs(f - f_min)) | ||
f = f[six:eix] | ||
pxx = pxx[:, six:eix] | ||
fitout = np.zeros((aman.dets.count, 3)) | ||
# This is equal to np.sqrt(np.diag(cov)) when doing curve_fit | ||
covout = np.zeros((aman.dets.count, 3, 3)) | ||
|
@@ -438,6 +582,11 @@ def fit_noise_model( | |
f"Cannot calculate Hessian for detector {aman.dets.vals[i]} skipping." | ||
) | ||
covout[i] = np.full((3, 3), np.nan) | ||
except IndexError: | ||
print( | ||
f"Cannot fit 1/f curve for detector {aman.dets.vals[i]} skipping." | ||
) | ||
covout[i] = np.full((3, 3), np.nan) | ||
fitout[i] = res.x | ||
|
||
noise_model_coeffs = ["fknee", "white_noise", "alpha"] | ||
|
@@ -457,3 +606,105 @@ def fit_noise_model( | |
if merge_fit: | ||
aman.wrap(merge_name, noise_fit_stats) | ||
return noise_fit_stats | ||
|
||
def hwpss_mask(f, hwp_freq, f_max=100, width_for_1f=(-0.4, +0.6), width_for_Nf=(-0.2, +0.2)): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can implement it more straight forward. def get_mask_for_hwpss(freq, hwp_freq, modes=[1,2,3,4,5,6], width=0.4):
if isinstance(width, float):
"apply (-width/2, width/2) for all nf"
elif isinstance(width, [np.array, list, tuple]):
width = np.array(width)
if len(width.shape) == 1:
"apply symmetric mask for each nf"
elif len(width.shape) == 2:
"apply asymmetric mask for each nf" |
||
""" | ||
Function that returns boolean array to mask hwpss in PSD. | ||
|
||
Arguments | ||
--------- | ||
f : nparray | ||
Frequency of PSD of signal. | ||
|
||
hwp_freq : float | ||
HWP frequency. | ||
|
||
f_max : float | ||
Maximum frequency to include in the fitting. | ||
|
||
width_for_1f : tuple | ||
Range to mask 1f hwpss. The default masked range will be | ||
(hwp_freq - 0.4) < f < (hwp_freq + 0.6). | ||
Usually 1f hwpss distrubites wider than other hwpss. | ||
width_for_Nf : tuple | ||
Range to mask Nf hwpss. The default masked range will be | ||
(hwp_freq*N - 0.2) < f < (hwp_freq*N + 0.2). | ||
|
||
Returns | ||
------- | ||
mask: Boolean array to mask frequency and power of the given PSD. | ||
False in this array stands for the index of hwpss to mask. | ||
""" | ||
mask_arrays = [((f < hwp_freq + width_for_1f[0])|(f > hwp_freq + width_for_1f[1]))] | ||
for n in range(int(f_max//hwp_freq-1)): | ||
mask_arrays.append(((f < hwp_freq*(n+2) + width_for_Nf[0])|(f > hwp_freq*(n+2) + width_for_Nf[1]))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use |
||
mask = np.all(np.array(mask_arrays), axis=0) | ||
return mask | ||
|
||
def peak_mask(f, peak_freq, peak_width=(-0.002, +0.002)): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think |
||
""" | ||
Function that returns boolean array to masks single peak (e.g. scan synchronous signal) in PSD. | ||
|
||
Arguments | ||
--------- | ||
f : nparray | ||
Frequency of PSD of signal. | ||
|
||
peak_freq : float | ||
Center frequency of the mask. | ||
|
||
peak_width : tuple | ||
Range to mask signal. The default masked range will be | ||
(peak_freq - 0.002) < f < (peak_freq + 0.002). | ||
|
||
Returns | ||
------- | ||
mask: Boolean array to mask the given PSD. | ||
False in this array stands for the index of the single peak to mask. | ||
""" | ||
mask = (f < peak_freq + peak_width[0])|(f > peak_freq + peak_width[1]) | ||
return mask | ||
|
||
def binning_psd(psd, unbinned_mode=10, base=2, drop_nan=False): | ||
""" | ||
Function to bin PSD. | ||
First several Fourier modes are left un-binned. | ||
Fourier modes higher than that are averaged into logspace bins. | ||
|
||
Arguments | ||
--------- | ||
psd : nparray | ||
PSD of signal. | ||
|
||
unbinned_mode : int | ||
First Fourier modes up to this number are left un-binned. | ||
|
||
base : float (> 1) | ||
Base of the logspace bins. | ||
|
||
drop_nan : bool | ||
If True, drop the index where p is nan. | ||
|
||
Returns | ||
------- | ||
binned_psd: nparray | ||
Binned PSD. | ||
""" | ||
binned_psd = [] | ||
for i in range(0, unbinned_mode+1): | ||
binned_psd.append(psd[i]) | ||
N = int(np.emath.logn(base, len(psd)-unbinned_mode)) | ||
binning_idx = np.logspace(base, N, N, base=base, dtype = int)+unbinned_mode-1 | ||
if base >= 2: | ||
for i in range(N-1): | ||
binned_psd.append(np.mean(psd[binning_idx[i]:binning_idx[i+1]])) | ||
else: | ||
for i in range(N-1): | ||
if binning_idx[i] == binning_idx[i+1]: | ||
continue | ||
else: | ||
binned_psd.append(np.mean(psd[binning_idx[i]:binning_idx[i+1]])) | ||
binned_psd = np.array(binned_psd) | ||
if drop_nan: | ||
binned_psd = binned_psd[~np.isnan(binned_psd)] | ||
return binned_psd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you implement below and use here and elsewhere?