diff --git a/docs/conf.py b/docs/conf.py index 1aaa0f0..fd3da7f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,4 +1,8 @@ import os +import sys +sys.path.insert(0, os.path.abspath('../src')) +sys.path.insert(0, os.path.abspath('../src/idmlaser_cholera/')) +sys.path.insert(0, os.path.abspath('../src/idmlaser_cholera/mods')) extensions = [ "sphinx.ext.autodoc", @@ -39,6 +43,37 @@ } html_short_title = f"{project}-{version}" -napoleon_use_ivar = True -napoleon_use_rtype = False -napoleon_use_param = False +# Napoleon settings (Napolean converts Google-style docstrings to reStructuredText) +napoleon_google_docstring = True +napoleon_numpy_docstring = True +napoleon_include_init_with_doc = False +napoleon_include_private_with_doc = False +napoleon_include_special_with_doc = True +napoleon_use_admonition_for_examples = False +napoleon_use_admonition_for_notes = False +napoleon_use_admonition_for_references = False +napoleon_use_ivar = True # from Cookiecutter template, False is the default +napoleon_use_param = False # from Cookiecutter template, True is the default +napoleon_use_rtype = False # from Cookiecutter template, True is the default +napoleon_preprocess_types = False +napoleon_type_aliases = None +napoleon_attr_annotations = True + +mathjax3_config = {"TeX": {"Macros": {"small": ["{\\scriptstyle #1}", 1]}}} + +# Prevent the following warning: +# sphinx/builders/linkcheck.py:86: RemovedInSphinx80Warning: The default value for 'linkcheck_report_timeouts_as_broken' will change to False in Sphinx 8, meaning that request timeouts will be reported with a new 'timeout' status, instead of as 'broken'. This is intended to provide more detail as to the failure mode. See https://github.com/sphinx-doc/sphinx/issues/11868 for details. +# warnings.warn(deprecation_msg, RemovedInSphinx80Warning, stacklevel=1) +linkcheck_report_timeouts_as_broken = False +latex_documents = [ + ('index', 'LASER_Cholera.tex', 'LASER_Cholera', 'IDM', 'manual'), +] +linkcheck_ignore = [ + r"https://laser\.readthedocs\.io/", + r"https://pypi\.org/project/idmlaser" +] + +exclude_patterns = [ + 'reference/idmlaser_cholera.tools.make_suitability_random_data.rst', # Exclude this specific .rst file + 'idmlaser_cholera/__main__.py', +] diff --git a/docs/reference/idmlaser.rst b/docs/reference/idmlaser.rst deleted file mode 100644 index e1a02f2..0000000 --- a/docs/reference/idmlaser.rst +++ /dev/null @@ -1,9 +0,0 @@ -idmlaser -======== - -.. testsetup:: - - from idmlaser import * - -.. automodule:: idmlaser - :members: diff --git a/requirements.txt b/requirements.txt index 26909e1..dec3d6a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,6 @@ h5py # caching pandas # sigh numba numpy -pkg_resources requests taichi tqdm diff --git a/src/idmlaser_cholera/models/__init__.py b/src/idmlaser_cholera/models/__init__.py deleted file mode 100644 index cdecde7..0000000 --- a/src/idmlaser_cholera/models/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Expose the various model implementations to the package level. diff --git a/src/idmlaser_cholera/models/model.py b/src/idmlaser_cholera/models/model.py deleted file mode 100644 index fb5220e..0000000 --- a/src/idmlaser_cholera/models/model.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Base class for all models.""" - -from tqdm import tqdm - - -class DiseaseModel: - """Base class for all disease models.""" - - def __init__(self): - self._tick = 0 - return - - def initialize(self) -> None: - raise NotImplementedError - - def step(self, tick: int, pbar: tqdm) -> None: - raise NotImplementedError - - def finalize(self) -> None: - raise NotImplementedError - - def run(self, ticks: int) -> None: - for _tick in (pbar := tqdm(range(ticks))): - self.step(self._tick, pbar) - self._tick += 1 - return - - def serialize(self, filename: str) -> None: - raise NotImplementedError - - @classmethod - def deserialize(self, filename: str) -> "DiseaseModel": - raise NotImplementedError diff --git a/src/idmlaser_cholera/models/numpynumba.py b/src/idmlaser_cholera/models/numpynumba.py deleted file mode 100644 index b758b57..0000000 --- a/src/idmlaser_cholera/models/numpynumba.py +++ /dev/null @@ -1,495 +0,0 @@ -"""Spatial SEIR model implementation with NumPy+Numba""" - -import json -from datetime import datetime -from datetime import timezone -from pathlib import Path -from typing import Optional -from typing import Tuple - -import numba as nb -import numpy as np -import pandas as pd -from tqdm import tqdm - -from idmlaser.numpynumba import Demographics -from idmlaser.numpynumba import Population -from idmlaser.utils import NumpyJSONEncoder - -from .model import DiseaseModel - -_STATES_TYPE_NP = np.uint8 # S=0, E=1, I=2, R=3, ACTIVE=0x80, DECEASED=0x40 -_STATES_TYPE_NB = nb.uint8 -STATE_ACTIVE = np.uint8(0x80) # bit flag for active state -STATE_SUSCEPTIBLE = STATE_ACTIVE | np.uint8(0) -STATE_EXPOSED = STATE_ACTIVE | np.uint8(1) -STATE_INFECTIOUS = STATE_ACTIVE | np.uint8(2) -STATE_RECOVERED = STATE_ACTIVE | np.uint8(3) -STATE_DECEASED = np.uint8(0x40) # bit flag for deceased state - -_DOB_TYPE_NP = np.int32 # measured in days anchored at t=0, so all initial DoBs are negative -_SUSCEPTIBILITY_TYPE_NP = np.uint8 # currently just 1|0 -_SUSCEPTIBILITY_TYPE_NB = nb.uint8 -_ITIMER_TYPE_NP = np.uint8 # don't need more than 255 days of infectiousness at this point -_ITIMER_TYPE_NB = nb.uint8 -_NODEID_TYPE_NP = np.uint16 # don't need more than 65,535 nodes -_NODEID_TYPE_NB = nb.uint16 - - -class NumbaSpatialSEIR(DiseaseModel): - """Spatial SEIR model implementation with NumPy+Numba""" - - def __init__(self, parameters: dict): - super().__init__() - self.update_parameters(parameters) - self._population = None - self._popcounts = None - # incubation_update comes _after_ infection_update so we don't immediately decrement the infection timer - self._phases = [vital_dynamics, infection_update, incubation_update, transmission_update, report_update] - self._demographics = None - self._network = None - - self._metrics = [] - - self.prng = np.random.default_rng(seed=self.parameters.prng_seed) - seed_numba(self.parameters.prng_seed) - # print(f"Threading layer chosen: {nb.threading_layer()}") - - return - - def update_parameters(self, parameters: dict) -> None: - """Update the parameters of the model.""" - self.parameters = self._parameters() - for k, v in (parameters if isinstance(parameters, dict) else vars(parameters)).items(): - self.parameters.__setattr__(k, v) - - self.parameters.beta = self.parameters.r_naught / self.parameters.inf_mean - - if self.parameters.prng_seed is None: - self.parameters.prng_seed = datetime.now(timezone.utc).milliseconds() - - print(f"Model parameters: {vars(self.parameters)}") - - return - - class _parameters: - """Default parameters for the model.""" - - def __init__(self) -> None: - """Initialize the default parameters.""" - self.exp_mean = np.float32(7) - self.exp_std = np.float32(1) - self.inf_mean = np.float32(7) - self.inf_std = np.float32(1) - self.r_naught = np.float32(14) - self.prng_seed = np.uint32(20240412) - self.ticks = np.uint32(365) - - return - - def initialize( - self, - capacity: np.uint32, - demographics: Demographics, - initial: np.ndarray, - network: np.ndarray, - ) -> None: - """ - Initialize the model with the given parameters. - - Parameters: - - capacity: The maximum capacity of the model. - - demographics: The demographic information. - - initial: The initial state of the model (S, E, I, and R populations - [node, state]). - - network: The network structure of the model. - - Returns: - None - """ - assert network.shape[0] == network.shape[1], "Network must be square" - assert network.shape[0] == demographics.nnodes, "Network must be same size as number of nodes" - print(f"Initializing model with {demographics.nnodes} nodes: ", end="") - self._popcounts = demographics.population[0] - print(f"(initial population: {self._popcounts.sum():,} maximum capacity: {capacity:,})") - population = Population(capacity) - population.add_property("states", dtype=_STATES_TYPE_NP, default=0) - population.add_property("dob", dtype=_DOB_TYPE_NP, default=0) - population.add_property("susceptibility", dtype=_SUSCEPTIBILITY_TYPE_NP, default=1) - population.add_property("etimer", dtype=_ITIMER_TYPE_NP, default=0) - population.add_property("itimer", dtype=_ITIMER_TYPE_NP, default=0) - population.add_property("nodeid", dtype=_NODEID_TYPE_NP, default=0) - - # iterate through population setting nodeid = i for next pops[i] individuals - nodeidx = 0 - states = population.states - susceptibilities = population.susceptibility # pre-fetch for speed - etimers = population.etimer # pre-fetch for speed - itimers = population.itimer # pre-fetch for speed - nodeids = population.nodeid # pre-fetch for speed - init_pop = demographics.population[0] - - ISUS = 0 - IINC = 1 - IINF = 2 - IREC = 3 - - for c in range(demographics.nnodes): - popcount = init_pop[c] - assert initial[c].sum() == popcount, "SEIR counts do not sum to node population" - i, j = population.add(popcount) - assert i == nodeidx, "Population index mismatch" - assert j == nodeidx + popcount, "Population index mismatch" - nodeids[i:j] = c # assign new individuals to this node - # susceptible individuals - numsus = initial[c, ISUS] - states[nodeidx : nodeidx + numsus] = STATE_SUSCEPTIBLE - # susceptibilities[nodeidx : nodeidx + numrec] = 1 # unnecessary - nodeidx += numsus - # incubating individuals - numinc = initial[c, IINC] - states[nodeidx : nodeidx + numinc] = STATE_EXPOSED - etimers[nodeidx : nodeidx + numinc] = ( - self.prng.normal(self.parameters.exp_mean, self.parameters.exp_std, size=numinc).round().astype(_ITIMER_TYPE_NP) - ) + 1 - nodeidx += numinc - # infectious individuals - numinf = initial[c, IINF] - states[nodeidx : nodeidx + numinf] = STATE_INFECTIOUS - itimers[nodeidx : nodeidx + numinf] = ( - self.prng.normal(self.parameters.inf_mean, self.parameters.inf_std, size=numinf).round().astype(_ITIMER_TYPE_NP) - ) + 1 - nodeidx += numinf - # recovered individuals - numrec = initial[c, IREC] - states[nodeidx : nodeidx + numrec] = STATE_RECOVERED - susceptibilities[nodeidx : nodeidx + numrec] = 0 - nodeidx += numrec - # nodeidx += popcount - - self._population = population - - self._demographics = demographics - self._network = network - - # pre-allocate these rather than allocating new array on each timestep - self._forces = np.zeros(demographics.nnodes, dtype=np.float32) - - # ticks+1 here for room to capture the initial state - # 3 dimensions: time (tick), state (S:0, E:1, I:2, R:3, D:4), node (0..nnodes-1) - # 5 columns: susceptible, exposed, infected, recovered, deceased - timestep is implied - self.report = np.zeros((self.parameters.ticks + 1, 5, demographics.nnodes), dtype=np.uint32) - - # self._contagion = np.zeros(demographics.nnodes, dtype=np.uint32) - self.cases = np.zeros((self.parameters.ticks, demographics.nnodes), dtype=np.uint32) - - # record initial state, state _after_ timestep i processing will be in index i+1 - report_update(self, -1) - - return - - @property - def population(self): - """Return the population.""" - return self._population - - # add a processing step to be called at each time step - def add_phase(self, phase): - """Add a processing phase to be called at each time step""" - self._phases.append(phase) - return - - def step(self, tick: int, pbar: tqdm) -> None: - """Step the model by one tick.""" - timings = [tick] - for phase in self._phases: - t0 = datetime.now(tz=None) # noqa: DTZ005 - phase(self, tick) - t1 = datetime.now(tz=None) # noqa: DTZ005 - delta = t1 - t0 - timings.append(delta.seconds * 1_000_000 + delta.microseconds) - self._metrics.append(timings) - - @property - def metrics(self): - return pd.DataFrame(data=self._metrics, columns=["tick"] + [fn.__name__ for fn in self._phases]) # np.array(self._metrics) - - def finalize(self, directory: Optional[Path] = None) -> Tuple[Optional[Path], Path]: - """Finalize the model.""" - directory = directory if directory else self.parameters.output - directory.mkdir(parents=True, exist_ok=True) - prefix = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") - prefix += f"-{self.parameters.scenario}" - try: - Path(paramfile := directory / (prefix + "-parameters.json")).write_text(json.dumps(vars(self.parameters), cls=NumpyJSONEncoder)) - print(f"Wrote parameters to '{paramfile}'.") - except Exception as e: - print(f"Error writing parameters: {e}") - paramfile = None - prefix += f"-{self._demographics.nnodes}-{self.parameters.ticks}-" - np.save(npyfile := directory / (prefix + "spatial_seir.npy"), self.report) - print(f"Wrote SEIR channels, by node, to '{npyfile}'.") - - return (paramfile, npyfile) - - def run(self, ticks: int) -> None: - """Run the model for a number of ticks.""" - start = datetime.now(timezone.utc) - for tick in (pbar := tqdm(range(ticks))): - self.step(tick, pbar) - finish = datetime.now(timezone.utc) - print(f"elapsed time: {finish - start}") - return - - def serialize(self, filename: str) -> None: - """Serialize the model to a file.""" - raise NotImplementedError - - @classmethod - def deserialize(self, filename: str) -> "DiseaseModel": - """Deserialize the model from a file.""" - raise NotImplementedError - - -# model step phases -# We do the dance seen below a couple of times because Numba doesn't know what it can -# do with Python classes, e.g. node. So we use Numba on an inner loop with the -# argument types explicitly passed and call the inner loop from the more general step function. - -# Note that we use the _NB (Numba) type versions here vs. the _NP (Numpy) type versions above. - - -def vital_dynamics(model: NumbaSpatialSEIR, tick: int) -> None: - """Update the vital dynamics of the population.""" - - population = model.population - - year = tick // 365 - doy = tick % 365 + 1 # day of year, 1-based - - # Deactivate deaths_t randomly selected individuals - # deaths = sum(node.deaths[tick] for node in model.nodes) - - # Activate births_t new individuals as susceptible - annual_births = model._demographics.births[year] - todays_births = (annual_births * doy // 365) - (annual_births * (doy - 1) // 365) - if (total_births := todays_births.sum()) > 0: - istart, iend = population.add(total_births) - population.dob[istart:iend] = tick - population.states[istart:iend] = STATE_SUSCEPTIBLE - population.susceptibility[istart:iend] = 1 - index = istart - for nodeid, births in enumerate(todays_births): - population.nodeid[index : index + births] = _NODEID_TYPE_NP(nodeid) # assign newborns to their nodes - index += births - - # Activate immigrations_t new individuals as not-susceptible - annual_immigrations = model._demographics.immigrations[year] - todays_immigrations = (annual_immigrations * doy // 365) - (annual_immigrations * (doy - 1) // 365) - if (total_immigrations := todays_immigrations.sum()) > 0: - istart, iend = model.population.add(total_immigrations) - population.states[istart:iend] = STATE_RECOVERED - population.susceptibility[istart:iend] = 0 - index = istart - for nodeid, immigrations in enumerate(todays_immigrations): - population.nodeid[index : index + immigrations] = _NODEID_TYPE_NP(nodeid) # assign immigrants to their nodes - index += immigrations - - return - - -@nb.njit((_STATES_TYPE_NB[:], _ITIMER_TYPE_NB[:], nb.uint32), parallel=True, nogil=True, cache=True) -def infection_update_inner(states, timers, count): - for i in nb.prange(count): - t = timers[i] - if t > 0: - t -= 1 - timers[i] = t - if t == 0: - states[i] = STATE_RECOVERED - # No other processing, susceptibility is already set to 0 in the transmission phase - return - - -def infection_update(model: NumbaSpatialSEIR, _tick: int) -> None: - """Update the infection timer for each individual in the population.""" - - infection_update_inner(model.population.states, model.population.itimer, model.population.count) - - return - - -@nb.njit( - (_STATES_TYPE_NB[:], _ITIMER_TYPE_NB[:], _ITIMER_TYPE_NB[:], nb.uint32, nb.float32, nb.float32), parallel=True, nogil=True, cache=True -) -def incubation_update_inner(states, etimers, itimers, count, inf_mean, inf_std): - for i in nb.prange(count): - t = etimers[i] - if t > 0: # if you have an active exposure timer... - t -= 1 - etimers[i] = t # ...decrement it - if t == 0: # if it has reached 0... - # set your infection timer to a draw from a normal distribution - itimers[i] = _ITIMER_TYPE_NP(np.round(np.random.normal(inf_mean, inf_std))) - states[i] = STATE_INFECTIOUS # ...and set your state to infectious - return - - -def incubation_update(model, _tick): - """Update the incubation timer for each individual in the population.""" - - incubation_update_inner( - model.population.states, - model.population.etimer, - model.population.itimer, - model.population.count, - model.parameters.inf_mean, - model.parameters.inf_std, - ) - return - - -@nb.njit( - (_STATES_TYPE_NB[:], _SUSCEPTIBILITY_TYPE_NB[:], nb.uint16[:], nb.float32[:], _ITIMER_TYPE_NB[:], nb.uint32, nb.float32, nb.float32), - parallel=True, - nogil=True, - cache=True, -) -def tx_inner(states, susceptibilities, nodeids, forces, etimers, count, exp_mean, exp_std): - for i in nb.prange(count): - if states[i] == STATE_SUSCEPTIBLE: - force = susceptibilities[i] * forces[nodeids[i]] # force of infection attenuated by personal susceptibility - if (force > 0) and (np.random.random_sample() < force): # draw random number < force means infection - susceptibilities[i] = 0.0 # set susceptibility to 0.0 - # set exposure timer for newly infected individuals to a draw from a normal distribution - etimers[i] = _ITIMER_TYPE_NP(np.round(np.random.normal(exp_mean, exp_std))) - states[i] = STATE_EXPOSED # set state to exposed - return - - -def transmission_update(model, tick) -> None: - """Do transmission based on infectious and susceptible individuals in the population.""" - - population = model.population - - # contagion = model._contagion - contagion = model.cases[tick, :] - nodeids = population.nodeid[: population.count] - states = population.states[: population.count] - np.add.at(contagion, nodeids[states == STATE_INFECTIOUS], 1) # accumulate contagion by node, 1 unit per infected individual - - network = model._network - transfer = (contagion * network).round().astype(np.uint32) # contagion * network = transfer - contagion += transfer.sum(axis=1) # increment by incoming "migration" - contagion -= transfer.sum(axis=0) # decrement by outgoing "migration" - - # model.cases[tick, :] = contagion # contagion is a proxy for # of infected individual/prevalence - - forces = model._forces - beta_effective = model.parameters.beta + model.parameters.seasonality_factor * np.sin( - 2 * np.pi * (tick - model.parameters.seasonality_offset) / 365 - ) - np.multiply(contagion, beta_effective, out=forces) # pre-multiply by beta (scalar now, could be array) - # np.divide(forces, model._popcounts, out=forces) # divide by population (forces is now per-capita) - np.divide(forces, model._demographics.population[tick // 365], out=forces) # divide by population (forces is now per-capita) - - tx_inner( - population.states, - population.susceptibility, - population.nodeid, - forces, - population.etimer, - population.count, - model.parameters.exp_mean, - model.parameters.exp_std, - ) - - return - - -@nb.njit( - ( - _STATES_TYPE_NB[:], # states - state of each individual - _SUSCEPTIBILITY_TYPE_NB[:], # susceptibilities - susceptibility of each individual - _ITIMER_TYPE_NB[:], # etimers - exposure timers of each individual - _ITIMER_TYPE_NB[:], # itimers - infection timers of each individual - _NODEID_TYPE_NB[:], # nodeids - node id of each individual - nb.uint32[:, :], # results - results array[channels, nodes] - nb.uint32[:, :, :], # scratch - scratch array[threads, channels, nodes] - ), - parallel=True, - nogil=True, - cache=True, -) -def report_parallel(states, susceptibilities, etimers, itimers, nodeids, results, scratch): - # results indexed by state (SEIR) and node - # scratch indexed by thread and node - - num_agents = susceptibilities.shape[0] - num_threads = scratch.shape[0] # should be equivalent to nb.get_num_threads() - see calling function - per_thread = (num_agents + num_threads - 1) // num_threads - scratch.fill(0) - for c in nb.prange(num_threads): - start = c * per_thread - end = min((c + 1) * per_thread, num_agents) - for i in range(start, end): - s = states[i] - if s == STATE_SUSCEPTIBLE: - scratch[c, 0, nodeids[i]] += 1 - elif s == STATE_EXPOSED: - scratch[c, 1, nodeids[i]] += 1 - elif s == STATE_INFECTIOUS: - scratch[c, 2, nodeids[i]] += 1 - elif s == STATE_RECOVERED: - scratch[c, 3, nodeids[i]] += 1 - elif (s & STATE_ACTIVE) == 0: - scratch[c, 4, nodeids[i]] += 1 - results[:, :] = scratch.sum(axis=0) - - return - - -_scratch = None - - -def report_update(model: NumbaSpatialSEIR, tick: int) -> None: - """Record the state of the population at the current tick.""" - - population = model.population - - # # Non-Numba version - # results = model.report - # nodeids = population.nodeid[: population.count] - # susceptibility = population.susceptibility[: population.count] - # etimer = population.etimer[: population.count] - # itimer = population.itimer[: population.count] - - # np.add.at(results[tick + 1, 0], nodeids[susceptibility > 0.0], 1) - # np.add.at(results[tick + 1, 1], nodeids[etimer > 0], 1) - # np.add.at(results[tick + 1, 2], nodeids[itimer > 0], 1) - # np.add.at(results[tick + 1, 3], nodeids[(susceptibility == 0.0) & (etimer == 0) & (itimer == 0)], 1) - - global _scratch - if _scratch is None: - # 5 = S, E, I, R, D - _scratch = np.zeros((nb.get_num_threads(), 5, model._demographics.nnodes), dtype=np.uint32) - report_parallel( - population.states[: population.count], - population.susceptibility[: population.count], - population.etimer[: population.count], - population.itimer[: population.count], - population.nodeid[: population.count], - model.report[tick + 1, :, :], - _scratch, - ) - - return - - -@nb.njit -def seed_numba(a): - num_threads = nb.get_num_threads() - # print(f"Seeding Numba random number generator with {num_threads} threads") - for _i in nb.prange(num_threads): - # seed the Numba random number generator (even though it says "np.random") - np.random.seed(a + nb.get_thread_id()) - return diff --git a/src/idmlaser_cholera/models/taichi.py b/src/idmlaser_cholera/models/taichi.py deleted file mode 100644 index 7b88500..0000000 --- a/src/idmlaser_cholera/models/taichi.py +++ /dev/null @@ -1,601 +0,0 @@ -"""Spatial SEIR model implementation using Taichi.""" - -import json -from datetime import datetime -from datetime import timezone -from pathlib import Path -from typing import Optional -from typing import Tuple - -import numpy as np -import pandas as pd -import taichi as ti -from tqdm import tqdm - -from idmlaser.numpynumba import Demographics -from idmlaser.taichi import Population -from idmlaser.utils import NumpyJSONEncoder - -from .model import DiseaseModel - -ti.init(arch=ti.gpu) # , kernel_profiler=True) - - -########################################## - -STATE_INACTIVE = 0 -STATE_ACTIVE = 128 -STATE_SUSCEPTIBLE = STATE_ACTIVE | 1 -STATE_INCUBATING = STATE_ACTIVE | 2 -STATE_INFECTIOUS = STATE_ACTIVE | 4 -STATE_RECOVERED = STATE_ACTIVE | 64 - -########################################## - - -class TaichiSpatialSEIR(DiseaseModel): - """Spatial SEIR model implementation using Taichi.""" - - def __init__(self, parameters: dict): - super().__init__() - self.update_parameters(parameters) - self._demographics = None - self._population = None - - self._phases = [vital_dynamics, infection_update, incubation_update, transmission, report_update] - - self._metrics = [] - - return - - def update_parameters(self, parameters: dict): - self.parameters = self._parameters() - for k, v in (parameters if isinstance(parameters, dict) else vars(parameters)).items(): - self.parameters.__setattr__(k, v) - - self.parameters.beta = self.parameters.r_naught / self.parameters.inf_mean - - if self.parameters.prng_seed is None: - ti.set_random_seed(datetime.now(timezone.utc).milliseconds()) - - print(f"Model parameters: {vars(self.parameters)}") - - return - - class _parameters: - """Default parameters for the model.""" - - def __init__(self) -> None: - """Initialize the default parameters.""" - self.exp_mean = np.float32(7) - self.exp_std = np.float32(1) - self.inf_mean = np.float32(7) - self.inf_std = np.float32(1) - self.r_naught = np.float32(14) - self.prng_seed = np.uint32(20240507) - self.ticks = np.int32(365) - - return - - def initialize(self, max_capacity: int, demographics: Demographics, initial: np.ndarray, network: np.ndarray) -> None: - """Initialize the model with the given parameters.""" - - population = Population(max_capacity) - population.add_property("states", ti.u8) - population.add_property("susceptibility", ti.u8) - population.add_property("etimers", ti.u8) - population.add_property("itimers", ti.u8) - population.add_property("nodeids", ti.u16) - - num_pops = demographics.nnodes - self._npatches = num_pops - nyears = max(self.parameters.ticks // 365, 1) # at least one year - assert demographics.population.shape == ( - nyears, - num_pops, - ), f"Population shape {demographics.population.shape} does not match node shape {(nyears, num_pops)}" - self.node_pops = ti.ndarray(dtype=ti.i32, shape=demographics.population.shape) - self.node_pops.from_numpy(demographics.population) - assert network.shape == (num_pops, num_pops), f"Network shape {network.shape} does not match population shape {num_pops}" - self.network = ti.ndarray(dtype=ti.f32, shape=(num_pops, num_pops)) - self.network.from_numpy(network) - - # self.f_susceptibility = ti.ndarray(dtype=ti.u8, shape=pop_size) # initially 0, we will make Ss susceptible - # self.f_etimers = ti.ndarray(dtype=ti.u8, shape=pop_size) # initially 0, we will give Es non-zero etimers - # self.f_itimers = ti.ndarray(dtype=ti.u8, shape=(pop_size,)) # initially 0, we will give Is non-zero itimers - - # self.f_nodeids = ti.ndarray(dtype=ti.u16, shape=pop_size) # initially 0, we will set nodeids in the initialization kernel - - pop_size = demographics.population[0, :].sum() # initial population size - first, last = population.add(pop_size) - assert first == 0, f"First index {first} is not 0" - assert last == pop_size, f"Last index {last} is not {pop_size}" - offsets = np.zeros_like(demographics.population[0, :]) - offsets[1:] = np.cumsum(demographics.population[0, :-1]) - initialize_population( - offsets.astype(np.int32), - initial.astype(np.int32), - population.states, - population.susceptibility, - population.etimers, - population.itimers, - population.nodeids, - self.parameters.exp_std, - self.parameters.exp_mean, - self.parameters.inf_std, - self.parameters.inf_mean, - ) - - self._demographics = demographics - self._population = population - - self._report = ti.ndarray(dtype=ti.i32, shape=(self.parameters.ticks + 1, 4, num_pops)) # S, E, I, and R counts for each node - self.contagion = ti.ndarray(dtype=ti.i32, shape=num_pops) # buffer to hold the current contagion by node - self.forces = ti.ndarray(dtype=ti.f32, shape=num_pops) # buffer to hold the current forces of infection by node - self.transfer = ti.ndarray( - dtype=ti.i32, shape=(num_pops, num_pops) - ) # buffer to hold the amount of contagion to transfer from A to B - self.axis_sums = ti.ndarray(dtype=ti.i32, shape=num_pops) # buffer for summing incoming/outgoing contagion - - report_update(self, -1) # record the initial state - - return - - @property - def demographics(self): - return self._demographics - - @property - def population(self): - return self._population - - def run(self, ticks: int) -> None: - """Run the model for a number of ticks.""" - start = datetime.now(timezone.utc) - for tick in (pbar := tqdm(range(ticks))): - self.step(tick, pbar) - finish = datetime.now(timezone.utc) - print(f"elapsed time: {finish - start}") - - # ti.profiler.print_kernel_profiler_info() # defaults to "count" - # ti.profiler.print_kernel_profiler_info("trace") - - return - - def step(self, tick: int, pbar: tqdm) -> None: - """Step the model by one tick.""" - timings = [tick] - for phase in self._phases: - t0 = datetime.now(tz=None) # noqa: DTZ005 - phase(self, tick) - ti.sync() - t1 = datetime.now(tz=None) # noqa: DTZ005 - delta = t1 - t0 - timings.append(delta.seconds * 1_000_000 + delta.microseconds) - self._metrics.append(timings) - - @property - def metrics(self): - return pd.DataFrame(data=self._metrics, columns=["tick"] + [fn.__name__ for fn in self._phases]) # np.array(self._metrics) - - def finalize(self, directory: Optional[Path] = None) -> Tuple[Optional[Path], Path]: - """Finalize the model.""" - directory = directory if directory else self.parameters.output - directory.mkdir(parents=True, exist_ok=True) - prefix = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") - prefix += f"-{self.parameters.scenario}" - try: - Path(paramfile := directory / (prefix + "-parameters.json")).write_text(json.dumps(vars(self.parameters), cls=NumpyJSONEncoder)) - print(f"Wrote parameters to '{paramfile}'.") - except Exception as e: - print(f"Error writing parameters: {e}") - prefix += f"-{self._npatches}-{self.parameters.ticks}-" - np.save(npyfile := directory / (prefix + "spatial_seir.npy"), self.report) - print(f"Wrote SEIR channels, by node, to '{npyfile}'.") - - return (paramfile, npyfile) - - @property - def report(self): - return self._report.to_numpy() - - -@ti.kernel -def initialize_population( - offsets: ti.types.ndarray(ti.i32), # type: ignore - initial: ti.types.ndarray(ti.i32), # type: ignore - states: ti.types.ndarray(ti.u8), # type: ignore - susceptibility: ti.types.ndarray(ti.u8), # type: ignore - etimers: ti.types.ndarray(ti.u8), # type: ignore - itimers: ti.types.ndarray(ti.u8), # type: ignore - nodeids: ti.types.ndarray(ti.u16), # type: ignore - inc_std: ti.types.f32, - inc_mean: ti.types.f32, - inf_std: ti.types.f32, - inf_mean: ti.types.f32, -): - # for each node... - ti.loop_config(serialize=True) # serialize the loop so we can parallelize the initialization - for i in range(offsets.shape[0]): - count = ti.cast(0, ti.i32) - offset = offsets[i] + count - - # set susceptibility for S agents... - for j in range(initial[i, 0]): - states[offset + j] = ti.cast(STATE_SUSCEPTIBLE, ti.u8) - susceptibility[offset + j] = ti.cast(1, ti.u8) - count += initial[i, 0] - offset = offsets[i] + count - - # set etimer for E agents... - for j in range(initial[i, 1]): - states[offset + j] = ti.cast(STATE_INCUBATING, ti.u8) - etimers[offset + j] = ti.cast(ti.round(ti.randn() * inc_std + inc_mean), ti.u8) - count += initial[i, 1] - offset = offsets[i] + count - - # set itimer for I agents... - for j in range(initial[i, 2]): - states[offset + j] = ti.cast(STATE_INFECTIOUS, ti.u8) - itimers[offset + j] = ti.cast(ti.round(ti.randn() * inf_std + inf_mean), ti.u8) - count += initial[i, 2] - offset = offsets[i] + count - - # skip R agents... - for j in range(initial[i, 3]): - states[offset + j] = ti.cast(STATE_RECOVERED, ti.u8) - count += initial[i, 3] - offset = offsets[i] + count - - # set nodeid for all agents... - for j in range(offsets[i], offsets[i] + count): - nodeids[j] = ti.cast(i, ti.u16) - - return - - -@ti.kernel -def births_kernel( - first: ti.types.i32, - count: ti.types.i32, - nodeid: ti.types.u16, - states: ti.types.ndarray(ti.u8), # type: ignore - susceptibility: ti.types.ndarray(ti.u8), # type: ignore - nodeids: ti.types.ndarray(ti.u16), # type: ignore -): - for i in range(first, first + count): - states[i] = ti.cast(STATE_SUSCEPTIBLE, ti.u8) - susceptibility[i] = ti.cast(1, ti.u8) - nodeids[i] = nodeid - - return - - -@ti.kernel -def immigrations_kernel( - first: ti.types.i32, - count: ti.types.i32, - nodeid: ti.types.u16, - states: ti.types.ndarray(ti.u8), # type: ignore - susceptibility: ti.types.ndarray(ti.u8), # type: ignore - nodeids: ti.types.ndarray(ti.u16), # type: ignore -): - for i in range(first, first + count): - states[i] = ti.cast(STATE_RECOVERED, ti.u8) - susceptibility[i] = ti.cast(0, ti.u8) - nodeids[i] = nodeid - - return - - -def vital_dynamics(model: TaichiSpatialSEIR, tick: int) -> None: - population = model.population - year = tick // 365 - doy = (tick % 365) + 1 - - # Deactivate deaths_t agents - - # Activate births_t agents as susceptible - annual_births = model.demographics.births[year] - todays_births = (annual_births * doy // 365) - (annual_births * (doy - 1) // 365) - if (total_births := todays_births.sum()) > 0: - istart, _iend = population.add(total_births) - # population.dob[istart:iend] = tick - # population.states[istart:iend] = STATE_SUSCEPTIBLE - # population.susceptibility[istart:iend] = 1 # in births() kernel - index = istart - for nodeid, births in enumerate(todays_births): - if births > 0: - # population.nodeid[index : index + births] = nodeid # assign newborns to their nodes - births_kernel(index, births, nodeid, population.states, population.susceptibility, population.nodeids) - index += births - - # Activate immigrations_t agents as not-susceptible (recovered) - annual_immigrations = model.demographics.immigrations[year] - todays_immigrations = (annual_immigrations * doy // 365) - (annual_immigrations * (doy - 1) // 365) - if (total_immigrations := todays_immigrations.sum()) > 0: - istart, _iend = model.population.add(total_immigrations) - # population.states[istart:iend] = STATE_RECOVERED - # population.susceptibility[istart:iend] = 0 # in immigrations() kernel - index = istart - for nodeid, immigrations in enumerate(todays_immigrations): - if immigrations > 0: - # population.nodeid[index : index + immigrations] = nodeid # assign immigrants to their nodes - immigrations_kernel(index, immigrations, nodeid, population.states, population.susceptibility, population.nodeids) - index += immigrations - - return - - -@ti.kernel -def inf_update(count: ti.types.i32, states: ti.types.ndarray(ti.u8), itimers: ti.types.ndarray(ti.u8)): # type: ignore - for i in range(count): - if itimers[i] > 0: - tmp = itimers[i] - ti.cast(1, ti.u8) - itimers[i] = tmp - if tmp == 0: - states[i] = ti.cast(STATE_RECOVERED, ti.u8) - - -def infection_update(model: TaichiSpatialSEIR, _t: int) -> None: - inf_update(model.population.count, model.population.states, model.population.itimers) - return - - -@ti.kernel -def inc_update( - count: ti.types.i32, - states: ti.types.ndarray(ti.u8), # type: ignore - etimers: ti.types.ndarray(ti.u8), # type: ignore - itimers: ti.types.ndarray(ti.u8), # type: ignore - inf_std: ti.types.f32, - inf_mean: ti.types.f32, -): - for i in range(count): - if etimers[i] > 0: - tmp = etimers[i] - ti.cast(1, ti.u8) - etimers[i] = tmp - if tmp == 0: - duration = ti.round(ti.randn() * inf_std + inf_mean) - if duration <= 0: - duration = 1 - states[i] = ti.cast(STATE_INFECTIOUS, ti.u8) - itimers[i] = ti.cast(duration, ti.u8) - - -def incubation_update(model: TaichiSpatialSEIR, _t: int) -> None: - inc_update( - model.population.count, - model.population.states, - model.population.etimers, - model.population.itimers, - model.parameters.inf_std, - model.parameters.inf_mean, - ) - return - - -@ti.kernel -def tx_kernel( - tick: ti.types.i32, - count: ti.types.i32, - contagion: ti.types.ndarray(ti.i32), # type: ignore - states: ti.types.ndarray(ti.u8), # type: ignore - susceptibility: ti.types.ndarray(ti.u8), # type: ignore - itimers: ti.types.ndarray(ti.u8), # type: ignore - etimers: ti.types.ndarray(ti.u8), # type: ignore - nodeids: ti.types.ndarray(ti.u16), # type: ignore - network: ti.types.ndarray(ti.f32), # type: ignore - transfer: ti.types.ndarray(ti.i32), # type: ignore - axis_sums: ti.types.ndarray(ti.i32), # type: ignore - node_populations: ti.types.ndarray(ti.i32), # type: ignore - forces: ti.types.ndarray(ti.f32), # type: ignore - beta: ti.f32, - inc_std: ti.f32, - inc_mean: ti.f32, -): - # zero out the contagion array - for i in contagion: - contagion[i] = 0 - - # accumulate contagion for each node - for i in range(count): - # if (susceptibility[i] == 0) and (itimers[i] > 0): - if states[i] == STATE_INFECTIOUS: - contagion[ti.cast(nodeids[i], ti.i32)] += 1 - - # multiple accumulated contagion by the network - for i, j in transfer: - transfer[i, j] = ti.cast(ti.round(contagion[i] * network[i, j]), ti.i32) - - # accumulate across rows for incoming contagion - for i in axis_sums: - axis_sums[i] = 0 - for i, j in transfer: - axis_sums[j] += transfer[i, j] - for i in axis_sums: - contagion[i] = contagion[i] + axis_sums[i] - - # accumulate down columns for outgoing contagion - for i in axis_sums: - axis_sums[i] = 0 - for i, j in transfer: - axis_sums[i] += transfer[i, j] - for i in axis_sums: - contagion[i] = contagion[i] - axis_sums[i] - - # multiply contagion by beta - for i in forces: - forces[i] = beta * contagion[i] - - # divide node contagion by node population - year = tick // 365 - for i in forces: - forces[i] = forces[i] / node_populations[year, i] - - # visit each individual determining transmision by node force of infection and individual susceptibility - for i in range(count): - if ti.random() < (forces[ti.cast(nodeids[i], ti.i32)] * susceptibility[i]): - susceptibility[i] = ti.cast(0, ti.u8) - duration = ti.round(ti.randn() * inc_std + inc_mean) - if duration <= 0: - duration = 1 - states[i] = ti.cast(STATE_INCUBATING, ti.u8) - etimers[i] = ti.cast(duration, ti.u8) - - -def transmission(model: TaichiSpatialSEIR, tick: int) -> None: - tx_kernel( - tick, - model.population.count, - model.contagion, - model.population.states, - model.population.susceptibility, - model.population.itimers, - model.population.etimers, - model.population.nodeids, - model.network, - model.transfer, - model.axis_sums, - model.node_pops, - model.forces, - model.parameters.beta, - model.parameters.exp_std, - model.parameters.exp_mean, - ) - return - - -@ti.kernel -def report_kernel( - tick: ti.types.i32, - count: ti.types.i32, - results: ti.types.ndarray(ti.i32), # type: ignore - states: ti.types.ndarray(ti.u8), # type: ignore - susceptibility: ti.types.ndarray(ti.u8), # type: ignore - etimers: ti.types.ndarray(ti.u8), # type: ignore - itimers: ti.types.ndarray(ti.u8), # type: ignore - nodeids: ti.types.ndarray(ti.u16), # type: ignore -): - for i in range(count): - nodeid = ti.cast(nodeids[i], ti.i32) - state = states[i] - if state == STATE_SUSCEPTIBLE: - results[tick, 0, nodeid] += 1 - elif state == STATE_INCUBATING: - results[tick, 1, nodeid] += 1 - elif state == STATE_INFECTIOUS: - results[tick, 2, nodeid] += 1 - elif state == STATE_RECOVERED: - results[tick, 3, nodeid] += 1 - - return - - -def report_update(model: TaichiSpatialSEIR, tick: int) -> None: - report_kernel( - tick + 1, - model.population.count, - model._report, # Use internal _report Taichi array - model.population.states, - model.population.susceptibility, - model.population.etimers, - model.population.itimers, - model.population.nodeids, - ) - - return - - -#################################################################################################### - - -def spatial_seir(params): - # debugging ############################ - """ - @ti.kernel - def tx0(t: ti.i32): - # zero out the f_contagion array - for i in f_contagion: - f_contagion[i] = 0 - - @ti.kernel - def tx1(t: ti.i32): - # accumulate contagion for each node - for i in f_susceptibility: - if (f_susceptibility[i] == 0) and (f_itimers[i] > 0): - f_contagion[ti.cast(f_nodeids[i], ti.i32)] += 1 - - @ti.kernel - def tx2(t: ti.i32): - # multiple accumulated contagion by the network - for i, j in f_transfer: - f_transfer[i, j] = ti.cast( - ti.round(f_contagion[i] * f_network[i, j]), ti.i32 - ) - - @ti.kernel - def tx3(t: ti.i32): - # accumulate across rows for incoming contagion - for i in f_axis_sums: - f_axis_sums[i] = 0 - for i, j in f_transfer: - f_axis_sums[j] += f_transfer[i, j] - for i in f_axis_sums: - f_contagion[i] = f_contagion[i] + f_axis_sums[i] - - @ti.kernel - def tx4(t: ti.i32): - # accumulate down columns for outgoing contagion - for i in f_axis_sums: - f_axis_sums[i] = 0 - for i, j in f_transfer: - f_axis_sums[i] += f_transfer[i, j] - for i in f_axis_sums: - f_contagion[i] = f_contagion[i] - f_axis_sums[i] - - @ti.kernel - def tx6(t: ti.i32): - # multiply contagion by beta - for i in f_forces: - f_forces[i] = params.beta * f_contagion[i] - - @ti.kernel - def tx7(t: ti.i32): - # divide node contagion by node population - for i in f_forces: - f_forces[i] = f_forces[i] / f_node_populations[t, i] - - @ti.kernel - def tx8(t: ti.i32): - # visit each individual determining transmision by node force of infection and individual susceptibility - for i in f_susceptibility: - if ti.random() < ( - f_forces[ti.cast(f_nodeids[i], ti.i32)] * f_susceptibility[i] - ): - f_susceptibility[i] = ti.cast(0, ti.u8) - duration = ti.round(ti.randn() * params.inc_std + params.inc_mean) - if duration <= 0: - duration = 1 - f_etimers[i] = ti.cast(duration, ti.u8) - """ - ######################################## - - for t in tqdm(range(params.timesteps)): - inf_update() - inc_update() - - transmission(t) - # tx0(t) # zero out the f_contagion array - # tx1(t) # accumulate contagion for each node (into f_contagion[f_nodeids]) - # tx2(t) # multiple accumulated contagion by the network (into f_transfer) - # tx3(t) # accumulate across rows for incoming contagion (into f_contagion) - # tx4(t) # accumulate down columns for outgoing contagion (out of f_contagion) - # tx6(t) # multiply contagion by beta (into f_forces) - # tx7(t) # divide node contagion by node population (in f_forces) - # tx8( - # t - # ) # visit each individual determining transmision by node force of infection and individual susceptibility - ti.sync() - - return diff --git a/src/idmlaser_cholera/mymodel.py b/src/idmlaser_cholera/mymodel.py index 0dd8349..a44ac8b 100644 --- a/src/idmlaser_cholera/mymodel.py +++ b/src/idmlaser_cholera/mymodel.py @@ -26,8 +26,8 @@ def __init__(self, params): @classmethod def get(cls, params): - """ - Factory method to create and initialize a Model instance. + """Factory method to create and initialize a Model instance. + Handles cached data or initializes from data as needed. """ model = cls(params) # Create a Model instance @@ -38,6 +38,37 @@ def get(cls, params): return model def save( self, filename ): + """ + Save the full agent population state to disk in HDF5 format. + + This method stores the agent population, including the initial population + distribution across patches, age structure, and natural mortality profile, + in a specified HDF5 file. The data stored includes: + + - `initial_populations`: The initial population count for each patch in the model. + - `age_distribution`: The distribution of agents across age groups at the time of saving. + - `cumulative_deaths`: The cumulative number of deaths recorded in the simulation up to this point. + + Before saving, the method checks that the `age_distribution` data has been initialized. + If the `age_distribution` is not properly set, a `ValueError` is raised. + + Args: + filename (str): The path to the HDF5 file where the population data should be saved. + + Raises: + ValueError: If the `age_distribution` is not initialized before attempting to save. + + Notes: + - The method utilizes an external `age_data_manager` to retrieve the current age distribution. + - The `population.save` function is responsible for handling the actual file-saving process, + including packaging the data into an HDF5 file format. + - This method assumes that the agent population and the other components (age distribution, + mortality data) are properly initialized before being saved. + + Example: + # Save the population data to 'population_data.h5' + model.save('population_data.h5') + """ if age_init.age_data_manager.get_data() is None: raise ValueError( f"age_distribution uninitialized while saving" ) self.population.save( filename=filename, diff --git a/src/idmlaser_cholera/taichi/__init__.py b/src/idmlaser_cholera/taichi/__init__.py deleted file mode 100644 index 481cecf..0000000 --- a/src/idmlaser_cholera/taichi/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Expose the Taichi based Population class to the package level.""" - -from .population import Population - -__all__ = ["Population"] diff --git a/src/idmlaser_cholera/taichi/population.py b/src/idmlaser_cholera/taichi/population.py deleted file mode 100644 index 2777779..0000000 --- a/src/idmlaser_cholera/taichi/population.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Taichi array-based population class for agent-based patch models.""" - -import taichi as ti - - -class Population: - """Taichi array-based Agent Based Population Class""" - - def __init__(self, capacity, **kwargs): - """Initialize a Population object.""" - - self._count = 0 - self._capacity = capacity - for key, value in kwargs.items(): - setattr(self, key, value) - - return - - def add_property(self, name, dtype=ti.types.i32, default=0): - """Add a property to the class""" - # initialize the property to a Taichi field with of size self._count, dtype, and default value - setattr(self, name, ti.ndarray(dtype, self._capacity)) - - return - - @property - def count(self): - return self._count - - @property - def capacity(self): - return self._capacity - - def add(self, count: int): - """Add agents to the population""" - assert self._count + count <= self._capacity, f"Population exceeds capacity ({self._count=}, {count=}, {self._capacity=})" - i = self._count - self._count += int(count) - j = self._count - - return i, j - - def __len__(self): - return self._count diff --git a/src/idmlaser_cholera/utils.py b/src/idmlaser_cholera/utils.py index 30dc497..bb8fda5 100644 --- a/src/idmlaser_cholera/utils.py +++ b/src/idmlaser_cholera/utils.py @@ -23,384 +23,9 @@ pdf_filename = 'combined_plots.pdf' # should probably be in manifest? -def pop_from_cbr_and_mortality( - initial: Union[int, np.integer], - cbr: Union[Iterable[Union[int, float, np.number]], Union[int, float, np.number]], - mortality: Union[Iterable[Union[int, float, np.number]], Union[int, float, np.number]], - nyears: Union[int, np.integer], -) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - """Calculate births, deaths, and total population for each year.""" - births = np.zeros(nyears, dtype=np.uint32) - deaths = np.zeros(nyears, dtype=np.uint32) - population = np.zeros(nyears + 1, dtype=np.uint32) - - if not isinstance(cbr, Iterable): - cbr = np.full(nyears, cbr) - if not isinstance(mortality, Iterable): - mortality = np.full(nyears, mortality) - - population[0] = initial - - for year, cbr_i, mortality_i in zip(range(nyears), cbr, mortality): - current_population = population[year] - births[year] = np.random.poisson(cbr_i * current_population / 1000) - deaths[year] = np.random.poisson(mortality_i * current_population / 1000) - population[year + 1] = population[year] + births[year] - deaths[year] - - return births, deaths, population - - -def daily_births_deaths_from_annual(annual_births, annual_deaths) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - """Calculate daily births and deaths from annual values.""" - assert len(annual_births) == len(annual_deaths), "Lengths of arrays must be equal" - nyears = len(annual_births) - daily_births = np.zeros(nyears * 365, dtype=np.uint32) - daily_deaths = np.zeros(nyears * 365, dtype=np.uint32) - - for day in range(nyears * 365): - year = day // 365 - doy = (day % 365) + 1 - daily_births[day] = (annual_births[year] * doy // 365) - (annual_births[year] * (doy - 1) // 365) - daily_deaths[day] = (annual_deaths[year] * doy // 365) - (annual_deaths[year] * (doy - 1) // 365) - - return daily_births, daily_deaths - - -class NumpyJSONEncoder(JSONEncoder): - """Custom JSON encoder for NumPy arrays.""" - - def default(self, obj): - if isinstance(obj, np.integer): - return int(obj) - if isinstance(obj, np.floating): - return float(obj) - if isinstance(obj, np.ndarray): - return obj.tolist() - if isinstance(obj, Path): - return str(obj) - return JSONEncoder.default(self, obj) - - -# Derived from table 1 of "National Vital Statistics Reports Volume 54, Number 14 United States Life Tables, 2003" (see README.md) -cumulative_deaths = [ - 0, - 687, - 733, - 767, - 792, - 811, - 829, - 844, - 859, - 872, - 884, - 895, - 906, - 922, - 945, - 978, - 1024, - 1081, - 1149, - 1225, - 1307, - 1395, - 1489, - 1587, - 1685, - 1781, - 1876, - 1968, - 2060, - 2153, - 2248, - 2346, - 2449, - 2556, - 2669, - 2790, - 2920, - 3060, - 3212, - 3377, - 3558, - 3755, - 3967, - 4197, - 4445, - 4715, - 5007, - 5322, - 5662, - 6026, - 6416, - 6833, - 7281, - 7759, - 8272, - 8819, - 9405, - 10032, - 10707, - 11435, - 12226, - 13089, - 14030, - 15051, - 16146, - 17312, - 18552, - 19877, - 21295, - 22816, - 24445, - 26179, - 28018, - 29972, - 32058, - 34283, - 36638, - 39108, - 41696, - 44411, - 47257, - 50228, - 53306, - 56474, - 59717, - 63019, - 66336, - 69636, - 72887, - 76054, - 79102, - 81998, - 84711, - 87212, - 89481, - 91501, - 93266, - 94775, - 96036, - 97065, - 97882, - 100000, -] - -__cdnp = np.array(cumulative_deaths) - - -def predicted_year_of_death(age_years, max_year=100): - """ - Calculates the predicted year of death based on the given age in years. - - Parameters: - - age_years (int): The age of the individual in years. - - max_year (int): The maximum year to consider for calculating the predicted year of death. Default is 100. - - Returns: - - yod (int): The predicted year of death. - - Example: - >>> predicted_year_of_death(40, max_year=80) - 62 - """ - - # e.g., max_year == 10, 884 deaths are recorded in the first 10 years - total_deaths = __cdnp[max_year + 1] - # account for current age, i.e., agent is already 4 years old, so 792 deaths have already occurred - already_deceased = __cdnp[age_years] - # this agent will be one of the deaths in (already_deceased, total_deaths] == [already_deceased+1, total_deaths+1) - draw = np.random.randint(already_deceased + 1, total_deaths + 1) - # find the year of death, e.g., draw == 733, searchsorted("left") will return 2, so the year of death is 1 - yod = np.searchsorted(__cdnp, draw, side="left") - 1 - - return yod - - -def predicted_day_of_death(age_days, max_year=100): - """ - Calculates the predicted day of death based on the given age in days and the maximum year of death. - - Parameters: - - age_days (int): The age in days. - - max_year (int): The maximum year of death. Defaults to 100. - - Returns: - - dod (int): The predicted day of death. - - The function first calculates the predicted year of death based on the given age in days and the maximum year of death. - Then, it randomly selects a day within the year of death. - The age/date of death has to be greater than today's age. - Finally, it calculates and returns the predicted day of death. - - Note: This function assumes that there are 365 days in a year. - """ - - yod = predicted_year_of_death(age_days // 365, max_year) - - # if the death age year is not the current age year pick any day that year - if age_days // 365 < yod: - # the agent will die sometime in the year of death, so we randomly select a day - doy = np.random.randint(365) - else: - # the agent will die on or before next birthday - age_doy = age_days % 365 # 0 ... 364 - if age_doy < 364: - # there is time before the next birthday, pick a day at random - doy = np.random.randint(age_doy + 1, 365) - else: - # the agent's birthday is tomorrow; bummer of a birthday present - yod += 1 - doy = 0 - - dod = yod * 365 + doy - - return dod - - -class PropertySet: - def __init__(self, *bags): - for bag in bags: - assert isinstance(bag, (type(self), dict)) - for key, value in (bag.__dict__ if isinstance(bag, type(self)) else bag).items(): - setattr(self, key, value) - - def __add__(self, other): - return PropertySet(self, other) - - def __iadd__(self, other): - assert isinstance(other, (type(self), dict)) - for key, value in (other.__dict__ if isinstance(other, type(self)) else other).items(): - setattr(self, key, value) - return self - - def __len__(self): - return len(self.__dict__) - - def __str__(self) -> str: - return str(self.__dict__) - - def __repr__(self) -> str: - return f"PropertySet({self.__dict__!s})" - - -class PriorityQueuePy: - """ - A priority queue implemented using NumPy arrays and sped-up with Numba. - Using the algorithm from the Python heapq module. - __init__ with an existing array of priority values - __push__ with an index into priority values - __pop__ returns the index of the highest priority value and its value - """ - - # https://github.com/python/cpython/blob/5592399313c963c110280a7c98de974889e1d353/Modules/_heapqmodule.c - # https://github.com/python/cpython/blob/5592399313c963c110280a7c98de974889e1d353/Lib/heapq.py - - def __init__(self, capacity: int, values: np.ndarray): - self.indices = np.zeros(capacity, dtype=np.uint32) - self.values = values - self.size = 0 - - return - - def push(self, index) -> None: - if self.size >= len(self.indices): - raise IndexError("Priority queue is full") - self.indices[self.size] = index - _siftdown(self.indices, self.values, 0, self.size) - self.size += 1 - return - - def peeki(self) -> np.uint32: - if self.size == 0: - raise IndexError("Priority queue is empty") - return self.indices[0] - - def peekv(self) -> Any: - if self.size == 0: - raise IndexError("Priority queue is empty") - return self.values[self.indices[0]] - - def peekiv(self) -> Tuple[np.uint32, Any]: - if self.size == 0: - raise IndexError("Priority queue is empty") - return (self.indices[0], self.values[self.indices[0]]) - - def popi(self) -> np.uint32: - index = self.peeki() - self.pop() - - return index - - def popv(self) -> Any: - value = self.peekv() - self.pop() - - return value - - def popiv(self) -> Tuple[np.uint32, Any]: - ivtuple = self.peekiv() - self.pop() - - return ivtuple - - def pop(self) -> None: - if self.size == 0: - raise IndexError("Priority queue is empty") - self.size -= 1 - self.indices[0] = self.indices[self.size] - _siftup(self.indices, self.values, 0, self.size) - return - - def __len__(self): - return self.size - - -@nb.njit((nb.uint32[:], nb.int32[:], nb.uint32, nb.uint32), nogil=True) -def _siftdown(indices, values, startpos, pos): - inewitem = indices[pos] - vnewitem = values[inewitem] - # Follow the path to the root, moving parents down until finding a place newitem fits. - while pos > startpos: - parentpos = (pos - 1) >> 1 - iparent = indices[parentpos] - vparent = values[iparent] - if vnewitem < vparent: - indices[pos] = iparent - pos = parentpos - continue - break - indices[pos] = inewitem - - return - - -@nb.njit((nb.uint32[:], nb.int32[:], nb.uint32, nb.uint32), nogil=True) -def _siftup(indices, values, pos, size): - endpos = size - startpos = pos - inewitem = indices[pos] - # Bubble up the smaller child until hitting a leaf. - childpos = 2 * pos + 1 # leftmost child position - while childpos < endpos: - # Set childpos to index of smaller child. - rightpos = childpos + 1 - if rightpos < endpos and not values[indices[childpos]] < values[indices[rightpos]]: - childpos = rightpos - # Move the smaller child up. - indices[pos] = indices[childpos] - pos = childpos - childpos = 2 * pos + 1 - # The leaf at pos is empty now. Put newitem there, and bubble it up - # to its final resting place (by sifting its parents down). - indices[pos] = inewitem - _siftdown(indices, values, startpos, pos) - return - def viz_2D( model, data, label, x_label, y_label ): - """ - Visualize a 2D array as a heatmap using a specified color map, with options for custom labels and an automatically scaled color bar. + """Visualize a 2D array as a heatmap using a specified color map, with + options for custom labels and an automatically scaled color bar. Parameters ---------- @@ -453,8 +78,8 @@ def viz_2D( model, data, label, x_label, y_label ): plt.show() def load_csv_maybe_header(file_path): - """ - Load a CSV file as a NumPy array, automatically detecting and skipping a header row if present. + """Load a CSV file as a NumPy array, automatically detecting and skipping a + header row if present. Parameters ---------- @@ -496,7 +121,6 @@ def load_csv_maybe_header(file_path): ------ ValueError If any item in the first line cannot be converted to a float when performing header detection checks. - """ # Open file to check the first line with open(file_path, 'r') as f: @@ -515,8 +139,8 @@ def load_csv_maybe_header(file_path): return data def viz_pop(model, url="https://packages.idmod.org:443/artifactory/idm-data/LASER/ssa_shapes.zip", extract_to_dir="ssa_shapes"): - """ - Download, unzip, and plot population data on a map of Sub-Saharan Africa. + """Download, unzip, and plot population data on a map of Sub-Saharan + Africa. Parameters: url (str): URL to download the zip file from. diff --git a/tox.ini b/tox.ini index 7d6a38e..411f3c4 100644 --- a/tox.ini +++ b/tox.ini @@ -14,21 +14,14 @@ envlist = clean, check, docs, - {py38,py39,py310,py311,py312,pypy38,pypy39,pypy310}, + {py310}, report ignore_basepython_conflict = true [testenv] basepython = - pypy38: {env:TOXPYTHON:pypy3.8} - pypy39: {env:TOXPYTHON:pypy3.9} - pypy310: {env:TOXPYTHON:pypy3.10} - py38: {env:TOXPYTHON:python3.8} - py39: {env:TOXPYTHON:python3.9} py310: {env:TOXPYTHON:python3.10} - py311: {env:TOXPYTHON:python3.11} - py312: {env:TOXPYTHON:python3.12} - {bootstrap,clean,check,report,docs,codecov}: {env:TOXPYTHON:python3} + {bootstrap,clean,check,report,docs,codecov}: {env:TOXPYTHON:python3.10} setenv = PYTHONPATH={toxinidir}/tests PYTHONUNBUFFERED=yes