Skip to content

Commit

Permalink
Merge pull request #98 from SNEWS2/float_timeseries
Browse files Browse the repository at this point in the history
Float timeseries
  • Loading branch information
Storreslara authored May 10, 2024
2 parents f37c49e + 4db8f99 commit 4da6619
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 31 deletions.
1 change: 1 addition & 0 deletions snews_pt/auxiliary/test-config.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ HOP_BROKER="kafka.scimma.org"

OBSERVATION_TOPIC="kafka://${HOP_BROKER}/snews.experiments-test"
ALERT_TOPIC="kafka://${HOP_BROKER}/snews.alert-test"
PRODUCTION_TOPIC=""

FIREDRILL_OBSERVATION_TOPIC="kafka://${HOP_BROKER}/snews.experiments-firedrill"
FIREDRILL_ALERT_TOPIC="kafka://${HOP_BROKER}/snews.alert-firedrill"
Expand Down
67 changes: 43 additions & 24 deletions snews_pt/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import json
import numpy as np
from abc import ABC, abstractmethod

from datetime import datetime
try:
fromisoformat = datetime.fromisoformat
Expand Down Expand Up @@ -308,39 +307,59 @@ def is_valid(self):


class SNEWSTimingTierMessage(SNEWSMessage):
"""Message for SNEWS 2.0 timing tier."""
""" Message for SNEWS 2.0 timing tier.
`timing_series` can either be list of string or nanosecond-precision integers
representing the time after the initial neutrino time.
"""

reqfields = [ 'timing_series' ]
reqfields = [ 'timing_series', 'neutrino_time' ]
fields = SNEWSMessage.basefields + reqfields + [ 'machine_time', 'p_val', 'is_test' ]

def __init__(self, p_val=None, timing_series=None, **kwargs):
# TODO: timing series as float additions of nanoseconds to the initial neutrino time
def __init__(self, p_val=None, timing_series=None, neutrino_time=None, **kwargs):
initial_nu_time_str = clean_time_input(neutrino_time)
initial_nu_time_object = np.datetime64(initial_nu_time_str).astype('datetime64[ns]')
# first convert the timing series into relative times
times = self._convert_times(timing_series, initial_neutrino_time=initial_nu_time_object)
super().__init__(self.fields,
p_val=p_val,
timing_series=[clean_time_input(t) for t in timing_series],
timing_series=times,
neutrino_time=initial_nu_time_str,
**kwargs)

def _convert_times(self, timing_series, initial_neutrino_time):
if all([isinstance(t, str) for t in timing_series]):
# convert to numpy datetime objects
times_obj = np.array([np.datetime64(t) for t in timing_series]).astype('datetime64[ns]')
times_obj = np.sort(times_obj)
# make sure they are always ns precision
relative_times = (times_obj - initial_neutrino_time).astype('timedelta64[ns]').astype(int).tolist()
elif all([isinstance(t, (int, float)) for t in timing_series]):
# then we assume they are relative times from the first neutrino time with ns precision
relative_times = timing_series if isinstance(timing_series, list) else list(timing_series)
else:
raise ValueError(f'{self.__class__.__name__} timing_series must be a list of isoformat strings or '
f'ns-precision floats from the first neutrino time.')
return relative_times


def is_valid(self):
"""Check that parameter values are valid for this tier."""
for time in self.message_data['timing_series']:
if isinstance(time, str):
time = np.datetime64(time)
else:
raise ValueError(f'{self.__class__.__name__} timing_series must be a list of strings.')
"""Check that parameter values are valid for this tier.
timing series can either be a list of iso-convertible strings or a list of floats."""

if not self.is_test:
# time format is corrected at the base class, check if reasonable
timeobj = np.datetime64(time)
duration = (timeobj - np.datetime64(datetime.utcnow())) / np.timedelta64(1, 's')
if (duration <= -172800.0) or (duration > 0.0):
raise ValueError(f'{self.__class__.__name__} neutrino_time must be within 48 hours of now.')
if not self.is_test:
# Check timing validity
# expect to see a monotonic increase in the relative times
is_monotonic = np.all(np.diff(self.message_data['timing_series']) >= 0)
if not is_monotonic:
raise ValueError(f'{self.__class__.__name__} timing_series must be in increasing order. '
f'They represent the time after initial neutrino time')

