diff --git a/.github/workflows/test_examples.yml b/.github/workflows/test_examples.yml index 67d49fd4c..62ae5a09a 100644 --- a/.github/workflows/test_examples.yml +++ b/.github/workflows/test_examples.yml @@ -67,6 +67,8 @@ jobs: [ "$file" = "MHD_modelsipynb.py" ] || [ "$file" = "density_grid_samplingipynb.py" ] || [ "$file" = "lensing_crv4ipynb.py" ] || + [ "$file" = "interrupt_candidateVectoripynb.py" ] || + [ "$file" = "interrupt_sourceipynb.py" ] || [ "$file" = "lensing_mapsv4ipynb.py" ]; then echo "skip file $file" else diff --git a/doc/index.rst b/doc/index.rst index f398dca5b..b989cd1c5 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -31,6 +31,7 @@ Contents pages/acceleration.rst pages/extending_crpropa.rst pages/example_notebooks/propagation_comparison/Propagation_Comparison_CK_BP.ipynb + pages/interrupting-simulations.rst pages/AdditionalResources.rst diff --git a/doc/pages/example_notebooks/interrupting_simulations/interrupt_candidateVector.ipynb b/doc/pages/example_notebooks/interrupting_simulations/interrupt_candidateVector.ipynb new file mode 100644 index 000000000..bd6f3b447 --- /dev/null +++ b/doc/pages/example_notebooks/interrupting_simulations/interrupt_candidateVector.ipynb @@ -0,0 +1,396 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# interrupting and continuing of a simulation with a candidate Vector\n", + "\n", + "In this example the simulation uses a `candidateVector`, a predefined set of candidates, as source. The test simulation is a simple 1D propagation of particles in the energy range of $10^{17} eV$ to $10^{21} eV$. The candidates are order in energy, for an easier overview of the state of the simulation. No other propagation effects are considered. \n", + "\n", + "Two sets of simulation, one full simulation and one which is interrupted and continued, are compared. The candidate ID will be used as continues number to check, that all candidates are reaching the final observer and nothing is lost. " + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from crpropa import * \n", + "import numpy as np \n", + "import os \n", + "from multiprocessing import cpu_count" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## simulation setup\n", + "\n", + "The candidate energies are the same for both simulation cases (with / without interrupting). " + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "# create candidate vector with increasing energies\n", + "lg_E_min = 17\n", + "lg_E_max = 21\n", + "lgE = np.random.uniform(lg_E_min, lg_E_max, 100_000)\n", + "lgE.sort()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "def init_candidate_vector():\n", + " \"\"\" initilize the candidate vector. Has to be done before every simulation. \"\"\"\n", + " cv = CandidateVector()\n", + " for i, _e in enumerate(lgE): \n", + " c = Candidate(i, 10**_e * eV, Vector3d(1 * Gpc, 0, 0), Vector3d(-1, 0, 0)) \n", + " cv.push_back(CandidateRefPtr(c))\n", + " return cv" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### full simulation (no interruption)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "crpropa::ModuleList: Number of Threads: 12\n", + "Run ModuleList\n", + " Started Thu Sep 5 12:28:34 2024 : [\u001b[1;32m Finished \u001b[0m] 100% Needed: 00:00:48 - Finished at Thu Sep 5 12:29:22 2024\n", + "\r" + ] + } + ], + "source": [ + "# general setup \n", + "def get_sim(filename):\n", + " \"\"\" returns a modulelist to ensure running the same modules in each case \"\"\"\n", + " \n", + " sim = ModuleList() \n", + " sim.add(SimplePropagation(1 * kpc, 10 * kpc)) # choose small steps to ensure long simulations \n", + "\n", + " obs = Observer() \n", + " obs.add(Observer1D())\n", + " out = TextOutput(filename) \n", + " obs.onDetection(out) \n", + " sim.add(obs)\n", + "\n", + " sim.setShowProgress(True)\n", + " return sim, out\n", + "\n", + "os.makedirs(\"cand_vector\", exist_ok=True)\n", + "sim, out = get_sim(\"cand_vector/full.txt\") \n", + "cv = init_candidate_vector()\n", + "sim.run(cv)\n", + "out.close()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### simulation with interruption\n", + "\n", + "This simulation is interrupted after some time. This process has to be done manually." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "crpropa::ModuleList: Number of Threads: 12\n", + "Run ModuleList\n", + " Started Thu Sep 5 12:29:26 2024 : [====> ] 41% Finish in: 00:00:28 \r" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "crpropa::ModuleList: Signal 2 (SIGINT/SIGTERM) received\n", + "############################################################################\n", + "#\tInterrupted CRPropa simulation \n", + "# in total 58477 Candidates have not been started.\n", + "# the indicies of the vector haven been written to to output file. \n", + "############################################################################\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[5], line 8\u001b[0m\n\u001b[1;32m 6\u001b[0m sim\u001b[38;5;241m.\u001b[39msetShowProgress(\u001b[38;5;28;01mTrue\u001b[39;00m) \n\u001b[1;32m 7\u001b[0m cv \u001b[38;5;241m=\u001b[39m init_candidate_vector()\n\u001b[0;32m----> 8\u001b[0m sim\u001b[38;5;241m.\u001b[39mrun(cv)\n", + "\u001b[0;31mKeyboardInterrupt\u001b[0m: " + ] + } + ], + "source": [ + "sim, out = get_sim(\"cand_vector/interrupted.txt\")\n", + "\n", + "out_interrupt = TextOutput(\"cand_vector/on_interruption.txt\")\n", + "sim.setInterruptAction(out_interrupt)\n", + "\n", + "sim.setShowProgress(True) \n", + "cv = init_candidate_vector()\n", + "sim.run(cv)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "out.close() # closing the output file to avoid data loss " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### reloading and reruning the simulation" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "number of indices read from file: 58477\n" + ] + } + ], + "source": [ + "# load candidates from interrupted simulation \n", + "\n", + "file = \"cand_vector/on_interruption.txt\"\n", + "pc = ParticleCollector() \n", + "pc.load(file)\n", + "\n", + "# expected size of particles should be equal to the number of cores \n", + "assert pc.size() <= cpu_count() , f\"the number of loaded particles ({pc.size()}) must be lower or equal to the number of cores ({cpu_count()})\"\n", + "\n", + "# load indicies of not started candidates\n", + "with open(file, \"r\") as f: \n", + " line = f.readlines()[-1]\n", + " indices = np.array(line.strip(\"\\n\").split(\"\\t\")[1:-1], dtype= int)\n", + "\n", + "print(\"number of indices read from file:\", len(indices))" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "# create a new candidate vector with the missing particles \n", + "cv_new = pc.getContainer()\n", + "cv = init_candidate_vector()\n", + "for i, c in enumerate(cv): \n", + " if i in indices: \n", + " # accept candidates which were not started\n", + " cv_new.push_back(c)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "crpropa::ModuleList: Number of Threads: 12\n", + "Run ModuleList\n", + " Started Thu Sep 5 12:30:00 2024 : [\u001b[1;32m Finished \u001b[0m] 100% Needed: 00:00:30 - Finished at Thu Sep 5 12:30:30 2024\n", + "\r" + ] + } + ], + "source": [ + "# run the simulation with the missing candidates \n", + "sim, out = get_sim(\"cand_vector/continued.txt\")\n", + "sim.run(cv_new)\n", + "out.close()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## read data from different simulations\n", + "\n", + "The data from both simulation sets are loaded and compared" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd \n", + "import matplotlib.pyplot as plt \n", + "\n", + "def read_crp(filename): \n", + " \"\"\" read a crpropa output file \"\"\"\n", + " \n", + " with open(filename) as f: \n", + " names = f.readline().strip(\"\\n\").split(\"\\t\")[1:]\n", + " \n", + " return pd.read_csv(filename, names = names, delimiter = \"\\t\", comment=\"#\")" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "the splited simulation has the length of 41518 and 58482\n" + ] + } + ], + "source": [ + "df_full = read_crp(\"cand_vector/full.txt\")\n", + "df_first_half = read_crp(\"cand_vector/interrupted.txt\")\n", + "df_second_half = read_crp(\"cand_vector/continued.txt\")\n", + "print(f\"the splited simulation has the length of {len(df_first_half)} and {len(df_second_half)}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### check ID number of particles \n", + "\n", + "Checking that both simulation outputs contain all particles. The particle ID is the number of the particles in the orignial candidate vector." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "id_list_full = list(df_full.ID)\n", + "id_list_full.sort() \n", + "assert np.all(id_list_full == np.arange(100_000))" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "id_list_continued = list(df_first_half.ID) + list(df_second_half.ID)\n", + "id_list_continued.sort() \n", + "assert np.all(id_list_continued == np.arange(100_000))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### energy distribution of the arriving particles" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "e_bins = np.logspace(-1, 3, 101) # in EeV as default output unit \n", + "dE = np.diff(e_bins) \n", + "\n", + "dNdE_full = np.histogram(df_full.E, bins = e_bins)[0] \n", + "dNdE_first = np.histogram(df_first_half.E, e_bins)[0] \n", + "dNdE_second = np.histogram(df_second_half.E, e_bins)[0]\n", + "assert np.all(dNdE_full == (dNdE_first + dNdE_second))\n", + "\n", + "plt.figure(dpi = 150) \n", + "plt.stairs(dNdE_full, label = \"full simulation\")\n", + "plt.stairs(dNdE_first, label = \"first half\", ls = \"--\")\n", + "plt.stairs(dNdE_second, label = \"second half\", ls = \"--\")\n", + "plt.loglog()\n", + "plt.legend()\n", + "plt.ylabel(\"# particles\")\n", + "plt.xlabel(\"Energy [GeV]\")\n", + "plt.show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "CRP_Interrupt", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/doc/pages/example_notebooks/interrupting_simulations/interrupt_source.ipynb b/doc/pages/example_notebooks/interrupting_simulations/interrupt_source.ipynb new file mode 100644 index 000000000..079967089 --- /dev/null +++ b/doc/pages/example_notebooks/interrupting_simulations/interrupt_source.ipynb @@ -0,0 +1,355 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# interrupting and continuing of a simulation with a source\n", + "\n", + "In this example the simulation uses a `sourceInterface` and allows for the production of secondaries.\n", + "\n", + "The source emmits a limited number of photons ($n = 100$) with an energy $E = 100 \\, \\mathrm{TeV}$ at a distance of $D = 50 \\, \\mathrm{Mpc}$. The photons are propagated in 1D to the observer, taking interactions with the CMB and EBL into account. \n", + "\n", + "The interrupted simulation contains three different parts: (1) the candidates arriving at the observer before the simulation is interrupted, (2) the candidates which are in the simulation at the point of interruption and (3) the particles which have not been started before the interruption. In the case of secondaries the number of particles which are contained in the simulation is much larger than the number of cores. \n", + "\n", + "In the end, the SED of the arriving photons is compared. Small differences between the full and the interrupted simulation are expected due to the Monte-Carlo nature of the interactions. " + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from crpropa import * \n", + "import pandas as pd \n", + "import numpy as np \n", + "import matplotlib.pyplot as plt \n", + "import os " + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "def read_crp(file): \n", + " with open(file, \"r\") as f: \n", + " names = f.readline().strip(\"\\n\").split(\"\\t\")[1:]\n", + " \n", + " return pd.read_csv(file, delimiter=\"\\t\", comment =\"#\", names = names)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## full simulation" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "crpropa::ModuleList: Number of Threads: 12\n", + "Run ModuleList\n", + " Started Thu Sep 5 14:07:59 2024 : [\u001b[1;32m Finished \u001b[0m] 100% Needed: 00:01:39 - Finished at Thu Sep 5 14:09:38 2024\n", + "\r" + ] + } + ], + "source": [ + "n_sim = int(100)\n", + "\n", + "def get_sim(file):\n", + " sim = ModuleList() \n", + " sim.add(SimplePropagation())\n", + "\n", + " # add EM interactions \n", + " photon_fields = [CMB(), IRB_Gilmore12()]\n", + " for field in photon_fields:\n", + " sim.add(EMInverseComptonScattering(field, True)) # allow photons\n", + " sim.add(EMPairProduction(field, True)) # allow electrons \n", + " sim.add(EMDoublePairProduction(field, True))\n", + " sim.add(EMTripletPairProduction(field, True))\n", + "\n", + " sim.add(MinimumEnergy(10 * GeV))\n", + "\n", + " sub_dir = \"cascade/\"\n", + " os.makedirs(sub_dir, exist_ok=True)\n", + " out = TextOutput(f\"{sub_dir}/{file}\")\n", + " out.setEnergyScale(TeV)\n", + " obs = Observer() \n", + " obs.add(Observer1D())\n", + " obs.add(ObserverInactiveVeto())\n", + " obs.onDetection(out)\n", + " sim.add(obs) \n", + "\n", + " source = Source() \n", + " source.add(SourceParticleType(22))\n", + " source.add(SourcePosition(Vector3d(50 * Mpc, 0, 0)))\n", + " source.add(SourceEnergy(100 * TeV))\n", + "\n", + " sim.setShowProgress(True)\n", + " return source, sim, out\n", + "\n", + "source, sim, out = get_sim(\"full.txt\")\n", + "sim.run(source, n_sim)\n", + "out.close()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# load data \n", + "df_full = read_crp(\"cascade/full.txt\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## interrupted simulation" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "crpropa::ModuleList: Number of Threads: 12\n", + "Run ModuleList\n", + " Started Thu Sep 5 14:09:39 2024 : [===> ] 30% Finish in: 00:01:07 \r" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "crpropa::ModuleList: Signal 2 (SIGINT/SIGTERM) received\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + " Started Thu Sep 5 14:09:39 2024 : [===> ] 31% Finish in: 00:01:11 \r" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "############################################################################\n", + "# Interrupted CRPropa simulation \n", + "# Number of not started candidates from source: 69\n", + "############################################################################\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[5], line 6\u001b[0m\n\u001b[1;32m 3\u001b[0m interrupt_out \u001b[38;5;241m=\u001b[39m TextOutput(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mcascade/on_interrupt.txt\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 4\u001b[0m sim\u001b[38;5;241m.\u001b[39msetInterruptAction(interrupt_out)\n\u001b[0;32m----> 6\u001b[0m sim\u001b[38;5;241m.\u001b[39mrun(source, n_sim)\n", + "\u001b[0;31mKeyboardInterrupt\u001b[0m: " + ] + } + ], + "source": [ + "source, sim, out = get_sim(\"interrupt_1.txt\")\n", + "\n", + "interrupt_out = TextOutput(f\"cascade/on_interrupt.txt\")\n", + "sim.setInterruptAction(interrupt_out)\n", + "\n", + "sim.run(source, n_sim)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# close datafile to avoid data loss\n", + "out.close()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "df_1 = read_crp(f\"cascade/interrupt_1.txt\") # at state of interruption" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "crpropa::ModuleList: Number of Threads: 12\n", + "Run ModuleList\n", + " Started Thu Sep 5 14:10:30 2024 : [\u001b[1;32m Finished \u001b[0m] 100% Needed: 00:01:08 - Finished at Thu Sep 5 14:11:38 2024\n", + "\r" + ] + } + ], + "source": [ + "n_missing = 69 # taken from output -> will be different on each try\n", + "\n", + "source, sim, out = get_sim(\"interrupt_2.txt\")\n", + "sim.run(source, n_missing) # use modulelist and source as previously defined\n", + "out.close()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "df_2 = read_crp(f\"cascade/interrupt_2.txt\")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "number of loaded particles: 5690\n", + "crpropa::ModuleList: Number of Threads: 12\n", + "Run ModuleList\n", + " Started Thu Sep 5 14:11:38 2024 : [\u001b[1;32m Finished \u001b[0m] 100% Needed: 00:00:00 - Finished at Thu Sep 5 14:11:38 2024\n", + "\r" + ] + } + ], + "source": [ + "# close outputfile before reading \n", + "interrupt_out.close()\n", + "\n", + "pc = ParticleCollector()\n", + "pc.load(\"cascade/on_interrupt.txt\")\n", + "\n", + "print(\"number of loaded particles:\", pc.size())\n", + "\n", + "# run simulation with missing particles\n", + "source, sim, out = get_sim(\"interrupt_3.txt\")\n", + "sim.run(pc.getContainer())\n", + "out.close()" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " df_3 = read_crp(\"cascade/interrupt_3.txt\")\n", + "except: \n", + " # it can happen that all particles from the interruption time are cascaded to lower energies than the minimum energy\n", + " # in this case the dataset will be empty\n", + " print(\"no data from interrupt_3.txt\")\n", + " df_3 = pd.DataFrame({\"E\":[], \"ID\":[]})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## show spectrum at earth" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "e_bins = np.logspace(-2, 2, 51)\n", + "dE = np.diff(e_bins)\n", + "e_mid = 0.5 * (e_bins[1:] + e_bins[:-1])\n", + "\n", + "get_dnde = lambda df: np.histogram(df[df.ID == 22].E, bins = e_bins)[0]/dE \n", + "\n", + "dNdE_full = get_dnde(df_full)\n", + "dNdE_1 = get_dnde(df_1)\n", + "dNdE_2 = get_dnde(df_2)\n", + "dNdE_3 = get_dnde(df_3) \n", + "\n", + "plt.figure(dpi = 150)\n", + "plt.scatter(e_mid, e_mid**2 * dNdE_full, label = \"full sim\", color = \"tab:green\") \n", + "plt.plot(e_mid, e_mid**2 * dNdE_1, label = \"finished particles\", marker =\"s\", ls = \"\", fillstyle=\"none\")\n", + "plt.plot(e_mid, e_mid**2 * dNdE_2, label = \"not startet particles\", marker =\"x\", ls = \"\", )\n", + "plt.plot(e_mid, e_mid**2 * dNdE_3, label = \"on the fly particles\", fillstyle=\"none\", ls = \"\",marker =\"d\", color = \"tab:red\")\n", + "\n", + "plt.plot(e_mid, e_mid**2 * (dNdE_1 + dNdE_2 + dNdE_3), label =\"total (interrupted)\", color =\"k\")\n", + "\n", + "\n", + "plt.loglog() \n", + "plt.xlabel(r\"$E_\\gamma$ [TeV]\")\n", + "plt.ylabel(r\"$E^2 \\, dN/dE$ [a.u.]\")\n", + "plt.legend(loc = \"lower center\", ncol = 3, bbox_to_anchor=(0.5, 1.))\n", + "plt.show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "CRP_Interrupt", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/doc/pages/interrupting-simulations.rst b/doc/pages/interrupting-simulations.rst new file mode 100644 index 000000000..88b62c4cb --- /dev/null +++ b/doc/pages/interrupting-simulations.rst @@ -0,0 +1,22 @@ +Interrupting simulations on runtime +------------------------------------------------ + +CRPropa simulations can be interrupted on runtime with the `SIGTERM` or `SIGINT` signals. +If the user defines an output for the interruption (called `InterruptAction`) all candidates which are currently in the simulation will be passed to this output. +In the error stream the user will see a message denoting the number of candidates which have not been started yet. +If the simulation was run with a `candidateVector` as source, the indices of the candidates which have not been started yet will be printed or written to the file. +For a simulation with a source interface, a restart with the missing number of candidates will be sufficient to continue the simulation. + +.. toctree:: + :caption: Using a candidateVector as source + :maxdepth: 1 + + example_notebooks/interrupting_simulations/interrupt_candidateVector.ipynb + +.. toctree:: + :caption: Using a source interface + :maxdepth: 1 + + example_notebooks/interrupting_simulations/interrupt_source.ipynb + + diff --git a/include/crpropa/ModuleList.h b/include/crpropa/ModuleList.h index 38fdf7658..cf44dfbbb 100644 --- a/include/crpropa/ModuleList.h +++ b/include/crpropa/ModuleList.h @@ -4,6 +4,7 @@ #include "crpropa/Candidate.h" #include "crpropa/Module.h" #include "crpropa/Source.h" +#include "crpropa/module/Output.h" #include #include @@ -47,9 +48,15 @@ class ModuleList: public Module { iterator end(); const_iterator end() const; + void setInterruptAction(Output* action); + void dumpCandidate(Candidate* cand) const; + private: module_list_t modules; bool showProgress; + Output* interruptAction; + bool haveInterruptAction = false; + std::vector notFinished; // list with not finished numbers of candidates }; /** diff --git a/include/crpropa/module/Output.h b/include/crpropa/module/Output.h index 76b28fc6e..5341a7bfe 100644 --- a/include/crpropa/module/Output.h +++ b/include/crpropa/module/Output.h @@ -52,16 +52,18 @@ namespace crpropa { They can be easily customised by enabling/disabling specific columns. */ class Output: public Module { -protected: - double lengthScale, energyScale; - std::bitset<64> fields; - +public: struct Property { std::string name; std::string comment; Variant defaultValue; }; + +protected: + double lengthScale, energyScale; + std::bitset<64> fields; + std::vector properties; bool oneDimensional; @@ -163,6 +165,18 @@ class Output: public Module { size_t size() const; void process(Candidate *) const; + + /** + * write the indices of not started candidates into the output file. + * Used for interrupting the simulation + * @param indices list of not started indices + */ + virtual void dumpIndexList(std::vector indices) { + std::cout << "indices:\t"; + for (int i = 0; i < indices.size(); i++) + std::cout << indices[i] << ", "; + std::cout << "\n"; + }; }; /** @}*/ diff --git a/include/crpropa/module/TextOutput.h b/include/crpropa/module/TextOutput.h index 817fc1c0f..0c21caf43 100644 --- a/include/crpropa/module/TextOutput.h +++ b/include/crpropa/module/TextOutput.h @@ -71,6 +71,8 @@ class TextOutput: public Output { */ static void load(const std::string &filename, ParticleCollector *collector); std::string getDescription() const; + + void dumpIndexList(std::vector indicies); }; /** @}*/ diff --git a/python/2_headers.i b/python/2_headers.i index d34665838..2db4540bb 100644 --- a/python/2_headers.i +++ b/python/2_headers.i @@ -279,6 +279,11 @@ %feature("director") crpropa::AbstractCondition; %include "crpropa/Module.h" +%template(OutputRefPtr) crpropa::ref_ptr; +%feature("director") crpropa::Output; +%ignore crpropa::Output::dumpIndexList(std::vector); +%include "crpropa/module/Output.h" + %implicitconv crpropa::ref_ptr; %template(MagneticFieldRefPtr) crpropa::ref_ptr; %feature("director") crpropa::MagneticField; @@ -394,8 +399,6 @@ } } - -%include "crpropa/module/Output.h" %include "crpropa/module/DiffusionSDE.h" %include "crpropa/module/TextOutput.h" %include "crpropa/module/HDF5Output.h" diff --git a/src/ModuleList.cpp.in b/src/ModuleList.cpp.in index c28cfd99c..3d96f6416 100644 --- a/src/ModuleList.cpp.in +++ b/src/ModuleList.cpp.in @@ -8,6 +8,7 @@ #include #include +#include #ifndef sighandler_t typedef void (*sighandler_t)(int); #endif @@ -87,6 +88,10 @@ void ModuleList::run(Candidate* candidate, bool recursive, bool secondariesFirst run(candidate->secondaries[i], recursive, secondariesFirst); } } + + // dump candidae and secondaries if interrupted. + if (candidate->isActive() && (g_cancel_signal_flag != 0)) + dumpCandidate(candidate); } void ModuleList::run(ref_ptr candidate, bool recursive, bool secondariesFirst) { @@ -114,8 +119,11 @@ void ModuleList::run(const candidate_vector_t *candidates, bool recursive, bool #pragma omp parallel for schedule(OMP_SCHEDULE) for (size_t i = 0; i < count; i++) { - if (g_cancel_signal_flag != 0) + if (g_cancel_signal_flag != 0) { +#pragma omp critical(interrupt_write) + notFinished.push_back(i); continue; + } try { run(candidates->operator[](i), recursive); @@ -132,8 +140,18 @@ void ModuleList::run(const candidate_vector_t *candidates, bool recursive, bool ::signal(SIGINT, old_sigint_handler); ::signal(SIGTERM, old_sigterm_handler); // Propagate signal to old handler. - if (g_cancel_signal_flag > 0) + if (g_cancel_signal_flag > 0) { raise(g_cancel_signal_flag); + std::cerr << "############################################################################\n"; + std::cerr << "# Interrupted CRPropa simulation \n"; + std::cerr << "# A total of " << notFinished.size() << " candidates have not been started.\n"; + std::cerr << "# the indicies of the vector haven been written to to output file. \n"; + std::cerr << "############################################################################\n"; + + // dump list to output file + std::sort(notFinished.begin(), notFinished.end()); + interruptAction->dumpIndexList(notFinished); + } } void ModuleList::run(SourceInterface *source, size_t count, bool recursive, bool secondariesFirst) { @@ -156,8 +174,11 @@ void ModuleList::run(SourceInterface *source, size_t count, bool recursive, bool #pragma omp parallel for schedule(OMP_SCHEDULE) for (size_t i = 0; i < count; i++) { - if (g_cancel_signal_flag !=0) + if (g_cancel_signal_flag !=0) { +#pragma omp critical(interrupt_write) + notFinished.push_back(i); continue; + } ref_ptr candidate; @@ -189,8 +210,13 @@ void ModuleList::run(SourceInterface *source, size_t count, bool recursive, bool ::signal(SIGINT, old_signal_handler); ::signal(SIGTERM, old_sigterm_handler); // Propagate signal to old handler. - if (g_cancel_signal_flag > 0) + if (g_cancel_signal_flag > 0) { raise(g_cancel_signal_flag); + std::cerr << "############################################################################\n"; + std::cerr << "# Interrupted CRPropa simulation \n"; + std::cerr << "# Number of not started candidates from source: " << notFinished.size() << "\n"; + std::cerr << "############################################################################\n"; + } } ModuleList::iterator ModuleList::begin() { @@ -222,6 +248,27 @@ void ModuleList::showModules() const { std::cout << getDescription(); } +void ModuleList::setInterruptAction(Output* action) { + interruptAction = action; + haveInterruptAction = true; +} + +void ModuleList::dumpCandidate(Candidate *cand) const { + if (!haveInterruptAction) + return; + + if (cand->isActive()) + interruptAction->process(cand); + else + KISS_LOG_WARNING << "ModuleList::dumpCandidate is called with a non active candidate. This should not happen for the interrupt action. Please check candidate with serieal number " + << cand->getSerialNumber() << std::endl; + + for (int i = 0; i < cand->secondaries.size(); i++) { + if (cand->secondaries[i]) + dumpCandidate(cand->secondaries[i]); + } +} + ModuleListRunner::ModuleListRunner(ModuleList *mlist) : mlist(mlist) { } diff --git a/src/module/TextOutput.cpp b/src/module/TextOutput.cpp index e979c85dc..2dda875da 100644 --- a/src/module/TextOutput.cpp +++ b/src/module/TextOutput.cpp @@ -7,6 +7,7 @@ #include "kiss/string.h" +#include #include #include #include @@ -378,4 +379,17 @@ void TextOutput::gzip() { #endif } +void TextOutput::dumpIndexList(std::vector indices) { +#pragma omp critical(FileOutput) + { + std::stringstream ss; + ss << "#" << "\t"; + for (int i = 0; i < indices.size(); i++) + ss << indices[i] << "\t"; + + const std::string cstr = ss.str(); + out-> write(cstr.c_str(), cstr.length()); + } +} + } // namespace crpropa