diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f7bd8aa..c68c76b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ exclude: "\\.asdf$" repos: # This should be before any formatting hooks like isort - repo: https://github.com/astral-sh/ruff-pre-commit - rev: "v0.7.4" + rev: "v0.8.0" hooks: - id: ruff args: ["--fix"] diff --git a/simpunch/level1.py b/simpunch/level1.py index 611f83f..3380ac1 100644 --- a/simpunch/level1.py +++ b/simpunch/level1.py @@ -21,11 +21,12 @@ from sunpy.coordinates import sun from tqdm import tqdm -from simpunch.level2 import add_starfield_clear +from simpunch.level2 import add_starfield_clear, add_starfield_polarized from simpunch.util import update_spacecraft_location CURRENT_DIR = os.path.dirname(__file__) + def generate_spacecraft_wcs(spacecraft_id: str, rotation_stage: int, time: astropy.time.Time) -> WCS: """Generate the spacecraft world coordinate system.""" angle_step = 30 @@ -46,7 +47,7 @@ def generate_spacecraft_wcs(spacecraft_id: str, rotation_stage: int, time: astro out_wcs.wcs.cdelt = 88 / 3600 * 0.9, 88 / 3600 * 0.9 out_wcs.wcs.pc = calculate_pc_matrix(angle_wfi, out_wcs.wcs.cdelt) - out_wcs.wcs.set_pv([(2, 1, (-sun.earth_distance(time)/sun.constants.radius).decompose().value)]) + out_wcs.wcs.set_pv([(2, 1, (-sun.earth_distance(time) / sun.constants.radius).decompose().value)]) out_wcs.wcs.ctype = "HPLN-AZP", "HPLT-AZP" out_wcs.wcs.cunit = "deg", "deg" @@ -133,16 +134,16 @@ def deproject_clear(input_data: NDCube, output_wcs: WCS, adaptive_reprojection: if adaptive_reprojection: reprojected_data[:, :] = reproject.reproject_adaptive((input_data.data[:, :], - reconstructed_wcs), - output_wcs, - (2048, 2048), - roundtrip_coords=False, return_footprint=False, - kernel="Gaussian", boundary_mode="ignore") + reconstructed_wcs), + output_wcs, + (2048, 2048), + roundtrip_coords=False, return_footprint=False, + kernel="Gaussian", boundary_mode="ignore") else: reprojected_data[:, :] = reproject.reproject_interp((input_data.data[:, :], - reconstructed_wcs), - output_wcs, (2048, 2048), - roundtrip_coords=False, return_footprint=False) + reconstructed_wcs), + output_wcs, (2048, 2048), + roundtrip_coords=False, return_footprint=False) reprojected_data[np.isnan(reprojected_data)] = 0 @@ -159,8 +160,8 @@ def remix_polarization(input_data: NDCube) -> NDCube: # Unpack data into a NDCollection object w = WCS(naxis=2) data_collection = NDCollection([("M", NDCube(data=input_data.data[0], wcs=w, meta={})), - ("Z", NDCube(data=input_data.data[1], wcs=w, meta={})), - ("P", NDCube(data=input_data.data[2], wcs=w, meta={}))]) + ("Z", NDCube(data=input_data.data[1], wcs=w, meta={})), + ("P", NDCube(data=input_data.data[2], wcs=w, meta={}))]) data_collection["M"].meta["POLAR"] = -60. * u.degree data_collection["Z"].meta["POLAR"] = 0. * u.degree @@ -168,7 +169,7 @@ def remix_polarization(input_data: NDCube) -> NDCube: # TODO - Remember that this needs to be the instrument frame MZP, not the mosaic frame resolved_data_collection = solpolpy.resolve(data_collection, "npol", - out_angles=[-60, 0, 60]*u.deg, imax_effect=False) + out_angles=[-60, 0, 60] * u.deg, imax_effect=False) # Repack data data_list = [] @@ -281,13 +282,21 @@ def generate_l1_pmzp(input_file: str, path_output: str, rotation_stage: int, spa output_pdata.meta["POLAR"] = 60 # Add distortion - # output_mdata = add_distortion(output_mdata) # noqa: ERA001 - # output_zdata = add_distortion(output_zdata) # noqa: ERA001 - # output_pdata = add_distortion(output_pdata) # noqa: ERA001 - - output_mdata = add_starfield_clear(output_mdata) - output_zdata = add_starfield_clear(output_zdata) - output_pdata = add_starfield_clear(output_pdata) + output_mdata = add_distortion(output_mdata) + output_zdata = add_distortion(output_zdata) + output_pdata = add_distortion(output_pdata) + + # Add polarized starfield + output_collection = NDCollection( + [("-60.0 deg", output_mdata), + ("0.0 deg", output_zdata), + ("60.0 deg", output_pdata)], + aligned_axes="all") + + output_mzp = add_starfield_polarized(output_collection) + output_mdata = output_mzp["M"] + output_zdata = output_mzp["Z"] + output_pdata = output_mzp["P"] output_pdata = update_spacecraft_location(output_pdata, output_pdata.meta.astropy_time) output_mdata = update_spacecraft_location(output_mdata, output_mdata.meta.astropy_time) diff --git a/simpunch/level2.py b/simpunch/level2.py index f11021e..a64efa9 100644 --- a/simpunch/level2.py +++ b/simpunch/level2.py @@ -2,12 +2,15 @@ PTM - PUNCH Level-2 Polarized (MZP) Mosaic """ +import copy import glob import os +from math import floor import astropy.time import astropy.units as u import numpy as np +import reproject import solpolpy from astropy.modeling.models import Gaussian2D from astropy.table import QTable @@ -19,7 +22,7 @@ from prefect_dask import DaskTaskRunner from punchbowl.data import (NormalizedMetadata, get_base_file_name, load_ndcube_from_fits, write_ndcube_to_fits) -from punchbowl.data.wcs import calculate_celestial_wcs_from_helio +from punchbowl.data.wcs import calculate_celestial_wcs_from_helio, get_p_angle from simpunch.stars import (filter_for_visible_stars, find_catalog_in_image, load_raw_hipparcos_catalog) @@ -38,10 +41,10 @@ def get_fcorona_parameters(date_obs: astropy.time.Time) -> dict[str, float]: def generate_fcorona(shape: (int, int), - tilt_angle: float = 3*u.deg, + tilt_angle: float = 3 * u.deg, a: float = 600., b: float = 300., - tilt_offset: tuple[float] = (0,0)) -> np.ndarray: + tilt_offset: tuple[float] = (0, 0)) -> np.ndarray: """Generate an F corona model.""" fcorona = np.zeros(shape) @@ -78,7 +81,7 @@ def generate_fcorona(shape: (int, int), for i in np.arange(fcorona.shape[0]): fcorona[i, :, :] = fcorona_profile[:, :] else: - fcorona[:,:] = fcorona_profile[:,:] + fcorona[:, :] = fcorona_profile[:, :] return fcorona @@ -135,22 +138,98 @@ def generate_starfield(wcs: WCS, return fake_image, sources -def add_starfield_polarized(input_data: NDCube) -> NDCube: - """Add synthetic starfield.""" +def generate_dummy_polarization(map_scale: float = 0.225, + pol_factor: float = 0.5) -> NDCube: + """Create a synthetic polarization map.""" + shape = [int(floor(180 / map_scale)), int(floor(360 / map_scale))] + xcoord = np.linspace(-pol_factor, pol_factor, shape[1]) + ycoord = np.linspace(-pol_factor, pol_factor, shape[0]) + xin, yin = np.meshgrid(xcoord, ycoord) + zin = pol_factor - (xin ** 2 + yin ** 2) + + wcs_sky = WCS(naxis=2) + wcs_sky.wcs.crpix = [shape[1] / 2 + .5, shape[0] / 2 + .5] + wcs_sky.wcs.cdelt = np.array([map_scale, map_scale]) + wcs_sky.wcs.crval = [180.0, 0.0] + wcs_sky.wcs.ctype = ["RA---CAR", "DEC--CAR"] + wcs_sky.wcs.cunit = "deg", "deg" + + return NDCube(data=zin, wcs=wcs_sky) + + +def add_starfield_polarized(input_collection: NDCollection, polfactor: tuple = (0.2, 0.3, 0.5)) -> NDCollection: + """Add synthetic polarized starfield.""" + input_data = input_collection["Z"] wcs_stellar_input = calculate_celestial_wcs_from_helio(input_data.wcs, input_data.meta.astropy_time, input_data.data.shape) - starfield, stars = generate_starfield(wcs_stellar_input, input_data.data[0, :, :].shape, flux_set=2.0384547E-9, - fwhm=3, dimmest_magnitude=12, noise_mean=None, noise_std=None) + starfield, stars = generate_starfield(wcs_stellar_input, input_data.data.shape, + flux_set=2.0384547E-9, fwhm=3, dimmest_magnitude=12, + noise_mean=None, noise_std=None) starfield_data = np.zeros(input_data.data.shape) - for i in range(starfield_data.shape[0]): - starfield_data[i, :, :] = starfield * (np.logical_not(np.isclose(input_data.data[i, :, :], 0, atol=1E-18))) + starfield_data[:, :] = starfield * (np.logical_not(np.isclose(input_data.data, 0, atol=1E-18))) - input_data.data[...] = input_data.data[...] + starfield_data + # Converting the input data polarization to celestial basis + mzp_angles = ([input_cube.meta["POLAR"].value for label, input_cube in input_collection.items() if + label != "alpha"]) * u.degree + cel_north_off = get_p_angle(time=input_collection["Z"].meta["DATE-OBS"].value) + new_angles = (mzp_angles + cel_north_off).value * u.degree + # check - or + ? confirm! - return input_data + valid_keys = [key for key in input_collection if key != "alpha"] + + meta_a = dict(NormalizedMetadata.to_fits_header(input_collection[valid_keys[0]].meta, + wcs=input_collection[valid_keys[0]].wcs)) + meta_b = dict(NormalizedMetadata.to_fits_header(input_collection[valid_keys[1]].meta, + wcs=input_collection[valid_keys[1]].wcs)) + meta_c = dict(NormalizedMetadata.to_fits_header(input_collection[valid_keys[2]].meta, + wcs=input_collection[valid_keys[2]].wcs)) + + meta_a["POLAR"] = meta_a["POLAR"] * u.degree + meta_b["POLAR"] = meta_b["POLAR"] * u.degree + meta_c["POLAR"] = meta_c["POLAR"] * u.degree + + data_collection = NDCollection( + [(str(valid_keys[0]), NDCube(data=input_collection[valid_keys[0]].data, + meta=meta_a, wcs=input_collection[valid_keys[0]].wcs)), + (str(valid_keys[1]), NDCube(data=input_collection[valid_keys[1]].data, + meta=meta_b, wcs=input_collection[valid_keys[1]].wcs)), + (str(valid_keys[2]), NDCube(data=input_collection[valid_keys[2]].data, + meta=meta_c, wcs=input_collection[valid_keys[2]].wcs))], + aligned_axes="all") + + input_data_cel = solpolpy.resolve(data_collection, "npol", reference_angle=0 * u.degree, out_angles=new_angles) + valid_keys = [key for key in input_data_cel if key != "alpha"] + + for k, key in enumerate(valid_keys): + dummy_polarmap = generate_dummy_polarization(pol_factor=polfactor[k]) + # Extract ROI corresponding to input wcs + polar_roi = reproject.reproject_adaptive( + (dummy_polarmap.data, dummy_polarmap.wcs), wcs_stellar_input, input_data.data.shape, + roundtrip_coords=False, return_footprint=False, x_cyclic=True, + conserve_flux=True, center_jacobian=True, despike_jacobian=True) + input_data_cel[key].data[...] = input_data_cel[key].data + polar_roi * starfield_data + + mzp_data_instru = solpolpy.resolve(input_data_cel, "mzpinstru", reference_angle=0 * u.degree) # Instrument MZP + ### WCS is in celestial here? + valid_keys = [key for key in mzp_data_instru if key != "alpha"] + out_meta = {"M": copy.deepcopy(input_collection["M"].meta), + "Z": copy.deepcopy(input_collection["Z"].meta), + "P": copy.deepcopy(input_collection["P"].meta)} + for out_pol, meta_item in out_meta.items(): + for key, kind in zip(["POLAR", "POLARREF", "POLAROFF"], [int, str, float], strict=False): + if isinstance(mzp_data_instru[out_pol].meta[key], u.Quantity): + meta_item[key] = kind(mzp_data_instru[out_pol].meta[key].value) + else: + meta_item[key] = kind(mzp_data_instru[out_pol].meta[key]) + + return NDCollection( + [(str(key), NDCube(data=mzp_data_instru[key].data, + meta=out_meta[key], + wcs=mzp_data_instru[key].wcs)) for key in valid_keys], + aligned_axes="all") def add_starfield_clear(input_data: NDCube) -> NDCube: @@ -178,7 +257,7 @@ def remix_polarization(input_data: NDCube) -> NDCube: ("pB", NDCube(data=input_data.data[1], wcs=input_data.wcs))], aligned_axes="all") - resolved_data_collection = solpolpy.resolve(data_collection, "MZP", imax_effect=False) + resolved_data_collection = solpolpy.resolve(data_collection, "mzpsolar", imax_effect=False) # Repack data data_list = [] @@ -205,6 +284,7 @@ def remix_polarization(input_data: NDCube) -> NDCube: return NDCube(data=new_data, wcs=new_wcs, uncertainty=new_uncertainty, meta=input_data.meta) + @task def generate_l2_ptm(input_file: str, path_output: str) -> None: """Generate level 2 PTM synthetic data.""" @@ -221,7 +301,7 @@ def generate_l2_ptm(input_file: str, path_output: str) -> None: # Synchronize overlapping metadata keys output_header = output_meta.to_fits_header(output_wcs) for key in output_header: - if (key in input_pdata.meta) and output_header[key] == "" and key not in ("COMMENT", "HISTORY"): + if (key in input_pdata.meta) and output_header[key] == "" and key not in ("COMMENT", "HISTORY"): output_meta[key].value = input_pdata.meta[key].value output_meta["DESCRPTN"] = "Simulated " + output_meta["DESCRPTN"].value output_meta["TITLE"] = "Simulated " + output_meta["TITLE"].value diff --git a/simpunch/tests/test_starfield.py b/simpunch/tests/test_starfield.py index c20e39e..efb3dfd 100644 --- a/simpunch/tests/test_starfield.py +++ b/simpunch/tests/test_starfield.py @@ -4,19 +4,19 @@ import pytest from astropy.nddata import StdDevUncertainty from astropy.wcs import WCS -from ndcube import NDCube +from ndcube import NDCollection, NDCube from punchbowl.data import NormalizedMetadata -from simpunch.level2 import add_starfield_clear +from simpunch.level2 import add_starfield_clear, add_starfield_polarized @pytest.fixture def sample_ndcube() -> NDCube: - def _sample_ndcube(shape: tuple, code:str = "PM1", level:str = "0") -> NDCube: + def _sample_ndcube(shape: tuple, code: str = "PM1", level: str = "0") -> NDCube: data = np.random.random(shape).astype(np.float32) sqrt_abs_data = np.sqrt(np.abs(data)) uncertainty = StdDevUncertainty(np.interp(sqrt_abs_data, (sqrt_abs_data.min(), sqrt_abs_data.max()), - (0,1)).astype(np.float32)) + (0, 1)).astype(np.float32)) wcs = WCS(naxis=2) wcs.wcs.ctype = "HPLN-ARC", "HPLT-ARC" wcs.wcs.cunit = "deg", "deg" @@ -29,13 +29,67 @@ def _sample_ndcube(shape: tuple, code:str = "PM1", level:str = "0") -> NDCube: meta["DATE-OBS"] = str(datetime(2024, 2, 22, 16, 0, 1)) meta["FILEVRSN"] = "1" return NDCube(data=data, uncertainty=uncertainty, wcs=wcs, meta=meta) + return _sample_ndcube +@pytest.fixture +def sample_ndcollection() -> callable: + def _create_sample_ndcollection(shape: tuple) -> NDCollection: + wcs = WCS(naxis=2) + wcs.wcs.ctype = ["HPLN-ARC", "HPLT-ARC"] + wcs.wcs.cunit = ["deg", "deg"] + wcs.wcs.cdelt = [0.1, 0.1] + wcs.wcs.crpix = [shape[1] // 2, shape[0] // 2] + wcs.wcs.crval = [1, 1] + wcs.wcs.cname = ["HPC lon", "HPC lat"] + + metaz = NormalizedMetadata.load_template("PZ1", "1") + metaz["DATE-OBS"] = str(datetime(2024, 2, 22, 16, 0, 1)) + metaz["FILEVRSN"] = "1" + metaz["POLAROFF"] = 1.0 + metaz["POLARREF"] = "Solar" + + metam = NormalizedMetadata.load_template("PM1", "1") + metam["DATE-OBS"] = str(datetime(2024, 2, 22, 16, 0, 1)) + metam["FILEVRSN"] = "1" + metam["POLAROFF"] = 1.0 + metam["POLARREF"] = "Solar" + + metap = NormalizedMetadata.load_template("PP1", "1") + metap["DATE-OBS"] = str(datetime(2024, 2, 22, 16, 0, 1)) + metap["FILEVRSN"] = "1" + metap["POLAROFF"] = 1.0 + metap["POLARREF"] = "Solar" + + input_data_z = NDCube(np.random.random(shape).astype(np.float32), wcs=wcs, meta=metaz) + input_data_m = NDCube(np.random.random(shape).astype(np.float32), wcs=wcs, meta=metam) + input_data_p = NDCube(np.random.random(shape).astype(np.float32), wcs=wcs, meta=metap) + + return NDCollection( + [("M", input_data_m), + ("Z", input_data_z), + ("P", input_data_p)], + aligned_axes="all", + ) + return _create_sample_ndcollection + + + def test_starfield(sample_ndcube: NDCube) -> None: """Test starfield generation.""" - input_data = sample_ndcube((2048,2048)) + input_data = sample_ndcube((2048, 2048)) output_data = add_starfield_clear(input_data) assert isinstance(output_data, NDCube) + + +def test_polarized_starfield(sample_ndcollection: NDCollection) -> None: + """Test polarized starfield generation.""" + shape = (128, 128) + input_data = sample_ndcollection(shape) + + output_data = add_starfield_polarized(input_data) + + assert isinstance(output_data, NDCollection)