# p_val must be a float between 0 and 1
pv = self.message_data['p_val']
if isinstance(pv, str):
pv = float(pv)
if not (0.0 <= pv <= 1.0):
raise ValueError(f'{self.__class__.__name__} p_value of the detection must be between 0 and 1.')
# p_val must be a float between 0 and 1
pv = self.message_data['p_val']
if isinstance(pv, str):
pv = float(pv)
if not (0.0 <= pv <= 1.0):
raise ValueError(f'{self.__class__.__name__} p_value of the detection must be between 0 and 1.')
return True


Expand Down
52 changes: 45 additions & 7 deletions snews_pt/test/test_timing_tier.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,64 @@
from snews_pt.messages import SNEWSMessageBuilder

def test_timing_expected():
"""Test with example of expected message type.
This test passes the neutrino times in strings, and along with an additional initial neutrino time.
"""
# Create timing tier message.
tims = SNEWSMessageBuilder(detector_name='XENONnT',
neutrino_time='2012-06-09T15:31:08.109876',
timing_series=['2012-06-09T15:31:08.109876', '2012-06-09T15:33:07.891011',
'2012-06-09T15:33:07.9910110', '2012-06-09T15:34:07.891011000',
'2012-06-09T15:35:17.0'],
machine_time='2012-06-09T15:30:00.009876',
firedrill_mode=False, is_test=True)

# # Check that message has expected structure.
assert tims.selected_tiers == ['SNEWSCoincidenceTierMessage', 'SNEWSTimingTierMessage']
assert tims.messages[1].message_data == {'_id': 'XENONnT_TimingTier_2012-06-09T15:30:00.009876000',
'schema_version': '1.3.1',
'detector_name': 'XENONnT',
'machine_time': '2012-06-09T15:30:00.009876000',
'neutrino_time': '2012-06-09T15:31:08.109876000',
'p_val': None,
'is_test': True,
'timing_series': [0, 119781135000, 119881135000,
179781135000, 248890124000]}
assert tims.messages[1].meta == { 'firedrill_mode': False}

assert tims.messages[1].is_valid() is True, "There are invalid messages"

# Try to send message to SNEWS 2.0 server.
try:
tims.send_messages()
except Exception as exc:
print('SNEWSMessageBuilder.send_messages() test failed!\n')
assert False, f"Exception raised:\n {exc}"

def test_timing_expected_with_floats():
"""Test with example of expected message type."""
# Create timing tier message.
tims = SNEWSMessageBuilder(detector_name='XENONnT',
timing_series=['2012-06-09T15:31:08.109876', '2012-06-09T15:33:07.891011'],
neutrino_time='2012-06-09T15:31:08.109876',
timing_series=[0, 119781135000, 119881135000,
179781135000, 248890124000],
machine_time='2012-06-09T15:30:00.009876',
firedrill_mode=False, is_test=True)

# # Check that message has expected structure.
assert tims.selected_tiers == ['SNEWSTimingTierMessage']
assert tims.messages[0].message_data == {'_id': 'XENONnT_TimingTier_2012-06-09T15:30:00.009876000',
assert tims.selected_tiers == ['SNEWSCoincidenceTierMessage', 'SNEWSTimingTierMessage']
assert tims.messages[1].message_data == {'_id': 'XENONnT_TimingTier_2012-06-09T15:30:00.009876000',
'schema_version': '1.3.1',
'detector_name': 'XENONnT',
'machine_time': '2012-06-09T15:30:00.009876000',
'neutrino_time': '2012-06-09T15:31:08.109876000',
'p_val': None,
'is_test': True,
'timing_series': ['2012-06-09T15:31:08.109876000',
'2012-06-09T15:33:07.891011000']}
assert tims.messages[0].meta == { 'firedrill_mode': False}
'timing_series': [0, 119781135000, 119881135000,
179781135000, 248890124000]}
assert tims.messages[1].meta == { 'firedrill_mode': False}

assert tims.messages[0].is_valid() is True, "There are invalid messages"
assert tims.messages[1].is_valid() is True, "There are invalid messages"

# Try to send message to SNEWS 2.0 server.
try:
Expand Down

0 comments on commit 4da6619

Please sign in to comment.