From 3a98c76cc1d4385d5fc2170f4d8fc4b1cdea5731 Mon Sep 17 00:00:00 2001 From: Hidde Jense Date: Wed, 1 Feb 2023 18:45:03 +0000 Subject: [PATCH] Allow more flexible CosmoPower network settings. (#88) * Allow user-specified number of networks. * Minor mistake in error string formatting. * Added option for internal ell-factor in network. * Bugfix. * Changed which parameters get passed on to network. * Added Clpp support. * Generalized CMB unit factor. * Added parameter requirements for cobaya standard. * Added general ell factor function and type hinting. * Added ell_factor docstring. * Fixed a bug where cobaya would not resume a run due to default argument overwrite. * Using BoltzmannBase.renames instead of old system. * Added Derived parameter network. * Codestyle. * Added init file change. * Docstring changes in prep for #37 * Updated pytest. * Codestyle. * codestyle fixes * fixed data location for tests * fixed test data path * syntax for py37 * added need to install cosmopower --------- Co-authored-by: Ian Harrison --- setup.cfg | 2 +- soliket/__init__.py | 2 +- soliket/cosmopower.py | 361 +++++++++++++++++++++++++------ soliket/tests/test_cosmopower.py | 78 +++++-- 4 files changed, 361 insertions(+), 82 deletions(-) diff --git a/setup.cfg b/setup.cfg index fa66459f..8bd80d5a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,7 +27,7 @@ install_requires = mflike @ git+https://github.com/simonsobs/lat_mflike@master [options.package_data] -soliket = *.yaml,*.bibtex,clusters/data/*,clusters/data/selFn_equD56/*,lensing/data/*.txt,mflike/*.yaml,tests/*.yaml,data/xcorr_simulated/*.txt +soliket = *.yaml,*.bibtex,clusters/data/*,clusters/data/selFn_equD56/*,lensing/data/*.txt,mflike/*.yaml,tests/*.yaml,data/xcorr_simulated/*.txt,data/CosmoPower/CP_paper/CMB/*.pkl testpaths = "soliket" text_file_format = rst diff --git a/soliket/__init__.py b/soliket/__init__.py index 204d928c..ead5c120 100644 --- a/soliket/__init__.py +++ b/soliket/__init__.py @@ -7,7 +7,7 @@ from .xcorr import XcorrLikelihood # noqa: F401 from .foreground import Foreground from .bandpass import BandPass -from .cosmopower import CosmoPower +from .cosmopower import CosmoPower, CosmoPowerDerived try: from .clusters import ClusterLikelihood # noqa: F401 diff --git a/soliket/cosmopower.py b/soliket/cosmopower.py index 29353360..349dbe56 100755 --- a/soliket/cosmopower.py +++ b/soliket/cosmopower.py @@ -1,3 +1,30 @@ +""" +.. module:: soliket.cosmopower + +:Synopsis: Simple CosmoPower theory wrapper for Cobaya. +:Author: Hidde T. Jense + +.. |br| raw:: html + +
+ +.. note:: + + **If you use this cosmological code, please cite:** + |br| + A. Spurio Mancini et al. + *CosmoPower: emulating cosmological power spectra for accelerated Bayesian + inference from next-generation surveys* + (`arXiv:210603846 `_) + + And remember to cite any sources for trained networks you use. + +Usage +----- + +After installing SOLikeT and cosmopower, you can use the ``CosmoPower`` theory codes +by adding the ``soliket.CosmoPower`` code as a block in your parameter files. +""" import os try: import cosmopower as cp # noqa F401 @@ -7,100 +34,300 @@ HAS_COSMOPOWER = True import numpy as np +from typing import Dict, Iterable, Tuple + +from cobaya.log import LoggedError +from cobaya.theory import Theory from cobaya.theories.cosmo import BoltzmannBase from cobaya.typing import InfoDict -""" - Simple CosmoPower theory wrapper for Cobaya. - author: Hidde T. Jense -""" - class CosmoPower(BoltzmannBase): - soliket_data_path: str = "soliket/data/CosmoPower" - network_path: str = "CP_paper/CMB" - cmb_tt_nn_filename: str = "cmb_TT_NN" - cmb_te_pcaplusnn_filename: str = "cmb_TE_PCAplusNN" - cmb_ee_nn_filename: str = "cmb_EE_NN" - - extra_args: InfoDict = {} - - renames: dict = { - "omega_b": ["ombh2", "omegabh2"], - "omega_cdm": ["omch2", "omegach2"], - "ln10^{10}A_s": ["logA"], - "n_s": ["ns"], - "h": [], - "tau_reio": ["tau"], - } - - def initialize(self): + """A CosmoPower Network wrapper for Cobaya.""" + + stop_at_error: bool = False + + network_path: str = "soliket/data/CosmoPower/CP_paper/CMB" + network_settings: InfoDict = None + + extra_args: InfoDict = None + + def initialize(self) -> None: super().initialize() - base_path = os.path.join(self.soliket_data_path, self.network_path) - - self.cp_tt_nn = cp.cosmopower_NN( - restore=True, - restore_filename=os.path.join(base_path, self.cmb_tt_nn_filename), - ) - self.cp_te_nn = cp.cosmopower_PCAplusNN( - restore=True, - restore_filename=os.path.join(base_path, self.cmb_te_pcaplusnn_filename), - ) - self.cp_ee_nn = cp.cosmopower_NN( - restore=True, - restore_filename=os.path.join(base_path, self.cmb_ee_nn_filename), - ) + if self.network_settings is None: + raise LoggedError("No network settings were provided.") + + self.networks = {} + self.all_parameters = set([]) + + for spectype in self.network_settings: + netdata = {} + nettype = self.network_settings[spectype] + netpath = os.path.join(self.network_path, nettype["filename"]) + + if nettype["type"] == "NN": + network = cp.cosmopower_NN( + restore=True, restore_filename=netpath) + elif nettype["type"] == "PCAplusNN": + network = cp.cosmopower_PCAplusNN( + restore=True, restore_filename=netpath) + elif self.stop_at_error: + raise ValueError( + f"Unknown network type {nettype['type']} for network {spectype}.") + else: + self.log.warn( + f"Unknown network type {nettype['type']}\ + for network {spectype}: skipped!") + + netdata["type"] = nettype["type"] + netdata["log"] = nettype.get("log", True) + netdata["network"] = network + netdata["parameters"] = list(network.parameters) + netdata["lmax"] = network.modes.max() + netdata["has_ell_factor"] = nettype.get("has_ell_factor", False) + + self.all_parameters = self.all_parameters | set(network.parameters) + + if network is not None: + self.networks[spectype.lower()] = netdata if "lmax" not in self.extra_args: self.extra_args["lmax"] = None self.log.info(f"Loaded CosmoPower from directory {self.network_path}") + self.log.info( + f"CosmoPower will expect the parameters {self.all_parameters}") - def calculate(self, state, want_derived=True, **params): - cmb_params = {} + def calculate(self, state: dict, want_derived: bool = True, **params) -> bool: + ## sadly, this syntax not valid until python 3.9 + # cmb_params = { + # p: [params[p]] for p in params + # } | { + # self.translate_param(p): [params[p]] for p in params + # } + cmb_params = {**{ + p: [params[p]] for p in params + }, **{ + self.translate_param(p): [params[p]] for p in params + }} - for par in self.renames: - if par in params: - cmb_params[par] = [params[par]] + ells = None + + for spectype in self.networks: + network = self.networks[spectype] + used_params = {par: (cmb_params[par] if par in cmb_params else [ + params[par]]) for par in network["parameters"]} + + if network["log"]: + data = network["network"].ten_to_predictions_np(used_params)[ + 0, :] else: - for r in self.renames[par]: - if r in params: - cmb_params[par] = [params[r]] - break + data = network["network"].predictions_np(used_params)[0, :] + + state[spectype] = data + + if ells is None: + ells = network["network"].modes - state["tt"] = self.cp_tt_nn.ten_to_predictions_np(cmb_params)[0, :] - state["te"] = self.cp_te_nn.predictions_np(cmb_params)[0, :] - state["ee"] = self.cp_ee_nn.ten_to_predictions_np(cmb_params)[0, :] - state["ell"] = self.cp_tt_nn.modes + state["ell"] = ells.astype(int) - def get_Cl(self, ell_factor=False, units="FIRASmuK2"): + return True + + def get_Cl(self, ell_factor: bool = False, units: str = "FIRASmuK2") -> dict: cls_old = self.current_state.copy() - lmax = self.extra_args["lmax"] or (cls_old["tt"].shape[0] + 2) + lmax = self.extra_args["lmax"] or cls_old["ell"].max() - cls = {"ell": np.arange(lmax).astype(int)} + cls = {"ell": np.arange(lmax + 1).astype(int)} ls = cls_old["ell"] - for k in ["tt", "te", "ee"]: - # cls[k] = np.zeros(cls["ell"].shape, dtype=float) - cls[k] = np.empty(cls["ell"].shape, dtype=float) - cls[k][:] = np.nan + for k in self.networks: + cls[k] = np.tile(np.nan, cls["ell"].shape) - cmb_fac = self._cmb_unit_factor(units, 2.7255) + for k in self.networks: + prefac = np.ones_like(ls).astype(float) - if ell_factor: - ls_fac = ls * (ls + 1.0) / (2.0 * np.pi) - else: - ls_fac = 1.0 + if self.networks[k]["has_ell_factor"]: + prefac /= self.ell_factor(ls, k) + if ell_factor: + prefac *= self.ell_factor(ls, k) - for k in ["tt", "te", "ee"]: - cls[k][ls] = cls_old[k] * ls_fac * cmb_fac ** 2.0 + cls[k][ls] = cls_old[k] * prefac * \ + self.cmb_unit_factor(k, units, 2.7255) + cls[k][:2] = 0.0 if np.any(np.isnan(cls[k])): - self.log.warning("CosmoPower used outside of trained "\ + self.log.warning("CosmoPower used outside of trained " "{} ell range. Filled in with NaNs.".format(k)) return cls - def get_can_support_params(self): - return ["omega_b", "omega_cdm", "h", "logA", "ns", "tau_reio"] + def ell_factor(self, ls: np.ndarray, spectra: str) -> np.ndarray: + """ + Calculate the ell factor for a specific spectrum. + These prefactors are used to convert from Cell to Dell and vice-versa. + + See also: + cobaya.BoltzmannBase.get_Cl + `camb.CAMBresults.get_cmb_power_spectra `_ # noqa E501 + + Example: + ell_factor(l, "tt") -> l(l+1)/(2 pi). + ell_factor(l, "pp") -> l^2(l+1)^2/(2 pi). + + :param ls: the range of ells. + :param spectra: a two-character string with each character being one of [tebp]. + + :return: an array filled with ell factors for the given spectrum. + """ + ellfac = np.ones_like(ls).astype(float) + + if spectra in ["tt", "te", "tb", "ee", "et", "eb", "bb", "bt", "be"]: + ellfac = ls * (ls + 1.0) / (2.0 * np.pi) + elif spectra in ["pt", "pe", "pb", "tp", "ep", "bp"]: + ellfac = (ls * (ls + 1.0)) ** (3. / 2.) / (2.0 * np.pi) + elif spectra in ["pp"]: + ellfac = (ls * (ls + 1.0)) ** 2.0 / (2.0 * np.pi) + + return ellfac + + def cmb_unit_factor(self, spectra: str, + units: str = "FIRASmuK2", + Tcmb: float = 2.7255) -> float: + """ + Calculate the CMB prefactor for going from dimensionless power spectra to + CMB units. + + :param spectra: a length 2 string specifying the spectrum for which to + calculate the units. + :param units: a string specifying which units to use. + :param Tcmb: the used CMB temperature [units of K]. + :return: The CMB unit conversion factor. + """ + res = 1.0 + x, y = spectra.lower() + + if x == "t" or x == "e" or x == "b": + res *= self._cmb_unit_factor(units, Tcmb) + elif x == "p": + res *= 1. / np.sqrt(2.0 * np.pi) + + if y == "t" or y == "e" or y == "b": + res *= self._cmb_unit_factor(units, Tcmb) + elif y == "p": + res *= 1. / np.sqrt(2.0 * np.pi) + + return res + + def get_can_support_parameters(self) -> Iterable[str]: + return self.all_parameters + + def get_requirements(self) -> Iterable[Tuple[str, str]]: + requirements = [] + for k in self.all_parameters: + if k in self.renames.values(): + for v in self.renames: + if self.renames[v] == k: + requirements.append((v, None)) + break + else: + requirements.append((k, None)) + + return requirements + + +class CosmoPowerDerived(Theory): + """A theory class that can calculate derived parameters from CosmoPower networks.""" + stop_at_error: bool = False + + network_path: str = "soliket/data/CosmoPower/CP_paper/CMB" + network_settings: InfoDict = None + + derived_parameters: Iterable[str] + derived_values: Dict + + extra_args: InfoDict = None + + renames: Dict = {} + + def initialize(self) -> None: + super().initialize() + + if self.network_settings is None: + raise LoggedError("No network settings were provided.") + + netpath = os.path.join(self.network_path, self.network_settings["filename"]) + + if self.network_settings["type"] == "NN": + self.network = cp.cosmopower_NN( + restore=True, restore_filename=netpath) + elif self.network_settings["type"] == "PCAplusNN": + self.network = cp.cosmopower_PCAplusNN( + restore=True, restore_filename=netpath) + else: + raise LoggedError( + f"Unknown network type {self.network_settings['type']}.") + + self.input_parameters = set(self.network.parameters) + + self.log_data = self.network_settings.get("log", False) + + self.log.info( + f"Loaded CosmoPowerDerived from directory {self.network_path}") + self.log.info( + f"CosmoPowerDerived will expect the parameters {self.input_parameters}") + self.log.info( + f"CosmoPowerDerived can provide the following parameters: \ + {self.get_can_provide()}.") + + def translate_param(self, p): + return self.renames.get(p, p) + + def calculate(self, state: dict, want_derived: bool = True, **params) -> bool: + ## sadly, this syntax not valid until python 3.9 + # input_params = { + # p: [params[p]] for p in params + # } | { + # self.translate_param(p): [params[p]] for p in params + # } + input_params = {**{ + p: [params[p]] for p in params + }, **{ + self.translate_param(p): [params[p]] for p in params + }} + + if self.log_data: + data = self.network.ten_to_predictions_np(input_params)[0, :] + else: + data = self.network.predictions_np(input_params)[0, :] + + for k, v in zip(self.derived_parameters, data): + if len(k) == 0 or k == "_": + continue + + state["derived"][k] = v + + return True + + def get_param(self, p) -> float: + return self.current_state["derived"][self.translate_param(p)] + + def get_can_support_parameters(self) -> Iterable[str]: + return self.input_parameters + + def get_requirements(self) -> Iterable[Tuple[str, str]]: + requirements = [] + for k in self.input_parameters: + if k in self.renames.values(): + for v in self.renames: + if self.renames[v] == k: + requirements.append((v, None)) + break + else: + requirements.append((k, None)) + + return requirements + + def get_can_provide(self) -> Iterable[str]: + return set([par for par in self.derived_parameters + if (len(par) > 0 and not par == "_")]) diff --git a/soliket/tests/test_cosmopower.py b/soliket/tests/test_cosmopower.py index 1ef05efa..55e90a6d 100644 --- a/soliket/tests/test_cosmopower.py +++ b/soliket/tests/test_cosmopower.py @@ -30,11 +30,37 @@ }, "theory": { "soliket.CosmoPower": { - # "soliket_data_path": os.path.normpath( - # os.path.join(os.getcwd(), "../data/CosmoPower") - # ), - "soliket_data_path": "soliket/data/CosmoPower", "stop_at_error": True, + "network_settings": { + "tt": { + "type": "NN", + "log": True, + "filename": "cmb_TT_NN", + # If your network has been trained on (l (l+1) / 2 pi) C_l, + # this flag needs to be set. + "has_ell_factor": False, + }, + "ee": { + "type": "NN", + "log": True, + "filename": "cmb_EE_NN", + "has_ell_factor": False, + }, + "te": { + "type": "PCAplusNN", + # Trained on Cl, not log(Cl) + "log": False, + "filename": "cmb_TE_PCAplusNN", + "has_ell_factor": False, + }, + }, + "renames": { + "ombh2": "omega_b", + "omch2": "omega_cdm", + "ns": "n_s", + "logA": "ln10^{10}A_s", + "tau": "tau_reio" + } } }, } @@ -42,15 +68,16 @@ @pytest.mark.skipif(not HAS_COSMOPOWER, reason='test requires cosmopower') def test_cosmopower_theory(request): - info_dict['theory']['soliket.CosmoPower']['soliket_data_path'] = \ - os.path.join(request.config.rootdir, 'soliket/data/CosmoPower') + info_dict['theory']['soliket.CosmoPower']['network_path'] = \ + os.path.join(request.config.rootdir, 'soliket/data/CosmoPower/CP_paper/CMB') + model_fiducial = get_model(info_dict) # noqa F841 @pytest.mark.skipif(not HAS_COSMOPOWER, reason='test requires cosmopower') def test_cosmopower_loglike(request): - info_dict['theory']['soliket.CosmoPower']['soliket_data_path'] = \ - os.path.join(request.config.rootdir, 'soliket/data/CosmoPower') + info_dict['theory']['soliket.CosmoPower']['network_path'] = \ + os.path.join(request.config.rootdir, 'soliket/data/CosmoPower/CP_paper/CMB') model_cp = get_model(info_dict) logL_cp = float(model_cp.loglikes({})[0]) @@ -67,12 +94,37 @@ def test_cosmopower_against_camb(request): camb_cls = model_camb.theory['camb'].get_Cl() info_dict['theory'] = { - "soliket.CosmoPower": { - "soliket_data_path": os.path.join(request.config.rootdir, - 'soliket/data/CosmoPower'), - "stop_at_error": True, - "extra_args": {'lmax': camb_cls['ell'].max() + 1}} + "soliket.CosmoPower": { + "stop_at_error": True, + "extra_args": {'lmax': camb_cls['ell'].max()}, + 'network_path': os.path.join(request.config.rootdir, + 'soliket/data/CosmoPower/CP_paper/CMB'), + "network_settings": { + "tt": { + "type": "NN", + "log": True, + "filename": "cmb_TT_NN" + }, + "ee": { + "type": "NN", + "log": True, + "filename": "cmb_EE_NN" + }, + "te": { + "type": "PCAplusNN", + "log": False, + "filename": "cmb_TE_PCAplusNN" + }, + }, + "renames": { + "ombh2": "omega_b", + "omch2": "omega_cdm", + "ns": "n_s", + "logA": "ln10^{10}A_s", + "tau": "tau_reio" + } } + } model_cp = get_model(info_dict) logL_cp = float(model_cp.loglikes({})[0])