Skip to content

Commit

Permalink
Merge pull request #95 from punch-mission/polarized_stars
Browse files Browse the repository at this point in the history
added synthetic polarized starfield
  • Loading branch information
jmbhughes authored Nov 24, 2024
2 parents 454d7d9 + 92d05d8 commit 598392a
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ exclude: "\\.asdf$"
repos:
# This should be before any formatting hooks like isort
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.7.4"
rev: "v0.8.0"
hooks:
- id: ruff
args: ["--fix"]
Expand Down
49 changes: 29 additions & 20 deletions simpunch/level1.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
from sunpy.coordinates import sun
from tqdm import tqdm

from simpunch.level2 import add_starfield_clear
from simpunch.level2 import add_starfield_clear, add_starfield_polarized
from simpunch.util import update_spacecraft_location

CURRENT_DIR = os.path.dirname(__file__)


def generate_spacecraft_wcs(spacecraft_id: str, rotation_stage: int, time: astropy.time.Time) -> WCS:
"""Generate the spacecraft world coordinate system."""
angle_step = 30
Expand All @@ -46,7 +47,7 @@ def generate_spacecraft_wcs(spacecraft_id: str, rotation_stage: int, time: astro
out_wcs.wcs.cdelt = 88 / 3600 * 0.9, 88 / 3600 * 0.9

out_wcs.wcs.pc = calculate_pc_matrix(angle_wfi, out_wcs.wcs.cdelt)
out_wcs.wcs.set_pv([(2, 1, (-sun.earth_distance(time)/sun.constants.radius).decompose().value)])
out_wcs.wcs.set_pv([(2, 1, (-sun.earth_distance(time) / sun.constants.radius).decompose().value)])

out_wcs.wcs.ctype = "HPLN-AZP", "HPLT-AZP"
out_wcs.wcs.cunit = "deg", "deg"
Expand Down Expand Up @@ -133,16 +134,16 @@ def deproject_clear(input_data: NDCube, output_wcs: WCS, adaptive_reprojection:

if adaptive_reprojection:
reprojected_data[:, :] = reproject.reproject_adaptive((input_data.data[:, :],
reconstructed_wcs),
output_wcs,
(2048, 2048),
roundtrip_coords=False, return_footprint=False,
kernel="Gaussian", boundary_mode="ignore")
reconstructed_wcs),
output_wcs,
(2048, 2048),
roundtrip_coords=False, return_footprint=False,
kernel="Gaussian", boundary_mode="ignore")
else:
reprojected_data[:, :] = reproject.reproject_interp((input_data.data[:, :],
reconstructed_wcs),
output_wcs, (2048, 2048),
roundtrip_coords=False, return_footprint=False)
reconstructed_wcs),
output_wcs, (2048, 2048),
roundtrip_coords=False, return_footprint=False)

reprojected_data[np.isnan(reprojected_data)] = 0

Expand All @@ -159,16 +160,16 @@ def remix_polarization(input_data: NDCube) -> NDCube:
# Unpack data into a NDCollection object
w = WCS(naxis=2)
data_collection = NDCollection([("M", NDCube(data=input_data.data[0], wcs=w, meta={})),
("Z", NDCube(data=input_data.data[1], wcs=w, meta={})),
("P", NDCube(data=input_data.data[2], wcs=w, meta={}))])
("Z", NDCube(data=input_data.data[1], wcs=w, meta={})),
("P", NDCube(data=input_data.data[2], wcs=w, meta={}))])

data_collection["M"].meta["POLAR"] = -60. * u.degree
data_collection["Z"].meta["POLAR"] = 0. * u.degree
data_collection["P"].meta["POLAR"] = 60. * u.degree

# TODO - Remember that this needs to be the instrument frame MZP, not the mosaic frame
resolved_data_collection = solpolpy.resolve(data_collection, "npol",
out_angles=[-60, 0, 60]*u.deg, imax_effect=False)
out_angles=[-60, 0, 60] * u.deg, imax_effect=False)

