Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to fitting for faint traces. #35

Open
wants to merge 3 commits into
base: same-block-cals
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
0.13.0 (2024-12-13)
-------------------
- Updated how we fit the profile center/width to better fit faint traces

0.12.0 (2024-12-11)
-------------------
- We now prefer calibrations in the following order: same block, same proposal, any public calibration.
Expand Down
10 changes: 8 additions & 2 deletions banzai_floyds/matched_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def matched_filter_hessian(theta, data, error, weights_function, weights_jacobia


def optimize_match_filter(initial_guess, data, error, weights_function, x, weights_jacobian_function=None,
weights_hessian_function=None, args=None, minimize=False, bounds=None):
weights_hessian_function=None, args=None, minimize=False, bounds=None, covariance=False):
"""
Find the best fit parameters for a match filter model

Expand All @@ -329,6 +329,8 @@ def optimize_match_filter(initial_guess, data, error, weights_function, x, weigh
Any other static arguments that should be passed to the weights function.
minimize: Boolean
Minimize instead of maximize match filter signal?
covariance: Boolean
Return the covariance matrix of the fit?

Returns
-------
Expand Down Expand Up @@ -365,4 +367,8 @@ def optimize_match_filter(initial_guess, data, error, weights_function, x, weigh
hess=lambda *params: sign * matched_filter_hessian(*params),
jac=lambda *params: sign * matched_filter_jacobian(*params),
options={'eps': 1e-5}, bounds=bounds)
return best_fit.x

if covariance:
return best_fit.x, best_fit.hess_inv.todense()
else:
return best_fit.x
244 changes: 125 additions & 119 deletions banzai_floyds/profile.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import numpy as np
from astropy.modeling import fitting, models
from astropy.table import Table, vstack, join
from astropy.table import Table, vstack, join, Column
from numpy.polynomial.legendre import Legendre
from banzai_floyds.matched_filter import matched_filter_metric, optimize_match_filter
from banzai_floyds.utils.fitting_utils import fwhm_to_sigma, gauss
Expand All @@ -10,125 +9,133 @@
logger = get_logger()


def fit_profile_sigma(data, profile_fits, domains, poly_order=2, default_fwhm=6.0):
    """
    Fit the Gaussian width (sigma) of the spectral trace in each wavelength bin,
    then fit a Legendre polynomial of sigma vs. wavelength for each order.

    Parameters
    ----------
    data: astropy Table grouped by wavelength bin.
        Needs columns: order_wavelength_bin, order, y_order, data, uncertainty,
        mask, wavelength.
    profile_fits: list of callables, one per order (indexed by order - 1).
        Evaluate the previously-fit profile center at a given wavelength.
    domains: list of wavelength domains, one per order, for the sigma polynomial fits.
    poly_order: int
        Degree of the Legendre polynomial of sigma as a function of wavelength.
    default_fwhm: float
        Initial guess of the profile FWHM (pixels); also sets the fit window size.

    Returns
    -------
    profile_widths: list of numpy Legendre polynomials (sigma vs. wavelength), one per order
    profile_width_table: astropy Table of the per-bin fitted sigma values
    """
    # In principle, this should be some big 2d fit where we fit the profile center, the profile width,
    # and the background in one go
    profile_width_table = {'wavelength': [], 'sigma': [], 'order': []}
    fitter = fitting.LMLSQFitter()
    for data_to_fit in data.groups:
        wavelength_bin = data_to_fit['order_wavelength_bin'][0]
        # Skip pixels that don't fall into a normal bin
        if wavelength_bin == 0:
            continue
        order_id = data_to_fit['order'][0]
        profile_center = profile_fits[order_id - 1](wavelength_bin)

        # Pixel in this bin closest to the fitted profile center
        peak = np.argmin(np.abs(profile_center - data_to_fit['y_order']))
        peak_snr = (data_to_fit['data'][peak] - np.median(data_to_fit['data'])) / data_to_fit['uncertainty'][peak]
        # Short circuit if the trace is not significantly brighter than the background in this bin
        if peak_snr < 15.0:
            continue

        # Only fit the data close to the profile so that we can assume a low order background
        peak_window = np.abs(data_to_fit['y_order'] - profile_center) <= 10.0 * fwhm_to_sigma(default_fwhm)

        # Mask out any bad pixels
        peak_window = np.logical_and(peak_window, data_to_fit['mask'] == 0)
        # Pass a match filter (with correct s/n scaling) with a gaussian with a default width
        # Initial amplitude: background-subtracted peak value inside the window
        initial_amplitude = np.max(data_to_fit['data'][peak_window])
        initial_amplitude -= np.median(data_to_fit['data'][peak_window])

        # Model: Gaussian trace on top of a quadratic (Legendre) background
        model = models.Gaussian1D(amplitude=initial_amplitude, mean=profile_center,
                                  stddev=fwhm_to_sigma(default_fwhm))
        model += models.Legendre1D(degree=2, domain=(np.min(data_to_fit['y_order'][peak_window]),
                                                     np.max(data_to_fit['y_order'][peak_window])),
                                   c0=np.median(data_to_fit['data'][peak_window]))
        # Force the profile center to be on the chip...(add 0.3 to pad the edge)
        model.mean_0.min = np.min(data_to_fit['y_order'][peak_window]) + 0.3
        model.mean_0.max = np.max(data_to_fit['y_order'][peak_window]) - 0.3

        # Inverse-variance weights for the least-squares fit
        inv_variance = data_to_fit['uncertainty'][peak_window] ** -2.0

        best_fit_model = fitter(model, x=data_to_fit['y_order'][peak_window],
                                y=data_to_fit['data'][peak_window], weights=inv_variance, maxiter=400,
                                acc=1e-4)
        best_fit_sigma = best_fit_model.stddev_0.value

        profile_width_table['wavelength'].append(wavelength_bin)
        profile_width_table['sigma'].append(best_fit_sigma)
        profile_width_table['order'].append(order_id)
    profile_width_table = Table(profile_width_table)
    # save the polynomial for the profile
    profile_widths = [Legendre.fit(order_data['wavelength'], order_data['sigma'], deg=poly_order, domain=domain)
                      for order_data, domain in zip(profile_width_table.group_by('order').groups, domains)]
    if len(profile_widths) != len(set(data['order'])):
        # At least one order had no bin bright enough to fit. Fall back to a constant
        # sigma for that order, taken from the order-overlap wavelength region of the
        # order that did fit.
        for order in set(data['order']):
            if order not in profile_width_table['order']:
                missing_order = order
                break
        logger.warning(f'Data is too low of signal to noise in order {missing_order} to fit a profile width')
        overlap_region = [max([np.min(data['wavelength'][data['order'] == order]) for order in set(data['order'])]),
                          min([np.max(data['wavelength'][data['order'] == order]) for order in set(data['order'])])]
        in_overlap = np.logical_and(data['wavelength'] > overlap_region[0],
                                    data['wavelength'] < overlap_region[1])
        # This is always zero in the case of two orders because either the first or last is missing
        average_sigma = profile_widths[0](np.mean(data['wavelength'][in_overlap]))
        in_missing_order = data['order'] == missing_order
        proxy_sigma_func = Legendre([average_sigma,], domain=[np.min(data['wavelength'][in_missing_order]),
                                                             np.max(data['wavelength'][in_missing_order])])
        profile_widths.insert(missing_order - 1, proxy_sigma_func)
    return profile_widths, profile_width_table
def profile_gauss_fixed_center(params, x, center, y_order, bkg_y_order, bkg_x_order, domain):
    """
    Produce a gaussian profile with a fixed center defined by the center polynomial.
    Sigma will be set by a Legendre polynomial with coefficients in the params.
    We include a polynomial background where the coefficients for y vary smoothly with wavelength.
    The background is given by
    Legendre([Legendre(wavelength), Legendre(wavelength)..])

    Parameters
    ----------
    params: array-like. First (len(params) - n_bkg) entries are the sigma-polynomial
        coefficients; the rest are the background coefficients, n_bkg = (bkg_x_order + 1)
        * (bkg_y_order + 1).
    x: array of wavelengths, one per pixel
    center: callable (e.g. numpy Legendre) giving the profile center vs. wavelength,
        or an array of precomputed centers
    y_order: array of y positions (same length as x)
    bkg_y_order, bkg_x_order: int polynomial orders of the background in y and wavelength
    domain: wavelength domain for the Legendre polynomials
    """
    n_background_params = (bkg_x_order + 1) * (bkg_y_order + 1)
    profile_params = params[:-n_background_params]
    background_params = params[len(profile_params):]
    background = np.zeros_like(x)
    for y_i, x_j in enumerate(range(0, len(background_params), bkg_x_order + 1)):
        # Each group of (bkg_x_order + 1) coefficients is the wavelength polynomial
        # multiplying one Legendre basis term in y
        unit_coeffs = np.zeros(bkg_y_order + 1)
        unit_coeffs[y_i] = 1.0
        background_term = Legendre(background_params[x_j:x_j + bkg_x_order + 1], domain=domain)(x)
        # NOTE(review): the y basis term reuses the wavelength domain for y_order — confirm intended
        background_term *= Legendre(unit_coeffs, domain=domain)(y_order)
        background += background_term
    # BUG FIX: the sigma polynomial must be built from the profile coefficients only,
    # not the full parameter vector (which also contains the background coefficients).
    sigma_polynomial = Legendre(profile_params, domain=domain)
    # BUG FIX: the center is passed as a polynomial of wavelength at the call site;
    # evaluate it at x before handing it to gauss (arrays pass through unchanged).
    center_values = center(x) if callable(center) else center
    return gauss(y_order, center_values, sigma_polynomial(x)) + background


def fit_profile_sigma(data, profile_centers, domains, poly_order=2, default_fwhm=6.0,
                      bkg_y_order=2, bkg_x_order=4):
    """
    Fit a smoothly varying Gaussian width (sigma) for the trace in each order using a
    matched filter over the whole order at once, with a polynomial background.

    Parameters
    ----------
    data: astropy Table with columns order, order_wavelength_bin, data, uncertainty,
        wavelength, y_order
    profile_centers: list of callables (one per order) giving the trace center vs. wavelength
    domains: list of wavelength domains, one per order
    poly_order: int degree of the sigma-vs-wavelength Legendre polynomial
    default_fwhm: float initial guess of the profile FWHM in pixels
    bkg_y_order, bkg_x_order: int polynomial orders of the background model

    Returns
    -------
    list of numpy Legendre polynomials (sigma vs. wavelength), one per order
    """
    profile_sigmas = []
    for order_id, center, domain in zip([1, 2], profile_centers, domains):
        # Keep only this order's pixels that fall in a normal wavelength bin
        order_data = data[np.logical_and(data['order'] == order_id, data['order_wavelength_bin'] != 0)]
        order_data = order_data.group_by('order_wavelength_bin')
        initial_guess = np.zeros(poly_order + 1 + (bkg_y_order + 1) * (bkg_x_order + 1))
        initial_guess[0] = fwhm_to_sigma(default_fwhm)
        # Start with the background only having a single constant term (no variation)
        # and a value of 0.01
        initial_guess[poly_order + 1] = 0.01
        best_fit = optimize_match_filter(
            initial_guess,
            order_data['data'],
            order_data['uncertainty'],
            profile_gauss_fixed_center,
            order_data['wavelength'],
            # BUG FIX: use the filtered order_data rows (not the full data table) so the
            # y positions align row-for-row with the data and wavelengths being fit
            args=(center, order_data['y_order'], bkg_y_order, bkg_x_order, domain),
        )
        # BUG FIX: only the leading (poly_order + 1) parameters are sigma-polynomial
        # coefficients; the remainder are background terms and must not be included.
        profile_sigmas.append(Legendre(best_fit[:poly_order + 1], domain=domain))
    return profile_sigmas


def profile_gauss_fixed_width(params, x, sigma):
    """Evaluate a Gaussian of fixed width sigma at positions x; params holds only the center."""
    return gauss(x, params[0], sigma)


def fit_profile_centers(data, domains, polynomial_order=5, profile_fwhm=6):
    """
    Fit the y center of the spectral trace in each wavelength bin with a matched filter,
    then fit a Legendre polynomial of center vs. wavelength for each order.

    NOTE(review): the visible fragment ends at the trace_centers list comprehension with
    no return statement — the tail of this function appears to be cut off here.

    Parameters
    ----------
    data: astropy Table grouped by wavelength bin, with columns order_wavelength_bin,
        order, y_order, data, uncertainty
    domains: list of wavelength domains, one per order
    polynomial_order: int degree of the center-vs-wavelength polynomial
    profile_fwhm: float assumed FWHM (pixels) of the Gaussian profile
    """
    trace_points = Table({'wavelength': [], 'center': [], 'order': []})
    for data_to_fit in data.groups:
        # Skip pixels that don't fall into a normal bin
        if data_to_fit['order_wavelength_bin'][0] == 0:
            continue
        # Pass a match filter (with correct s/n scaling) with a gaussian with a default width
        # Find a rough estimate of the center by running across the whole slit (excluding 1-sigma on each side)
        sigma = fwhm_to_sigma(profile_fwhm)
        # Remove the edges of the slit because they do funny things numerically
        upper_bound = np.max(data_to_fit['y_order']) - sigma
        lower_bound = np.min(data_to_fit['y_order']) + sigma
        non_edge = np.logical_and(data_to_fit['y_order'] > lower_bound, data_to_fit['y_order'] < upper_bound)
        # Run a matched filter (don't fit yet) over all the centers
        snrs = [matched_filter_metric([center,], data_to_fit['data'] - np.median(data_to_fit['data']),
                                      data_to_fit['uncertainty'], profile_gauss_fixed_width, None, None,
                                      data_to_fit['y_order'], sigma)
                for center in data_to_fit['y_order'][non_edge]]

        # If the peak pixel of the match filter is < 2 times the median (ish) move on
        if np.max(snrs) < 2.0 * np.median(snrs):
            continue

        # Seed the optimizer at the y position with the strongest matched-filter response
        initial_guess = (data_to_fit['y_order'][non_edge][np.argmax(snrs)],)

        # Put s/n check first
        # stay at least 1 sigma away from the edges using fitting bounds

        best_fit_center, = optimize_match_filter(initial_guess, data_to_fit['data'] - np.median(data_to_fit['data']),
                                                 data_to_fit['uncertainty'],
                                                 profile_gauss_fixed_width, data_to_fit['y_order'],
                                                 args=(fwhm_to_sigma(profile_fwhm),),
                                                 bounds=[(lower_bound, upper_bound,),])

        new_trace_table = Table({'wavelength': [data_to_fit['order_wavelength_bin'][0]],
                                 'center': [best_fit_center],
                                 'order': [data_to_fit['order'][0]]})
        trace_points = vstack([trace_points, new_trace_table])

    # save the polynomial for the profile
    trace_centers = [Legendre.fit(order_data['wavelength'], order_data['center'],
                                  deg=polynomial_order, domain=domain)
                     for order_data, domain in zip(trace_points.group_by('order').groups, domains)]
def profile_fixed_width_full_order(params, x, sigma, y_order, domain):
    """Gaussian of fixed width whose center varies with wavelength as a Legendre polynomial."""
    trace_center = Legendre(params, domain=domain)(x)
    return gauss(y_order, trace_center, sigma)


def fit_profile_centers(data, domains, polynomial_order=5, profile_fwhm=6):
    """
    Fit the y center of the spectral trace per order.

    First, fit the center in coarse bins of 25 wavelength columns with a matched filter
    (good for faint traces), recording a per-point error from the fit covariance. Then
    refine with a single continuous matched-filter fit of the center polynomial over the
    whole order, after subtracting a simple per-bin median background.

    Parameters
    ----------
    data: astropy Table with columns order, order_wavelength_bin, wavelength, y_order,
        data, uncertainty
    domains: list of wavelength domains, one per order
    polynomial_order: int degree of the center-vs-wavelength Legendre polynomial
    profile_fwhm: float assumed FWHM (pixels) of the Gaussian profile

    Returns
    -------
    trace_centers: list of numpy Legendre polynomials (center vs. wavelength), one per order
    trace_points: astropy Table of the coarse-bin center measurements and their errors
    """
    trace_points = Table({'wavelength': [], 'center': [], 'order': [], 'error': []})
    trace_centers = []
    for order_id, domain in zip([1, 2], domains):
        # Keep only this order's pixels that fall in a normal wavelength bin
        order_data = data[np.logical_and(data['order'] == order_id, data['order_wavelength_bin'] != 0)]
        order_data = order_data.group_by('order_wavelength_bin')
        # Group the data in bins of 25 columns
        for left_index, right_index in zip(order_data.groups.indices[0:-24:25],
                                           order_data.groups.indices[25::25]):
            data_to_fit = order_data[left_index: right_index]

            # Pass a match filter (with correct s/n scaling) with a gaussian with a default width
            # Find a rough estimate of the center by running across the whole slit (excluding 1-sigma on each side)
            sigma = fwhm_to_sigma(profile_fwhm)
            # Remove the edges of the slit because they do funny things numerically
            upper_bound = np.max(data_to_fit['y_order']) - sigma
            lower_bound = np.min(data_to_fit['y_order']) + sigma
            non_edge = np.logical_and(data_to_fit['y_order'] > lower_bound, data_to_fit['y_order'] < upper_bound)
            # Run a matched filter (don't fit yet) over all the centers
            snrs = []
            centers = np.arange(int(np.min(data_to_fit['y_order'][non_edge])),
                                int(np.max(data_to_fit['y_order'][non_edge])))
            for center in centers:
                metric = matched_filter_metric([center,],
                                               data_to_fit['data'] - np.median(data_to_fit['data']),
                                               data_to_fit['uncertainty'],
                                               profile_gauss_fixed_width,
                                               None,
                                               None,
                                               data_to_fit['y_order'], sigma)
                snrs.append(metric)

            # Seed the optimizer at the y position with the strongest matched-filter response
            initial_guess = (centers[np.argmax(snrs)],)

            (best_fit_center,), covariance = optimize_match_filter(
                initial_guess,
                data_to_fit['data'] - np.median(data_to_fit['data']),
                data_to_fit['uncertainty'],
                profile_gauss_fixed_width,
                data_to_fit['y_order'],
                args=(fwhm_to_sigma(profile_fwhm),),
                bounds=[(lower_bound, upper_bound,),],
                covariance=True,
            )
            # Do a weighted sum that mirrors the fit to use for our wavelength
            wavelength_point = np.sum(data_to_fit['wavelength'] * data_to_fit['uncertainty'] ** -2)
            wavelength_point /= np.sum(data_to_fit['uncertainty'] ** -2)
            new_trace_table = Table({'wavelength': [wavelength_point],
                                     'center': [best_fit_center],
                                     'order': [order_id],
                                     'error': [np.sqrt(covariance[0, 0])]})
            trace_points = vstack([trace_points, new_trace_table])
        this_order = trace_points['order'] == order_id
        # NOTE(review): numpy's Legendre.fit applies w linearly to the residuals, so the
        # conventional weight is 1/sigma (error ** -1), not inverse variance — confirm
        # error ** -2 is intended here.
        initial_order_polynomial = Legendre.fit(trace_points['wavelength'][this_order],
                                                trace_points['center'][this_order],
                                                w=trace_points['error'][this_order] ** -2,
                                                deg=polynomial_order, domain=domain)

        # refine the fit using a continuous fit for the whole detector
        # Simple background: the median of each wavelength-bin group, joined back onto the rows
        simple_background = order_data['data'].groups.aggregate(np.median)
        simple_background.name = 'background'
        wavelength_column = Column(order_data['order_wavelength_bin'][order_data.groups.indices[:-1]],
                                   name='order_wavelength_bin')
        order_data = join(order_data, Table([simple_background, wavelength_column]), keys='order_wavelength_bin')
        best_fit = optimize_match_filter(
            initial_order_polynomial.coef,
            order_data['data'] - order_data['background'],
            order_data['uncertainty'],
            profile_fixed_width_full_order,
            order_data['wavelength'],
            args=(fwhm_to_sigma(profile_fwhm), order_data['y_order'], domain),
        )
        trace_centers.append(Legendre(best_fit, domain=domain))
    return trace_centers, trace_points


Expand All @@ -137,13 +144,12 @@ class ProfileFitter(Stage):

def do_stage(self, image):
logger.info('Fitting profile centers', image=image)
profile_centers, center_fitted_points = fit_profile_centers(image.binned_data,
image.wavelengths.wavelength_domains,
polynomial_order=self.POLYNOMIAL_ORDER)
profile_centers, fitted_points = fit_profile_centers(image.binned_data,
image.wavelengths.wavelength_domains,
polynomial_order=self.POLYNOMIAL_ORDER)
logger.info('Fitting profile widths', image=image)
profile_widths, width_fitted_points = fit_profile_sigma(image.binned_data, profile_centers,
image.wavelengths.wavelength_domains)
profile_widths = fit_profile_sigma(image.binned_data, profile_centers,
image.wavelengths.wavelength_domains)
logger.info('Storing profile fits', image=image)
fitted_points = join(center_fitted_points, width_fitted_points, join_type='inner')
image.profile = profile_centers, profile_widths, fitted_points
return image
4 changes: 2 additions & 2 deletions banzai_floyds/tests/test_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_tracing():
binned_data = bin_data(fake_frame.data, fake_frame.uncertainty, fake_frame.wavelengths,
fake_frame.orders)
domains = [center.domain for center in fake_frame.input_profile_centers]
fitted_profile_centers, measured_points = fit_profile_centers(binned_data, domains, profile_fwhm=4)
fitted_profile_centers, fitted_points = fit_profile_centers(binned_data, domains, profile_fwhm=4)
for fitted_center, input_center in zip(fitted_profile_centers, fake_frame.input_profile_centers):
x = np.arange(fitted_center.domain[0], fitted_center.domain[1] + 1)
np.testing.assert_allclose(fitted_center(x), input_center(x), atol=0.025, rtol=0.02)
Expand All @@ -23,7 +23,7 @@ def test_profile_width_fitting():
binned_data = bin_data(fake_frame.data, fake_frame.uncertainty, fake_frame.wavelengths,
fake_frame.orders)
domains = [center.domain for center in fake_frame.input_profile_centers]
fitted_widths, measured_points = fit_profile_sigma(binned_data, fake_frame.input_profile_centers, domains)
fitted_widths = fit_profile_sigma(binned_data, fake_frame.input_profile_centers, domains)
for fitted_width in fitted_widths:
x = np.arange(fitted_width.domain[0], fitted_width.domain[1] + 1)
np.testing.assert_allclose(fitted_width(x), fake_frame.input_profile_sigma, rtol=0.03)
Loading