From 88b287ef5da39525192e1e4bcd23daf3e8c7d338 Mon Sep 17 00:00:00 2001 From: Austin Schneider Date: Wed, 5 Jun 2024 10:19:48 -0500 Subject: [PATCH] Sketch loading logic for DarkNews --- .../DarkNewsTables/DarkNewsCrossSection.py | 503 ++++++++++++++++++ .../DarkNewsTables/DarkNewsDecay.py | 320 +++++++++++ .../CrossSections/DarkNewsTables/logger.py | 11 + .../CrossSections/DarkNewsTables/processes.py | 342 ++++++++++++ 4 files changed, 1176 insertions(+) create mode 100644 resources/CrossSections/DarkNewsTables/DarkNewsCrossSection.py create mode 100644 resources/CrossSections/DarkNewsTables/DarkNewsDecay.py create mode 100644 resources/CrossSections/DarkNewsTables/logger.py create mode 100644 resources/CrossSections/DarkNewsTables/processes.py diff --git a/resources/CrossSections/DarkNewsTables/DarkNewsCrossSection.py b/resources/CrossSections/DarkNewsTables/DarkNewsCrossSection.py new file mode 100644 index 00000000..688da76a --- /dev/null +++ b/resources/CrossSections/DarkNewsTables/DarkNewsCrossSection.py @@ -0,0 +1,503 @@ +import os +import numpy as np +import functools +from scipy.interpolate import LinearNDInterpolator, PchipInterpolator + +base_path = os.path.dirname(os.path.abspath(__file__)) +loader_file = os.path.join(base_path, "loader.py") +siren._util.load_module("loader", loader_file) + +# SIREN methods +from siren.interactions import DarkNewsCrossSection +from siren import dataclasses +from siren.dataclasses import Particle + +# DarkNews methods +from DarkNews import phase_space + +# A class representing a single ups_case DarkNews class +# Only handles methods concerning the upscattering part +class PyDarkNewsCrossSection(DarkNewsCrossSection): + def __init__( + self, + ups_case, # DarkNews UpscatteringProcess instance + tolerance=1e-6, # supposed to represent machine epsilon + interp_tolerance=5e-2, # relative interpolation tolerance + always_interpolate=True, # bool whether to always interpolate the total/differential cross section + ): + DarkNewsCrossSection.__init__(self) # C++ constructor + + self.ups_case = ups_case + self.tolerance = tolerance + self.interp_tolerance = interp_tolerance + self.always_interpolate = always_interpolate + + # 2D table in E, sigma + self.total_cross_section_table = np.empty((0, 2), dtype=float) + # 3D table in E, z, dsigma/dQ2 where z = (Q2 - Q2min) / (Q2max - Q2min) + self.differential_cross_section_table = np.empty((0, 3), dtype=float) + + def load_from_table(self, table_dir): + # Make the table directory where will we store cross section tables + table_dir_exists = False + if os.path.exists(table_dir): + # print("Directory '%s' already exists"%table_dir) + table_dir_exists = True + else: + try: + os.makedirs(table_dir, exist_ok=False) + # print("Directory '%s' created successfully" % table_dir) + except OSError as error: + raise RuntimeError("Directory '%s' cannot be created" % table_dir) + + # Look in table dir and check whether total/differential xsec tables exist + if table_dir_exists: + total_xsec_file = os.path.join(table_dir, "total_cross_sections.npy") + if os.path.exists(total_xsec_file): + self.total_cross_section_table = np.load(total_xsec_file) + diff_xsec_file = os.path.join( + table_dir, "differential_cross_sections.npy" + ) + if os.path.exists(diff_xsec_file): + self.differential_cross_section_table = np.load(diff_xsec_file) + + self.configure() + + + # serialization method + def get_representation(self): + return { + "total_cross_section_table": self.total_cross_section_table, + "differential_cross_section_table": self.differential_cross_section_table, + "ups_case": self.ups_case, + "tolerance": self.tolerance, + "interp_tolerance": self.interp_tolerance, + "always_interpolate": self.always_interpolate, + "is_configured": False, + } + + # Configure function to set up member variables + # assumes we have defined the following: + # ups_case, total_cross_section_table, differential_cross_section_table, + # tolerance, interp_tolerance, always_interpolate + # kwargs argument can be used to set any of these + def configure(self, **kwargs): + + for k, v in kwargs.items(): + self.__setattr__(k, v) + + # Define the target particle + # make sure protons are stored as H nuclei + self.target_type = Particle.ParticleType(self.ups_case.nuclear_target.pdgid) + if self.target_type == Particle.ParticleType.PPlus: + self.target_type = Particle.ParticleType.HNucleus + + # Initialize interpolation objects + self.total_cross_section_interpolator = None + self.differential_cross_section_interpolator = None + self._redefine_interpolation_objects(total=True, diff=True) + self.is_configured = True + + # Sorts and redefines scipy interpolation objects + def _redefine_interpolation_objects(self, total=False, diff=False): + if total: + if len(self.total_cross_section_table) <= 1: + return + idxs = np.argsort(self.total_cross_section_table[:, 0]) + self.total_cross_section_table = self.total_cross_section_table[idxs] + self.total_cross_section_interpolator = PchipInterpolator( + self.total_cross_section_table[:, 0], + self.total_cross_section_table[:, 1], + ) + if diff: + if len(self.differential_cross_section_table) <= 1: + return + idxs = np.lexsort( + ( + self.differential_cross_section_table[:, 1], + self.differential_cross_section_table[:, 0], + ) + ) + self.differential_cross_section_table = ( + self.differential_cross_section_table[idxs] + ) + # If we only have two energy points, don't try to construct interpolator + if len(np.unique(self.differential_cross_section_table[:, 0])) <= 2: + return + self.differential_cross_section_interpolator = LinearNDInterpolator( + self.differential_cross_section_table[:, :2], + self.differential_cross_section_table[:, 2], + rescale=True, + ) + + # Check whether we have close-enough entries in the intrepolation tables + def _interpolation_flags(self, inputs, mode): + # + # returns UseSinglePoint,Interpolate,closest_idx + # UseSinglePoint: whether to use a single point in table + # Interpolate: whether to interpolate bewteen different points + # closest_idx: index of closest point in table (for UseSinglePoint) + + # Determine which table we are using + if mode == "total": + interp_table = self.total_cross_section_table + elif mode == "differential": + interp_table = self.differential_cross_section_table + else: + print("Invalid interpolation table mode %s" % mode) + exit(0) + + # first check if we have saved table points already + if len(interp_table) == 0: + return False, False, -1 + + # bools to keep track of whether to use a single point or interpolate + UseSinglePoint = False + Interpolate = True + # order events by the relative difference + rel_diff = np.abs((interp_table[:, :-1] - inputs) / inputs) + rel_diff_length = np.sqrt(np.sum(rel_diff**2, axis=-1)) + closest_idx_abs = np.argmin(rel_diff_length, axis=-1) + # First check whether we have a close-enough single point + if np.all(np.abs(rel_diff[closest_idx_abs]) < self.tolerance): + UseSinglePoint = True + # Ensure we have enough points to interpolate + if len(interp_table) < len(inputs) + 1: + Interpolate = False + # Require that we have at least len(inputs)+1 close points to interpolate + else: + close = np.all(rel_diff < self.interp_tolerance, axis=-1) + if sum(close) < len(inputs) + 1: + Interpolate = False + return UseSinglePoint, Interpolate, closest_idx_abs + + # return entries in interpolation table if we have inputs + def _query_interpolation_table(self, inputs, mode): + # + # returns: + # 0 if we are not close enough to any points in the interpolation table + # otherwise, returns the desired interpolated value + + # First make sure we are configured + self._ensure_configured() + + # Determine which table we are using + if mode == "total": + interp_table = self.total_cross_section_table + interpolator = self.total_cross_section_interpolator + elif mode == "differential": + interp_table = self.differential_cross_section_table + interpolator = self.differential_cross_section_interpolator + else: + print("Invalid interpolation table mode %s" % mode) + exit(0) + + if self.always_interpolate: + # check if energy is within table range + + if interpolator is None or inputs[0] > interp_table[-1, 0]: + print( + "Requested interpolation at %2.2f GeV. Either this is above the table boundary or the interpolator doesn't yet exist. Filling %s table" + % (inputs[0], mode) + ) + n = self.FillInterpolationTables( + total=(mode == "total"), + diff=(mode == "differential"), + Emax=(1 + self.interp_tolerance) * inputs[0], + ) + print("Added %d points" % n) + if mode == "total": + interpolator = self.total_cross_section_interpolator + elif mode == "differential": + interpolator = self.differential_cross_section_interpolator + elif inputs[0] < interp_table[0, 0]: + print( + "Requested interpolation at %2.2f GeV below table boundary. Requring calculation" + % inputs[0] + ) + return 0 + val = max(0, interpolator(inputs)) + if val < 0: + print( + "WARNING: negative interpolated value for %s-%s %s cross section at," + % ( + self.ups_case.nuclear_target.name, + self.ups_case.scattering_regime, + mode, + ), + inputs, + ) + return val + + UseSinglePoint, Interpolate, closest_idx = self._interpolation_flags( + inputs, mode + ) + + if UseSinglePoint: + if closest_idx < 0: + print( + "Trying to use a single table point, but no closest idx found. Exiting..." + ) + exit(0) + return interp_table[closest_idx, -1] + elif Interpolate: + return interpolator(inputs) + else: + return -1 + + def FillTableAtEnergy(self, E, total=True, diff=True, factor=0.8): + num_added_points = 0 + if total: + xsec = self.ups_case.total_xsec(E) + self.total_cross_section_table = np.append( + self.total_cross_section_table, [[E, xsec]], axis=0 + ) + num_added_points += 1 + if diff: + interaction = dataclasses.InteractionRecord() + interaction.signature.primary_type = self.GetPossiblePrimaries()[ + 0 + ] # only one primary + interaction.signature.target_type = self.GetPossibleTargets()[ + 0 + ] # only one target + interaction.target_mass = self.ups_case.MA + interaction.primary_momentum = [E, 0, 0, 0] + zmin, zmax = self.tolerance, 1 + Q2min = self.Q2Min(interaction) + Q2max = self.Q2Max(interaction) + z = zmin + while z < zmax: + Q2 = Q2min + z * (Q2max - Q2min) + dxsec = self.ups_case.diff_xsec_Q2(E, Q2).item() + self.differential_cross_section_table = np.append( + self.differential_cross_section_table, + [[E, z, dxsec]], + axis=0, + ) + num_added_points += 1 + z *= 1 + factor * self.interp_tolerance + self._redefine_interpolation_objects(total=total, diff=diff) + return num_added_points + + # Fills the total and differential cross section tables within interp_tolerance + def FillInterpolationTables(self, total=True, diff=True, factor=0.8, Emax=None): + increment_factor = 0.5 * factor * self.interp_tolerance + Emin = (1.0 + self.tolerance) * self.ups_case.Ethreshold + if Emax is None: + if ( + len(self.total_cross_section_table) + + len(self.differential_cross_section_table) + ) <= 0: + return 0 + Emax = max( + np.max([0] + list(self.total_cross_section_table[:, 0])), + np.max([0] + list(self.differential_cross_section_table[:, 0])), + ) + num_added_points = 0 + E = Emin + E_existing_total = np.unique(self.total_cross_section_table[:, 0]) + E_existing_diff = np.unique(self.differential_cross_section_table[:, 0]) + while E < Emax: + # sample more coarsely past 1.5*threshold + if E > 1.5 * self.ups_case.Ethreshold: + increment_factor = factor * self.interp_tolerance + n = self.FillTableAtEnergy( + E, + total=(total and (E not in E_existing_total)), + diff=(diff and (E not in E_existing_diff)), + factor=factor, + ) + num_added_points += n + E *= 1 + increment_factor + self._redefine_interpolation_objects(total=total, diff=diff) + return num_added_points + + # Saves the tables for the scipy interpolation objects + def SaveInterpolationTables(self, table_dir, total=True, diff=True): + if total: + self._redefine_interpolation_objects(total=True) + with open( + os.path.join(table_dir, "total_cross_sections.npy"), "wb" + ) as f: + np.save(f, self.total_cross_section_table) + if diff: + self._redefine_interpolation_objects(diff=True) + with open( + os.path.join(table_dir, "differential_cross_sections.npy"), "wb" + ) as f: + np.save(f, self.differential_cross_section_table) + + def GetPossiblePrimaries(self): + return [Particle.ParticleType(self.ups_case.nu_projectile.pdgid)] + + def _ensure_configured(self): + if not self.is_configured: + self.configure() + + def GetPossibleTargetsFromPrimary(self, primary_type): + self._ensure_configured() + if Particle.ParticleType(self.ups_case.nu_projectile.pdgid) == primary_type: + return [self.target_type] + return [] + + def GetPossibleTargets(self): + self._ensure_configured() + return [self.target_type] + + def GetPossibleSignatures(self): + self._ensure_configured() + signature = dataclasses.InteractionSignature() + signature.primary_type = Particle.ParticleType( + self.ups_case.nu_projectile.pdgid + ) + signature.target_type = self.target_type + signature.secondary_types = [] + signature.secondary_types.append( + Particle.ParticleType(self.ups_case.nu_upscattered.pdgid) + ) + signature.secondary_types.append(self.target_type) + return [signature] + + def GetPossibleSignaturesFromParents(self, primary_type, target_type): + if ( + Particle.ParticleType(self.ups_case.nu_projectile.pdgid) == primary_type + ) and ((self.target_type == target_type)): + signature = dataclasses.InteractionSignature() + signature.primary_type = Particle.ParticleType( + self.ups_case.nu_projectile.pdgid + ) + signature.target_type = self.target_type + secondary_types = [] + secondary_types.append( + Particle.ParticleType(self.ups_case.nu_upscattered.pdgid) + ) + secondary_types.append( + Particle.ParticleType(self.ups_case.nuclear_target.pdgid) + ) + signature.secondary_types = secondary_types + return [signature] + return [] + + def DifferentialCrossSection(self, arg1, target=None, energy=None, Q2=None): + if type(arg1) == dataclasses.InteractionRecord: + interaction = arg1 + # Calculate Q2 assuming we are in the target rest frame + m1sq = interaction.primary_momentum[0] ** 2 - np.sum( + [p**2 for p in interaction.primary_momentum[1:]] + ) + m3sq = interaction.secondary_momenta[0][0] ** 2 - np.sum( + [p**2 for p in interaction.secondary_momenta[0][1:]] + ) + p1p3 = interaction.primary_momentum[0] * interaction.secondary_momenta[0][ + 0 + ] - np.sum( + p1 * p3 + for p1, p3 in zip( + interaction.primary_momentum[1:], + interaction.secondary_momenta[0][1:], + ) + ) + Q2 = -(m1sq + m3sq - 2 * p1p3) + energy = interaction.primary_momentum[0] + else: + primary = arg1 + interaction = dataclasses.InteractionRecord() + interaction.signature.primary_type = primary + interaction.signature.target_type = target + interaction.primary_momentum = [energy, 0, 0, 0] + interaction.target_mass = self.ups_case.MA + if interaction.signature.primary_type != Particle.ParticleType( + self.ups_case.nu_projectile.pdgid + ): + return 0 + if interaction.primary_momentum[0] < self.InteractionThreshold(interaction): + return 0 + Q2min = self.Q2Min(interaction) + Q2max = self.Q2Max(interaction) + if Q2 < Q2min or Q2 > Q2max: + return 0 + z = (Q2 - Q2min) / (Q2max - Q2min) + + if self.always_interpolate: + # Check if we can interpolate + val = self._query_interpolation_table([energy, z], mode="differential") + if val >= 0: + # we have recovered the differential cross section from the interpolation table + return val + + # If we have reached this block, we must compute the differential cross section using DarkNews + dxsec = self.ups_case.diff_xsec_Q2(energy, Q2).item() + return dxsec + + def TargetMass(self, target_type): + target_mass = self.ups_case.MA + return target_mass + + def SecondaryMasses(self, secondary_types): + secondary_masses = [] + secondary_masses.append(self.ups_case.m_ups) + secondary_masses.append(self.ups_case.MA) + return secondary_masses + + def SecondaryHelicities(self, record): + secondary_helicities = [] + secondary_helicities.append( + self.ups_case.h_upscattered * record.primary_helicity + ) + secondary_helicities.append(record.target_helicity) + self.h_ups = self.ups_case.m_ups + self.h_target = self.ups_case.MA + return secondary_helicities + + def TotalCrossSection(self, arg1, energy=None, target=None): + # Handle overloaded arguments + if type(arg1) == dataclasses.InteractionRecord: + primary = arg1.signature.primary_type + energy = arg1.primary_momentum[0] + target = arg1.signature.target_type + elif energy is not None and target is not None: + primary = arg1 + else: + print("Incorrect function call to TotalCrossSection!") + exit(0) + if int(primary) != self.ups_case.nu_projectile: + return 0 + interaction = dataclasses.InteractionRecord() + interaction.signature.primary_type = primary + interaction.signature.target_type = target + interaction.primary_momentum[0] = energy + if energy < self.InteractionThreshold(interaction): + # print("Python: energy %2.2f < self.InteractionThreshold(interaction) %2.2f"%(energy,self.InteractionThreshold(interaction))) + return 0 + + # Check if we can interpolate + val = self._query_interpolation_table([energy], mode="total") + if val >= 0: + # we have recovered the cross section from the interpolation table + return val + + # If we have reached this block, we must compute the cross section using DarkNews + xsec = self.ups_case.total_xsec(energy) + self.total_cross_section_table = np.append( + self.total_cross_section_table, [[energy, xsec]], axis=0 + ) + self._redefine_interpolation_objects(total=True) + return xsec + + def InteractionThreshold(self, interaction): + return self.ups_case.Ethreshold + + def Q2Min(self, interaction): + return phase_space.upscattering_Q2min( + interaction.primary_momentum[0], + self.ups_case.m_ups, + self.ups_case.MA, + ) + + def Q2Max(self, interaction): + return phase_space.upscattering_Q2max( + interaction.primary_momentum[0], + self.ups_case.m_ups, + self.ups_case.MA, + ) diff --git a/resources/CrossSections/DarkNewsTables/DarkNewsDecay.py b/resources/CrossSections/DarkNewsTables/DarkNewsDecay.py new file mode 100644 index 00000000..5a3b3712 --- /dev/null +++ b/resources/CrossSections/DarkNewsTables/DarkNewsDecay.py @@ -0,0 +1,320 @@ +import os +import numpy as np +import functools + +base_path = os.path.dirname(os.path.abspath(__file__)) +loader_file = os.path.join(base_path, "loader.py") +siren._util.load_module("loader", loader_file) + +# SIREN methods +from siren.interactions import DarkNewsDecay +from siren import dataclasses +from siren.dataclasses import Particle + +# A class representing a single decay_case DarkNews class +# Only handles methods concerning the decay part +class PyDarkNewsDecay(DarkNewsDecay): + def __init__(self, dec_case): + DarkNewsDecay.__init__(self) # C++ constructor + self.dec_case = dec_case + + # Some variables for storing the decay phase space integrator + self.decay_integrator = None + self.decay_norm = None + self.PS_samples = None + self.PS_weights = None + self.PS_weights_CDF = None + self.total_width = None + + def load_from_table(self, table_dir): + if table_dir is None: + print( + "No table_dir specified; will sample from new VEGAS integrator for each decay" + ) + print("WARNING: this will siginficantly slow down event generation") + return + + # Make the table directory where will we store cross section integrators + table_dir_exists = False + if os.path.exists(table_dir): + # print("Directory '%s' already exists"%table_dir) + table_dir_exists = True + else: + try: + os.makedirs(table_dir, exist_ok=False) + print("Directory '%s' created successfully" % table_dir) + except OSError as error: + print("Directory '%s' cannot be created" % table_dir) + exit(0) + + # Try to find the decay integrator + int_file = os.path.join(table_dir, "decay_integrator.pkl") + if os.path.isfile(int_file): + with open(int_file, "rb") as ifile: + self.decay_integrator = pickle.load(ifile) + # Try to find the normalization information + norm_file = os.path.join(table_dir, "decay_norm.json") + if os.path.isfile(norm_file): + with open( + norm_file, + ) as nfile: + self.decay_norm = json.load(nfile) + + + # serialization method + def get_representation(self): + return {"decay_integrator":self.decay_integrator, + "decay_norm":self.decay_norm, + "dec_case":self.dec_case, + "PS_samples":self.PS_samples, + "PS_weights":self.PS_weights, + "PS_weights_CDF":self.PS_weights_CDF, + "total_width":self.total_width, + } + + def SetIntegratorAndNorm(self, decay_norm, decay_integrator): + self.decay_norm = decay_norm + self.decay_integrator = decay_integrator + + def GetPossibleSignatures(self): + signature = dataclasses.InteractionSignature() + signature.primary_type = Particle.ParticleType(self.dec_case.nu_parent.pdgid) + signature.target_type = Particle.ParticleType.Decay + secondary_types = [] + secondary_types.append(Particle.ParticleType(self.dec_case.nu_daughter.pdgid)) + for secondary in self.dec_case.secondaries: + secondary_types.append(Particle.ParticleType(secondary.pdgid)) + signature.secondary_types = secondary_types + return [signature] + + def GetPossibleSignaturesFromParent(self, primary_type): + if Particle.ParticleType(self.dec_case.nu_parent.pdgid) == primary_type: + signature = dataclasses.InteractionSignature() + signature.primary_type = Particle.ParticleType( + self.dec_case.nu_parent.pdgid + ) + signature.target_type = Particle.ParticleType.Decay + secondary_types = [] + secondary_types.append( + Particle.ParticleType(self.dec_case.nu_daughter.pdgid) + ) + for secondary in self.dec_case.secondaries: + secondary_types.append(Particle.ParticleType(secondary.pdgid)) + signature.secondary_types = secondary_types + return [signature] + return [] + + def DifferentialDecayWidth(self, record): + # Momentum variables of HNL necessary for calculating decay phase space + PN = np.array(record.primary_momentum) + + if type(self.dec_case) == FermionSinglePhotonDecay: + gamma_idx = 0 + for secondary in record.signature.secondary_types: + if secondary == dataclasses.Particle.ParticleType.Gamma: + break + gamma_idx += 1 + if gamma_idx >= len(record.signature.secondary_types): + print("No gamma found in the list of secondaries!") + exit(0) + + Pgamma = np.array(record.secondary_momenta[gamma_idx]) + momenta = np.expand_dims(PN, 0), np.expand_dims(Pgamma, 0) + + elif type(self.dec_case) == FermionDileptonDecay: + lepminus_idx = -1 + lepplus_idx = -1 + nu_idx = -1 + for idx, secondary in enumerate(record.signature.secondary_types): + if secondary in [ + dataclasses.Particle.ParticleType.EMinus, + dataclasses.Particle.ParticleType.MuMinus, + dataclasses.Particle.ParticleType.TauMinus, + ]: + lepminus_idx = idx + elif secondary in [ + dataclasses.Particle.ParticleType.EPlus, + dataclasses.Particle.ParticleType.MuPlus, + dataclasses.Particle.ParticleType.TauPlus, + ]: + lepplus_idx = idx + else: + nu_idx = idx + if -1 in [lepminus_idx, lepplus_idx, nu_idx]: + print("Couldn't find two leptons and a neutrino in the final state!") + exit(0) + Pnu = np.array(record.secondary_momenta[nu_idx]) + Plepminus = np.array(record.secondary_momenta[lepminus_idx]) + Plepplus = np.array(record.secondary_momenta[lepplus_idx]) + momenta = ( + np.expand_dims(PN, 0), + np.expand_dims(Plepminus, 0), + np.expand_dims(Plepplus, 0), + np.expand_dims(Pnu, 0), + ) + else: + print("%s is not a valid decay class type!" % type(self.dec_case)) + exit(0) + return self.dec_case.differential_width(momenta) + + def TotalDecayWidth(self, arg1): + if type(arg1) == dataclasses.InteractionRecord: + primary = arg1.signature.primary_type + elif type(arg1) == dataclasses.Particle.ParticleType: + primary = arg1 + else: + print("Incorrect function call to TotalDecayWidth!") + exit(0) + if int(primary) != self.dec_case.nu_parent: + return 0 + if self.total_width is None: + # Need to set the total width + if type(self.dec_case) == FermionDileptonDecay and ( + self.dec_case.vector_off_shell and self.dec_case.scalar_off_shell + ): + # total width calculation requires evaluating an integral + if self.decay_integrator is None or self.decay_norm is None: + # We need to initialize a new VEGAS integrator in DarkNews + self.total_width, dec_norm, dec_integrator = self.dec_case.total_width( + return_norm=True, return_dec=True + ) + self.SetIntegratorAndNorm(dec_norm, dec_integrator) + else: + self.total_width = ( + self.decay_integrator["diff_decay_rate_0"].mean + * self.decay_norm["diff_decay_rate_0"] + ) + else: + self.total_width = self.dec_case.total_width() + return self.total_width + + def TotalDecayWidthForFinalState(self, record): + sig = self.GetPossibleSignatures()[0] + if ( + (record.signature.primary_type != sig.primary_type) + or (record.signature.target_type != sig.target_type) + or (len(record.signature.secondary_types) != len(sig.secondary_types)) + or ( + np.any( + [ + record.signature.secondary_types[i] != sig.secondary_types[i] + for i in range(len(sig.secondary_types)) + ] + ) + ) + ): + return 0 + ret = self.dec_case.total_width() + return ret + + def DensityVariables(self): + if type(self.dec_case) == FermionSinglePhotonDecay: + return "cost" + elif type(self.dec_case) == FermionDileptonDecay: + if self.dec_case.vector_on_shell and self.dec_case.scalar_on_shell: + print("Can't have both the scalar and vector on shell") + exit(0) + elif (self.dec_case.vector_on_shell and self.dec_case.scalar_off_shell) or ( + self.dec_case.vector_off_shell and self.dec_case.scalar_on_shell + ): + return "cost" + elif self.dec_case.vector_off_shell and self.dec_case.scalar_off_shell: + return "t,u,c3,phi34" + else: + print("%s is not a valid decay class type!" % type(self.dec_case)) + exit(0) + return "" + + def GetPSSample(self, random): + # Make the PS weight CDF if that hasn't been done + if self.PS_weights_CDF is None: + self.PS_weights_CDF = np.cumsum(self.PS_weights) + + # Random number to determine + x = random.Uniform(0, self.PS_weights_CDF[-1]) + + # find first instance of a CDF entry greater than x + PSidx = np.argmax(x - self.PS_weights_CDF <= 0) + return self.PS_samples[:, PSidx] + + def SampleRecordFromDarkNews(self, record, random): + # First, make sure we have PS samples and weights + if self.PS_samples is None or self.PS_weights is None: + # We need to generate new PS samples + if self.decay_integrator is None or self.decay_norm is None: + # We need to initialize a new VEGAS integrator in DarkNews + (self.PS_samples, PS_weights_dict), dec_norm, dec_integrator = self.dec_case.SamplePS( + return_norm=True, return_dec=True + ) + self.PS_weights = PS_weights_dict["diff_decay_rate_0"] + self.SetIntegratorAndNorm(dec_norm, dec_integrator) + else: + # We already have an integrator, we just need new PS samples + self.PS_samples, PS_weights_dict = self.dec_case.SamplePS( + existing_integrator=self.decay_integrator + ) + self.PS_weights = PS_weights_dict["diff_decay_rate_0"] + + # Now we must sample an PS point on the hypercube + PS = self.GetPSSample(random) + + # Find the four-momenta associated with this point + # Expand dims required to call DarkNews function on signle sample + four_momenta = get_decay_momenta_from_vegas_samples( + np.expand_dims(PS, 0), + self.dec_case, + np.expand_dims(np.array(record.primary_momentum), 0), + ) + + secondaries = record.GetSecondaryParticleRecords() + + if type(self.dec_case) == FermionSinglePhotonDecay: + gamma_idx = 0 + for secondary in record.signature.secondary_types: + if secondary == dataclasses.Particle.ParticleType.Gamma: + break + gamma_idx += 1 + if gamma_idx >= len(record.signature.secondary_types): + print("No gamma found in the list of secondaries!") + exit(0) + nu_idx = 1 - gamma_idx + secondaries[gamma_idx].four_momentum = np.squeeze(four_momenta["P_decay_photon"]) + secondaries[gamma_idx].mass = 0 + secondaries[nu_idx].four_momentum = np.squeeze(four_momenta["P_decay_N_daughter"]) + secondaries[nu_idx].mass = 0 + + elif type(self.dec_case) == FermionDileptonDecay: + lepminus_idx = -1 + lepplus_idx = -1 + nu_idx = -1 + for idx, secondary in enumerate(record.signature.secondary_types): + if secondary in [ + dataclasses.Particle.ParticleType.EMinus, + dataclasses.Particle.ParticleType.MuMinus, + dataclasses.Particle.ParticleType.TauMinus, + ]: + lepminus_idx = idx + elif secondary in [ + dataclasses.Particle.ParticleType.EPlus, + dataclasses.Particle.ParticleType.MuPlus, + dataclasses.Particle.ParticleType.TauPlus, + ]: + lepplus_idx = idx + else: + nu_idx = idx + if -1 in [lepminus_idx, lepplus_idx, nu_idx]: + print([lepminus_idx, lepplus_idx, nu_idx]) + print(record.signature.secondary_types) + print("Couldn't find two leptons and a neutrino in the final state!") + exit(0) + secondaries[lepminus_idx].four_momentum = ( + np.squeeze(four_momenta["P_decay_ell_minus"]) + ) + secondaries[lepplus_idx].four_momentum = ( + np.squeeze(four_momenta["P_decay_ell_plus"]) + ) + secondaries[nu_idx].four_momentum = ( + np.squeeze(four_momenta["P_decay_N_daughter"]) + ) + return record + diff --git a/resources/CrossSections/DarkNewsTables/logger.py b/resources/CrossSections/DarkNewsTables/logger.py new file mode 100644 index 00000000..e8ebef6f --- /dev/null +++ b/resources/CrossSections/DarkNewsTables/logger.py @@ -0,0 +1,11 @@ +# Monkey patch DarkNews logger to hide printouts + +from DarkNews.ModelContainer import ModelContainer +ModelContainer_configure_logger = ModelContainer.configure_logger + +@functools.wraps(ModelContainer.configure_logger) +def suppress_info(self, logger, loglevel="INFO", prettyprinter=None, logfile=None, verbose=False): + return ModelContainer_configure_logger(self, logger, loglevel="WARNING", prettyprinter=prettyprinter, logfile=logfile, verbose=verbose) + +ModelContainer.configure_logger = suppress_info + diff --git a/resources/CrossSections/DarkNewsTables/processes.py b/resources/CrossSections/DarkNewsTables/processes.py new file mode 100644 index 00000000..c26e0aad --- /dev/null +++ b/resources/CrossSections/DarkNewsTables/processes.py @@ -0,0 +1,342 @@ +import os +import siren + +base_path = os.path.dirname(os.path.abspath(__file__)) +loader_file = os.path.join(base_path, "loader.py") +siren._util.load_module("loader", loader_file) + +from DarkNews.ModelContainer import ModelContainer + +xs_path = siren.utilities.get_cross_section_model_path( + f"DarkNewsTables-v{siren.utilities.darknews_version()}", must_exist=False +) + +def GetDetectorModelTargets(detector_model): + """ + Determines the targets that exist inside the detector model + :return: lists of targets and strings + :rtype: (list, list) + """ + count = 0 + targets = [] + target_strs = [] + while detector_model.Materials.HasMaterial(count): + for target in detector_model.Materials.GetMaterialTargets(count): + if target not in targets: + targets.append(target) + if str(target).find("Nucleus") == -1: + continue + else: + target_str = str(target)[ + str(target).find("Type") + 5 : str(target).find("Nucleus") + ] + if target_str == "H": + target_str = "H1" + if target_str not in target_strs: + target_strs.append(target_str) + count += 1 + return targets, target_strs + + +def load_cross_section( + model_container, + upscattering_key, + tolerance=1e-6, + interp_tolerance=5e-2, + always_interpolate=True, +): + if upscattering_key not in model_container.ups_cases: + raise KeyError( + f'Upscattering key "{upscattering_key}" not present in model_container.ups_cases' + ) + upscattering_model = model_container.ups_cases[upscattering_key] + return PyDarkNewsCrossSection( + upscattering_model, + tolerance=tolerance, + interp_tolerance=interp_tolerance, + always_interpolate=always_interpolate, + ) + + +def load_cross_section_from_table( + model_container, + upscattering_key, + table_dir, + tolerance=1e-6, + interp_tolerance=5e-2, + always_interpolate=True, +): + subdir = "_".join(["CrossSection"] + [str(x) for x in upscattering_key]) + table_subdir = os.path.join(table_dir, subdir) + + cross_section = load_cross_section( + model_container, + upscattering_key, + tolerance=tolerance, + interp_tolerance=interp_tolerance, + always_interpolate=always_interpolate, + ) + cross_section.load_from_table(table_subdir) + return cross_section + + +def load_cross_section_from_pickle( + upscattering_key, + table_dir, + tolerance=1e-6, + interp_tolerance=5e-2, + always_interpolate=True, +): + subdir = "_".join(["CrossSection"] + [str(x) for x in upscattering_key]) + table_subdir = os.path.join(table_dir, subdir) + fname = os.path.join(table_dir, "xs_object.pkl") + with open(fname, "rb") as f: + xs_obj = pickle.load(f) + xs_obj.configure( + tolerance=tolerance, + interp_tolerance=interp_tolerance, + always_interpolate=always_interpolate, + ) + return xs_obj + + +def attempt_to_load_cross_section( + models, + ups_key, + tabel_dir, + preferences, +): + if len(preferences) == 0: + raise ValueError("preferences must have at least one entry") + + subdir = "_".join(["CrossSection"] + [str(x) for x in ups_key]) + loaded = False + cross_section = None + for p in preferences: + if p == "table": + table_subdir = os.path.join(table_dir, subdir) + if os.path.isdir(table_subdir): + try: + cross_section = append( + load_cross_section_from_table( + models, + ups_key, + table_subdir, + tolerance=tolerance, + interp_tolerance=interp_tolerance, + always_interpolate=always_interpolate, + ) + ) + loaded = True + except Exception as e: + print( + "Encountered exception while loading DN cross section from table" + ) + raise e from None + break + elif p == "pickle": + table_subdir = os.path.join(table_dir, subdir) + if os.path.isdir(table_subdir): + try: + cross_section = append( + load_cross_section_from_pickle( + ups_key, + table_subdir, + tolerance=tolerance, + interp_tolerance=interp_tolerance, + always_interpolate=always_interpolate, + ) + ) + loaded = True + except Exception as e: + print( + "Encountered exception while loading DN cross section from pickle" + ) + raise e from None + break + elif p == "normal": + try: + cross_sections = append( + load_cross_section( + models, + ups_key, + tolerance=tolerance, + interp_tolerance=interp_tolerance, + always_interpolate=always_interpolate, + ) + ) + loaded = True + except Exception as e: + print("Encountered exception while loading DN cross section normally") + raise e from None + break + + if not loaded: + raise RuntimeError("Not able to load DN cross section with any strategy") + return cross_section + + +def load_cross_sections( + model_kwargs, + table_dir=None, + tolerance=1e-6, + interp_tolerance=5e-2, + always_interpolate=True, + preferences=None, +): + if preferences is None: + preferences = ["table", "pickle", "normal"] + + models = ModelContainer(**model_kwargs) + + if table_dir is None: + table_dir = "" + + cross_sections = [] + for ups_key, ups_case in models.ups_cases.items(): + cross_sections.append( + attempt_to_load_cross_section(models, ups_key, table_dir, preferences) + ) + + return cross_sections + + +def load_processes( + primary_type=None, + target_types=None, + fill_tables_at_start=False, + Emax=None, + m4=None, + mu_tr_mu4=None, # GeV^-1 + UD4=0, + Umu4=0, + epsilon=0.0, + gD=0.0, + decay_product="photon", + noHC=True, + HNLtype="dirac", + nuclear_targets=None, + detector_model=None, + tolerance=1e-6, # supposed to represent machine epsilon + interp_tolerance=5e-2, # relative interpolation tolerance + always_interpolate=True, # bool whether to always interpolate the total/differential cross section +): + + if nuclear_targets is None and detector_model is None: + raise ValueError( + 'Either "nuclear_targets" or "detector_model" must be provided' + ) + + if nuclear_targets is None: + nuclear_targets = GetDetectorModelTargets(detector_model)[1] + + base_path = os.path.dirname(os.path.abspath(__file__)) + table_dir = os.path.join(base_path, "Dipole_M%2.2e_mu%2.2e" % (m4, mu_tr_mu4)) + + model_kwargs = { + "m4": m4, + "mu_tr_mu4": mu_tr_mu4, + "UD4": UD4, + "Umu4": Umu4, + "epsilon": epsilon, + "gD": gD, + "decay_product": decay_product, + "noHC": noHC, + } + + cross_sections = load_cross_sections( + model_kwargs, + table_dir=None, + tolerance=tolerance, + interp_tolerance=interp_tolerance, + always_interpolate=always_interpolate, + ) + + if fill_tables_at_start: + if Emax is None: + print( + "WARNING: Cannot fill cross section tables without specifying a maximum energy" + ) + else: + for cross_section in cross_sections: + cross_section.FillInterpolationTables(Emax=Emax) + + # Initialize primary InteractionCollection + # Loop over available cross sections and save those which match primary type + primary_cross_sections = [] + for cross_section in self.DN_processes.cross_sections: + if primary_type == _dataclasses.Particle.ParticleType( + cross_section.ups_case.nu_projectile.pdgid + ): + primary_cross_sections.append(cross_section) + primary_interaction_collection = _interactions.InteractionCollection( + primary_type, primary_cross_sections + ) + + # Initialize secondary processes and define secondary InteractionCollection objects + secondary_decays = {} + # Also keep track of the minimum decay width for defining the position distribution later + self.DN_min_decay_width = np.inf + # Loop over available decays, group by parent type + for decay in self.DN_processes.decays: + secondary_type = _dataclasses.Particle.ParticleType( + decay.dec_case.nu_parent.pdgid + ) + if secondary_type not in secondary_decays.keys(): + secondary_decays[secondary_type] = [] + secondary_decays[secondary_type].append(decay) + total_decay_width = decay.TotalDecayWidth(secondary_type) + if total_decay_width < self.DN_min_decay_width: + self.DN_min_decay_width = total_decay_width + # Now make the list of secondary cross section collections + # Add new secondary injection and physical processes at the same time + secondary_interaction_collections = [] + for secondary_type, decay_list in secondary_decays.items(): + # Define a sedcondary injection distribution + secondary_injection_process = _injection.SecondaryInjectionProcess() + secondary_physical_process = _injection.PhysicalProcess() + secondary_injection_process.primary_type = secondary_type + secondary_physical_process.primary_type = secondary_type + + # Add the secondary position distribution + if self.fid_vol is not None: + secondary_injection_process.AddSecondaryInjectionDistribution( + _distributions.SecondaryBoundedVertexDistribution(self.fid_vol) + ) + else: + secondary_injection_process.AddSecondaryInjectionDistribution( + _distributions.SecondaryPhysicalVertexDistribution() + ) + + self.secondary_injection_processes.append(secondary_injection_process) + self.secondary_physical_processes.append(secondary_physical_process) + + secondary_interaction_collections.append( + _interactions.InteractionCollection(secondary_type, decay_list) + ) + + self.SetInteractions( + primary_interaction_collection, secondary_interaction_collections + ) + + +def GetFiducialVolume(self): + """ + :return: identified fiducial volume for the experiment, None if not found + """ + detector_model_file = _util.get_detector_model_path(self.experiment) + with open(detector_model_file) as file: + fiducial_line = None + detector_line = None + for line in file: + data = line.split() + if len(data) <= 0: + continue + elif data[0] == "fiducial": + fiducial_line = line + elif data[0] == "detector": + detector_line = line + if fiducial_line is None or detector_line is None: + return None + return _detector.DetectorModel.ParseFiducialVolume(fiducial_line, detector_line) + return None