# Repack data
data_list = []
Expand Down Expand Up @@ -281,13 +282,21 @@ def generate_l1_pmzp(input_file: str, path_output: str, rotation_stage: int, spa
output_pdata.meta["POLAR"] = 60

# Add distortion
# output_mdata = add_distortion(output_mdata) # noqa: ERA001
# output_zdata = add_distortion(output_zdata) # noqa: ERA001
# output_pdata = add_distortion(output_pdata) # noqa: ERA001

output_mdata = add_starfield_clear(output_mdata)
output_zdata = add_starfield_clear(output_zdata)
output_pdata = add_starfield_clear(output_pdata)
output_mdata = add_distortion(output_mdata)
output_zdata = add_distortion(output_zdata)
output_pdata = add_distortion(output_pdata)

# Add polarized starfield
output_collection = NDCollection(
[("-60.0 deg", output_mdata),
("0.0 deg", output_zdata),
("60.0 deg", output_pdata)],
aligned_axes="all")

output_mzp = add_starfield_polarized(output_collection)
output_mdata = output_mzp["M"]
output_zdata = output_mzp["Z"]
output_pdata = output_mzp["P"]

output_pdata = update_spacecraft_location(output_pdata, output_pdata.meta.astropy_time)
output_mdata = update_spacecraft_location(output_mdata, output_mdata.meta.astropy_time)
Expand Down
108 changes: 94 additions & 14 deletions simpunch/level2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
PTM - PUNCH Level-2 Polarized (MZP) Mosaic
"""
import copy
import glob
import os
from math import floor

import astropy.time
import astropy.units as u
import numpy as np
import reproject
import solpolpy
from astropy.modeling.models import Gaussian2D
from astropy.table import QTable
Expand All @@ -19,7 +22,7 @@
from prefect_dask import DaskTaskRunner
from punchbowl.data import (NormalizedMetadata, get_base_file_name,
load_ndcube_from_fits, write_ndcube_to_fits)
from punchbowl.data.wcs import calculate_celestial_wcs_from_helio
from punchbowl.data.wcs import calculate_celestial_wcs_from_helio, get_p_angle

from simpunch.stars import (filter_for_visible_stars, find_catalog_in_image,
load_raw_hipparcos_catalog)
Expand All @@ -38,10 +41,10 @@ def get_fcorona_parameters(date_obs: astropy.time.Time) -> dict[str, float]:


def generate_fcorona(shape: (int, int),
tilt_angle: float = 3*u.deg,
tilt_angle: float = 3 * u.deg,
a: float = 600.,
b: float = 300.,
tilt_offset: tuple[float] = (0,0)) -> np.ndarray:
tilt_offset: tuple[float] = (0, 0)) -> np.ndarray:
"""Generate an F corona model."""
fcorona = np.zeros(shape)

Expand Down Expand Up @@ -78,7 +81,7 @@ def generate_fcorona(shape: (int, int),
for i in np.arange(fcorona.shape[0]):
fcorona[i, :, :] = fcorona_profile[:, :]
else:
fcorona[:,:] = fcorona_profile[:,:]
fcorona[:, :] = fcorona_profile[:, :]

return fcorona

Expand Down Expand Up @@ -135,22 +138,98 @@ def generate_starfield(wcs: WCS,
return fake_image, sources


def add_starfield_polarized(input_data: NDCube) -> NDCube:
"""Add synthetic starfield."""
def generate_dummy_polarization(map_scale: float = 0.225,
pol_factor: float = 0.5) -> NDCube:
"""Create a synthetic polarization map."""
shape = [int(floor(180 / map_scale)), int(floor(360 / map_scale))]
xcoord = np.linspace(-pol_factor, pol_factor, shape[1])
ycoord = np.linspace(-pol_factor, pol_factor, shape[0])
xin, yin = np.meshgrid(xcoord, ycoord)
zin = pol_factor - (xin ** 2 + yin ** 2)

wcs_sky = WCS(naxis=2)
wcs_sky.wcs.crpix = [shape[1] / 2 + .5, shape[0] / 2 + .5]
wcs_sky.wcs.cdelt = np.array([map_scale, map_scale])
wcs_sky.wcs.crval = [180.0, 0.0]
wcs_sky.wcs.ctype = ["RA---CAR", "DEC--CAR"]
wcs_sky.wcs.cunit = "deg", "deg"

return NDCube(data=zin, wcs=wcs_sky)


def add_starfield_polarized(input_collection: NDCollection, polfactor: tuple = (0.2, 0.3, 0.5)) -> NDCollection:
"""Add synthetic polarized starfield."""
input_data = input_collection["Z"]
wcs_stellar_input = calculate_celestial_wcs_from_helio(input_data.wcs,
input_data.meta.astropy_time,
input_data.data.shape)

starfield, stars = generate_starfield(wcs_stellar_input, input_data.data[0, :, :].shape, flux_set=2.0384547E-9,
fwhm=3, dimmest_magnitude=12, noise_mean=None, noise_std=None)
starfield, stars = generate_starfield(wcs_stellar_input, input_data.data.shape,
flux_set=2.0384547E-9, fwhm=3, dimmest_magnitude=12,
noise_mean=None, noise_std=None)

starfield_data = np.zeros(input_data.data.shape)
for i in range(starfield_data.shape[0]):
starfield_data[i, :, :] = starfield * (np.logical_not(np.isclose(input_data.data[i, :, :], 0, atol=1E-18)))
starfield_data[:, :] = starfield * (np.logical_not(np.isclose(input_data.data, 0, atol=1E-18)))

input_data.data[...] = input_data.data[...] + starfield_data
# Converting the input data polarization to celestial basis
mzp_angles = ([input_cube.meta["POLAR"].value for label, input_cube in input_collection.items() if
label != "alpha"]) * u.degree
cel_north_off = get_p_angle(time=input_collection["Z"].meta["DATE-OBS"].value)
new_angles = (mzp_angles + cel_north_off).value * u.degree
# check - or + ? confirm!

return input_data
valid_keys = [key for key in input_collection if key != "alpha"]

meta_a = dict(NormalizedMetadata.to_fits_header(input_collection[valid_keys[0]].meta,
wcs=input_collection[valid_keys[0]].wcs))
meta_b = dict(NormalizedMetadata.to_fits_header(input_collection[valid_keys[1]].meta,
wcs=input_collection[valid_keys[1]].wcs))
meta_c = dict(NormalizedMetadata.to_fits_header(input_collection[valid_keys[2]].meta,
wcs=input_collection[valid_keys[2]].wcs))

meta_a["POLAR"] = meta_a["POLAR"] * u.degree
meta_b["POLAR"] = meta_b["POLAR"] * u.degree
meta_c["POLAR"] = meta_c["POLAR"] * u.degree

data_collection = NDCollection(
[(str(valid_keys[0]), NDCube(data=input_collection[valid_keys[0]].data,
meta=meta_a, wcs=input_collection[valid_keys[0]].wcs)),
(str(valid_keys[1]), NDCube(data=input_collection[valid_keys[1]].data,
meta=meta_b, wcs=input_collection[valid_keys[1]].wcs)),
(str(valid_keys[2]), NDCube(data=input_collection[valid_keys[2]].data,
meta=meta_c, wcs=input_collection[valid_keys[2]].wcs))],
aligned_axes="all")

input_data_cel = solpolpy.resolve(data_collection, "npol", reference_angle=0 * u.degree, out_angles=new_angles)
valid_keys = [key for key in input_data_cel if key != "alpha"]

for k, key in enumerate(valid_keys):
dummy_polarmap = generate_dummy_polarization(pol_factor=polfactor[k])
# Extract ROI corresponding to input wcs
polar_roi = reproject.reproject_adaptive(
(dummy_polarmap.data, dummy_polarmap.wcs), wcs_stellar_input, input_data.data.shape,
roundtrip_coords=False, return_footprint=False, x_cyclic=True,
conserve_flux=True, center_jacobian=True, despike_jacobian=True)
input_data_cel[key].data[...] = input_data_cel[key].data + polar_roi * starfield_data

mzp_data_instru = solpolpy.resolve(input_data_cel, "mzpinstru", reference_angle=0 * u.degree) # Instrument MZP
### WCS is in celestial here?
valid_keys = [key for key in mzp_data_instru if key != "alpha"]
out_meta = {"M": copy.deepcopy(input_collection["M"].meta),
"Z": copy.deepcopy(input_collection["Z"].meta),
"P": copy.deepcopy(input_collection["P"].meta)}
for out_pol, meta_item in out_meta.items():
for key, kind in zip(["POLAR", "POLARREF", "POLAROFF"], [int, str, float], strict=False):
if isinstance(mzp_data_instru[out_pol].meta[key], u.Quantity):
meta_item[key] = kind(mzp_data_instru[out_pol].meta[key].value)
else:
meta_item[key] = kind(mzp_data_instru[out_pol].meta[key])

return NDCollection(
[(str(key), NDCube(data=mzp_data_instru[key].data,
meta=out_meta[key],
wcs=mzp_data_instru[key].wcs)) for key in valid_keys],
aligned_axes="all")


def add_starfield_clear(input_data: NDCube) -> NDCube:
Expand Down Expand Up @@ -178,7 +257,7 @@ def remix_polarization(input_data: NDCube) -> NDCube:
("pB", NDCube(data=input_data.data[1], wcs=input_data.wcs))],
aligned_axes="all")

resolved_data_collection = solpolpy.resolve(data_collection, "MZP", imax_effect=False)
resolved_data_collection = solpolpy.resolve(data_collection, "mzpsolar", imax_effect=False)

# Repack data
data_list = []
Expand All @@ -205,6 +284,7 @@ def remix_polarization(input_data: NDCube) -> NDCube:

return NDCube(data=new_data, wcs=new_wcs, uncertainty=new_uncertainty, meta=input_data.meta)


@task
def generate_l2_ptm(input_file: str, path_output: str) -> None:
"""Generate level 2 PTM synthetic data."""
Expand All @@ -221,7 +301,7 @@ def generate_l2_ptm(input_file: str, path_output: str) -> None:
# Synchronize overlapping metadata keys
output_header = output_meta.to_fits_header(output_wcs)
for key in output_header:
if (key in input_pdata.meta) and output_header[key] == "" and key not in ("COMMENT", "HISTORY"):
if (key in input_pdata.meta) and output_header[key] == "" and key not in ("COMMENT", "HISTORY"):
output_meta[key].value = input_pdata.meta[key].value
output_meta["DESCRPTN"] = "Simulated " + output_meta["DESCRPTN"].value
output_meta["TITLE"] = "Simulated " + output_meta["TITLE"].value
Expand Down
64 changes: 59 additions & 5 deletions simpunch/tests/test_starfield.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@
import pytest
from astropy.nddata import StdDevUncertainty
from astropy.wcs import WCS
from ndcube import NDCube
from ndcube import NDCollection, NDCube
from punchbowl.data import NormalizedMetadata

from simpunch.level2 import add_starfield_clear
from simpunch.level2 import add_starfield_clear, add_starfield_polarized


@pytest.fixture
def sample_ndcube() -> NDCube:
def _sample_ndcube(shape: tuple, code:str = "PM1", level:str = "0") -> NDCube:
def _sample_ndcube(shape: tuple, code: str = "PM1", level: str = "0") -> NDCube:
data = np.random.random(shape).astype(np.float32)
sqrt_abs_data = np.sqrt(np.abs(data))
uncertainty = StdDevUncertainty(np.interp(sqrt_abs_data, (sqrt_abs_data.min(), sqrt_abs_data.max()),
(0,1)).astype(np.float32))
(0, 1)).astype(np.float32))
wcs = WCS(naxis=2)
wcs.wcs.ctype = "HPLN-ARC", "HPLT-ARC"
wcs.wcs.cunit = "deg", "deg"
Expand All @@ -29,13 +29,67 @@ def _sample_ndcube(shape: tuple, code:str = "PM1", level:str = "0") -> NDCube:
meta["DATE-OBS"] = str(datetime(2024, 2, 22, 16, 0, 1))
meta["FILEVRSN"] = "1"
return NDCube(data=data, uncertainty=uncertainty, wcs=wcs, meta=meta)

return _sample_ndcube


@pytest.fixture
def sample_ndcollection() -> callable:
def _create_sample_ndcollection(shape: tuple) -> NDCollection:
wcs = WCS(naxis=2)
wcs.wcs.ctype = ["HPLN-ARC", "HPLT-ARC"]
wcs.wcs.cunit = ["deg", "deg"]
wcs.wcs.cdelt = [0.1, 0.1]
wcs.wcs.crpix = [shape[1] // 2, shape[0] // 2]
wcs.wcs.crval = [1, 1]
wcs.wcs.cname = ["HPC lon", "HPC lat"]

metaz = NormalizedMetadata.load_template("PZ1", "1")
metaz["DATE-OBS"] = str(datetime(2024, 2, 22, 16, 0, 1))
metaz["FILEVRSN"] = "1"
metaz["POLAROFF"] = 1.0
metaz["POLARREF"] = "Solar"

metam = NormalizedMetadata.load_template("PM1", "1")
metam["DATE-OBS"] = str(datetime(2024, 2, 22, 16, 0, 1))
metam["FILEVRSN"] = "1"
metam["POLAROFF"] = 1.0
metam["POLARREF"] = "Solar"

metap = NormalizedMetadata.load_template("PP1", "1")
metap["DATE-OBS"] = str(datetime(2024, 2, 22, 16, 0, 1))
metap["FILEVRSN"] = "1"
metap["POLAROFF"] = 1.0
metap["POLARREF"] = "Solar"

input_data_z = NDCube(np.random.random(shape).astype(np.float32), wcs=wcs, meta=metaz)
input_data_m = NDCube(np.random.random(shape).astype(np.float32), wcs=wcs, meta=metam)
input_data_p = NDCube(np.random.random(shape).astype(np.float32), wcs=wcs, meta=metap)

return NDCollection(
[("M", input_data_m),
("Z", input_data_z),
("P", input_data_p)],
aligned_axes="all",
)
return _create_sample_ndcollection



def test_starfield(sample_ndcube: NDCube) -> None:
"""Test starfield generation."""
input_data = sample_ndcube((2048,2048))
input_data = sample_ndcube((2048, 2048))

output_data = add_starfield_clear(input_data)

assert isinstance(output_data, NDCube)


def test_polarized_starfield(sample_ndcollection: NDCollection) -> None:
"""Test polarized starfield generation."""
shape = (128, 128)
input_data = sample_ndcollection(shape)

output_data = add_starfield_polarized(input_data)

assert isinstance(output_data, NDCollection)

0 comments on commit 598392a

Please sign in to comment.