Internal testing logic #124

Closed · wants to merge 20 commits
34 changes: 32 additions & 2 deletions .github/workflows/tests.yml
@@ -52,8 +52,38 @@ jobs:
- name: Install SNEWS_Coincidence_System
run: |
python -m poetry install


- name: Install hop-client
run: |
pip install setuptools wheel
wget https://files.pythonhosted.org/packages/64/d1/108cea042128c7ea7790e15e12e3e5ed595bfcf4b051c34fe1064924beba/hop-client-0.9.0.tar.gz
tar -xzf hop-client-0.9.0.tar.gz
cd hop-client-0.9.0
python setup.py install
cd /home/runner/work/SNEWS_Coincidence_System/SNEWS_Coincidence_System

- shell: bash
env:
USERNAME: ${{ secrets.hop_user_name }}
PASSWORD: ${{ secrets.hop_user_password }}
run: |
sudo apt-get install -y expect
which expect
/usr/bin/expect << HOP
spawn hop auth add
expect "Username:"
send "$USERNAME\n"
expect "Password:"
send "$PASSWORD\n"
expect "Hostname (may be empty):"
send "kafka.scimma.org\n"
expect "Token endpoint (empty if not applicable):"
send "\n"
expect eof
HOP
hop auth locate

# Run the unit tests
- name: Test with pytest
run: |
python -m poetry run pytest
python -m poetry run pytest snews_cs
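
The expect block above scripts `hop auth add` non-interactively from repository secrets, and `hop auth locate` prints the path of the credential file it wrote. If that step silently fails, the later pytest step dies with an unrelated-looking connection error, so a pre-check can save debugging time. A minimal sketch of such a check (a hypothetical helper, not part of this PR; it assumes the `hop` CLI is on the PATH):

```python
import subprocess
from pathlib import Path


def hop_credentials_present() -> bool:
    """Return True if `hop auth locate` points at an existing credential file."""
    result = subprocess.run(["hop", "auth", "locate"],
                            capture_output=True, text=True, check=True)
    return Path(result.stdout.strip()).exists()


if __name__ == "__main__":
    print("hop credentials found:", hop_credentials_present())
```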
1 change: 0 additions & 1 deletion logs/.gitignore

This file was deleted.

7 changes: 6 additions & 1 deletion snews_cs/alert_pub.py
@@ -15,7 +15,7 @@ class AlertPublisher:
""" Class to publish SNEWS SuperNova Alerts based on coincidence

"""
def __init__(self, env_path=None, verbose=True, auth=True, firedrill_mode=True):
def __init__(self, env_path=None, verbose=True, auth=True, firedrill_mode=True, is_test=False):
"""
Alert publisher constructor
Parameters
@@ -36,6 +36,11 @@ def __init__(self, env_path=None, verbose=True, auth=True, firedrill_mode=True):
self.alert_topic = os.getenv("ALERT_TOPIC")
self.verbose = verbose

if is_test:
# use a test topic
self.alert_topic = os.getenv("CONNECTION_TEST_TOPIC")


def __enter__(self):
self.stream = Stream(until_eos=True, auth=self.auth).open(self.alert_topic, 'w')
return self
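With the new flag, constructing the publisher with `is_test=True` swaps the destination topic from `ALERT_TOPIC` to `CONNECTION_TEST_TOPIC`. A minimal usage sketch, assuming both environment variables are set and valid hop credentials exist (the payload is illustrative, not a full alert schema):

```python
from snews_cs.alert_pub import AlertPublisher

# Messages from this publisher go to CONNECTION_TEST_TOPIC instead of ALERT_TOPIC.
with AlertPublisher(is_test=True) as pub:
    pub.send({"alert_type": "TEST COINC_MSG"})  # illustrative payload only
```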
11 changes: 8 additions & 3 deletions snews_cs/cs_alert_schema.py
@@ -34,7 +34,7 @@ def id_format(self, num_detectors):
else:
return f'SNEWS_Coincidence_ALERT-UPDATE {date_time}'

def get_cs_alert_schema(self, data):
def get_cs_alert_schema(self, data, which_cache_to_use='main'):
""" Create a message schema for alert.
Internally called in hop_pub

@@ -50,10 +50,15 @@ def get_cs_alert_schema(self, data, which_cache_to_use='main'):

"""
id = self.id_format(len(data['detector_names']))
alert_type = "TEST "+ data['alert_type'] if which_cache_to_use=='test' else data['alert_type']
try:
far = f"Would happen every {data['false_alarm_prob']:.2e} year"
except (TypeError, ValueError):  # non-numeric false_alarm_prob, e.g. "N/A" for test alerts
far = data['false_alarm_prob']
return {"_id": id,
"alert_type":data['alert_type'],
"alert_type":alert_type,
"server_tag": data['server_tag'],
"False Alarm Prob": f"Would happen every {data['false_alarm_prob']:.2e} year",
"False Alarm Prob": far,
"detector_names": data['detector_names'],
"sent_time": id.split(' ')[1],
"p_values": data['p_vals'],
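The two behavioral changes here are small enough to state in isolation: the alert type gains a `TEST ` prefix when the test cache is in play, and the false-alarm line now degrades gracefully when `false_alarm_prob` is not a number. A standalone sketch of that logic (a mirror of the diff for illustration, not the real class):

```python
def format_alert_fields(alert_type, false_alarm_prob, which_cache_to_use="main"):
    """Mirror of the prefix and FAR-fallback logic added in this diff (illustrative)."""
    if which_cache_to_use == "test":
        alert_type = "TEST " + alert_type
    try:
        far = f"Would happen every {false_alarm_prob:.2e} year"
    except (TypeError, ValueError):  # non-numeric, e.g. "N/A" for test alerts
        far = false_alarm_prob
    return alert_type, far


assert format_alert_fields("COINC_MSG", "N/A", "test") == ("TEST COINC_MSG", "N/A")
assert format_alert_fields("COINC_MSG", 0.5)[1] == "Would happen every 5.00e-01 year"
```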
147 changes: 108 additions & 39 deletions snews_cs/snews_coinc.py
@@ -409,18 +409,24 @@ def __init__(self, env_path=None, drop_db=False, firedrill_mode=True, hb_path=No
self.exit_on_error = False # True
self.initial_set = False
self.alert = AlertPublisher(env_path=env_path, firedrill_mode=firedrill_mode)
self.test_alert = AlertPublisher(env_path=env_path, is_test=True) # overwrites with connection test topic
if firedrill_mode:
self.observation_topic = os.getenv("FIREDRILL_OBSERVATION_TOPIC")
else:
self.observation_topic = os.getenv("OBSERVATION_TOPIC")
# for testing, the alerts will be sent to this topic
self.test_topic = os.getenv("CONNECTION_TEST_TOPIC")
self.alert_schema = CoincidenceTierAlert(env_path)
# handle heartbeat
self.store_heartbeat = bool(os.getenv("STORE_HEARTBEAT", "True"))
self.heartbeat = HeartBeat(env_path=env_path, firedrill_mode=firedrill_mode)

self.stash_time = 86400
self.coinc_data = CacheManager()
self.test_coinc_data = CacheManager() # a separate cache for testing
self.message_count = {}
self.test_message_count = {}
## don't use persistent storage for the test cache

def clear_cache(self):
""" When a reset cache is passed, recreate the
@@ -432,29 +438,43 @@ def clear_cache(self):
self.coinc_data = CacheManager()

# ----------------------------------------------------------------------------------------------------------------
def display_table(self):
def display_table(self, which_cache_to_use):
"""
Display each sub list individually using a markdown table.

"""
click.secho(
f'Here is the current coincident table\n',
fg='magenta', bold=True, )
for sub_list in self.coinc_data.cache['sub_group'].unique():
sub_df = self.coinc_data.cache.query(f'sub_group=={sub_list}')
if which_cache_to_use == 'main':
cache_data = self.coinc_data
else:
cache_data = self.test_coinc_data
for sub_list in cache_data.cache['sub_group'].unique():
sub_df = cache_data.cache.query(f'sub_group=={sub_list}')
sub_df = sub_df.drop(columns=['meta', 'machine_time', 'schema_version', 'neutrino_time_as_datetime'])
sub_df = sub_df.sort_values(by=['neutrino_time'])
# snews_bot.send_table(sub_df) # no need to print the table on the server. Logs have the full content
print(sub_df.to_markdown())
print('=' * 168)

def send_alert(self, sub_group_tag, alert_type):
sub_df = self.coinc_data.cache.query('sub_group==@sub_group_tag')
def send_alert(self, sub_group_tag, alert_type, which_cache_to_use="main"):
if which_cache_to_use == 'main':
sub_df = self.coinc_data.cache.query('sub_group==@sub_group_tag')
try:
false_alarm_prob = cache_false_alarm_rate(cache_sub_list=sub_df, hb_cache=self.heartbeat.cache_df)
except Exception:  # fall back if the false-alarm rate can't be computed
false_alarm_prob = "(couldn't compute)"
alert_publisher = self.alert
else:
sub_df = self.test_coinc_data.cache.query('sub_group==@sub_group_tag')
false_alarm_prob = "N/A"
alert_publisher = self.test_alert

p_vals = sub_df['p_val'].to_list()
p_vals_avg = np.round(sub_df['p_val'].mean(), decimals=5)
nu_times = sub_df['neutrino_time'].to_list()
detector_names = sub_df['detector_name'].to_list()
false_alarm_prob = cache_false_alarm_rate(cache_sub_list=sub_df, hb_cache=self.heartbeat.cache_df)
alert_data = dict(p_vals=p_vals,
p_val_avg=p_vals_avg,
sub_list_num=int(sub_group_tag),
@@ -464,30 +484,39 @@ def send_alert(self, sub_group_tag, alert_type):
server_tag=self.server_tag,
alert_type=alert_type)

with self.alert as pub:
alert = self.alert_schema.get_cs_alert_schema(data=alert_data)

with alert_publisher as pub:
alert = self.alert_schema.get_cs_alert_schema(data=alert_data, which_cache_to_use=which_cache_to_use)
pub.send(alert)
if self.send_email:
send_email(alert)
if self.send_slack:
snews_bot.send_table(alert_data,
alert,
is_test=True,
topic=self.observation_topic)
# only send email and Slack notifications for real (non-test) alerts
if which_cache_to_use == 'main':
if self.send_email:
send_email(alert)
if self.send_slack:
snews_bot.send_table(alert_data,
alert,
is_test=True,
topic=self.observation_topic)

# ------------------------------------------------------------------------------------------------------------------
def alert_decider(self):
def alert_decider(self, which_cache_to_use="main"):
"""
This method will publish an alert every time a new detector
submits an observation message

"""
# make a pretty terminal output
click.secho(f'{"=" * 100}', fg='bright_red')
# loop through the sub group tag and state
# print(f'TEST {self.coinc_data.sub_group_state}')

for sub_group_tag, state in self.coinc_data.sub_group_state.items():
# decide which cache to use
if which_cache_to_use == 'main':
cache_data = self.coinc_data
_message_count = self.message_count
else:
cache_data = self.test_coinc_data
_message_count = self.test_message_count

for sub_group_tag, state in cache_data.sub_group_state.items():
print('CHECKING FOR ALERTS IN SUB GROUP: ', sub_group_tag)
# if state is none skip the sub group
if state is None:
@@ -505,7 +534,7 @@ def alert_decider(self):
self.send_alert(sub_group_tag=sub_group_tag, alert_type=state)
continue
# publish a retraction alert for the sub group if its state is RETRACTION
elif state == 'RETRACTION' and len(self.coinc_data.cache.query('sub_group==@sub_group_tag')) < self.message_count[sub_group_tag]:
elif state == 'RETRACTION' and len(cache_data.cache.query('sub_group==@sub_group_tag')) < _message_count[sub_group_tag]:
# yet another pretty terminal output
click.secho(f'SUB GROUP {sub_group_tag}:{"RETRACTION HAS BEEN MADE".upper():^100}', bg='bright_green',
fg='red')
@@ -522,20 +551,20 @@ def alert_decider(self):
fg='red')
click.secho(f'{"=" * 100}', fg='bright_red')
continue
elif state == 'UPDATE' and len(self.coinc_data.cache.query('sub_group==@sub_group_tag')) == self.message_count[sub_group_tag]:
elif state == 'UPDATE' and len(cache_data.cache.query('sub_group==@sub_group_tag')) == _message_count[sub_group_tag]:
# yet another pretty terminal output
click.secho(f'SUB GROUP {sub_group_tag}:{"A MESSAGE HAS BEEN UPDATED".upper():^100}', bg='bright_green',
fg='red')
log.debug('\t> An UPDATE message is received')
# only publish an alert if the sub group has more than 1 message
if len(self.coinc_data.cache.query('sub_group==@sub_group_tag')) > 1:
if len(cache_data.cache.query('sub_group==@sub_group_tag')) > 1:
click.secho(f'{"Publishing an updated Alert!!!".upper():^100}', bg='bright_green', fg='red')
click.secho(f'{"=" * 100}', fg='bright_red')
# publish update alert
self.send_alert(sub_group_tag=sub_group_tag, alert_type=state)
log.debug('\t> An alert is updated!')
continue
elif state == 'COINC_MSG' and len(self.coinc_data.cache.query('sub_group==@sub_group_tag')) > self.message_count[sub_group_tag]:
elif state == 'COINC_MSG' and len(cache_data.cache.query('sub_group==@sub_group_tag')) > _message_count[sub_group_tag]:
# yet another pretty terminal output
click.secho(f'SUB GROUP {sub_group_tag}:{"NEW COINCIDENT DETECTOR.. ".upper():^100}', bg='bright_green', fg='red')
click.secho(f'{"Published an Alert!!!".upper():^100}', bg='bright_green', fg='red')
@@ -546,6 +575,60 @@ def alert_decider(self):
continue

# ------------------------------------------------------------------------------------------------------------------
def deal_with_the_cache(self, snews_message):
""" Check if the message is a test or not, then add it to the cache and run the alert decider

Parameters
----------
snews_message: dict read from the Kafka stream.

Returns
-------
None; the message is added to the appropriate cache and the alert decider runs as a side effect.
"""
if "meta" in snews_message.keys():
is_test = snews_message['meta'].get('is_test', False)
else:
if "is_test" in snews_message.keys():
is_test = snews_message['is_test']
else:
is_test = False

if not is_test:
which_cache_to_use = 'main'
self.coinc_data.add_to_cache(message=snews_message)
# run the search
self.alert_decider(which_cache_to_use)
# update message count
for sub_group_tag in self.coinc_data.cache['sub_group'].unique():
self.message_count[sub_group_tag] = len(
self.coinc_data.cache.query('sub_group==@sub_group_tag'))
self.coinc_data.sub_group_state[sub_group_tag] = None

self.coinc_data.updated = []
# test messages are never persisted; only the main cache goes to storage
self.storage.insert_coinc_cache(self.coinc_data.cache)
sys.stdout.flush()
else:
which_cache_to_use = 'test'
self.test_coinc_data.add_to_cache(message=snews_message)
# run the search
self.alert_decider(which_cache_to_use)
# update message count
for sub_group_tag in self.test_coinc_data.cache['sub_group'].unique():
self.test_message_count[sub_group_tag] = len(
self.test_coinc_data.cache.query('sub_group==@sub_group_tag'))
self.test_coinc_data.sub_group_state[sub_group_tag] = None
self.test_coinc_data.updated = []
# the test cache is not persisted to storage
sys.stdout.flush()

if self.show_table:
self.display_table(which_cache_to_use)  ## keep show_table disabled on the server

#-------------------------------------------------------------------------------------------------------------------
def run_coincidence(self):
"""
As the name states this method runs the coincidence system.
Expand Down Expand Up @@ -586,22 +669,8 @@ def run_coincidence(self):
terminal_output += click.style(f"\t>{snews_message['detector_name']}, {snews_message['received_time']}", fg='bright_blue')
click.secho(terminal_output)
# add to cache
self.coinc_data.add_to_cache(message=snews_message)
if self.show_table:
self.display_table() ## don't display on the server
self.alert_decider()
# update message count
for sub_group_tag in self.coinc_data.cache['sub_group'].unique():
self.message_count[sub_group_tag] = len(
self.coinc_data.cache.query('sub_group==@sub_group_tag'))
self.coinc_data.sub_group_state[sub_group_tag] = None

self.coinc_data.updated = []

self.storage.insert_coinc_cache(self.coinc_data.cache)
sys.stdout.flush()

self.coinc_data.updated = []
### if actual observation, use coincidence cache, else if testing use test cache
self.deal_with_the_cache(snews_message)

# for each read message reduce the retriable err count
if self.retriable_error_count > 1:
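A message is routed to the test cache when it carries `is_test`, with the `meta` block taking precedence over the top level; note that a `meta` block without the key yields `False` even if a top-level `is_test` is set. A standalone mirror of that routing check (for illustration only):

```python
def is_test_message(snews_message: dict) -> bool:
    """Mirror of the routing branch in deal_with_the_cache: 'meta' wins if present."""
    if "meta" in snews_message:
        return snews_message["meta"].get("is_test", False)
    return snews_message.get("is_test", False)


assert is_test_message({"meta": {"is_test": True}}) is True
assert is_test_message({"is_test": True}) is True
assert is_test_message({"detector_name": "XENONnT"}) is False
```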
21 changes: 21 additions & 0 deletions snews_cs/test/test_01_connection_to_server.py
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""Initialization unit tests for the snews_cs module.
"""
import unittest

import snews_cs
import snews_pt.remote_commands as sptrc

import io
import contextlib

class TestServer(unittest.TestCase):
def test_connection(self):
f = io.StringIO()
with contextlib.redirect_stdout(f):
#- Connect to server
sptrc.test_connection(detector_name='XENONnT', firedrill=False, start_at='LATEST', patience=8)

#- Check the output message; it should say "You (XENONnT) have a connection"
confirm_msg = 'You (XENONnT) have a connection to the server'
self.assertTrue(confirm_msg in f.getvalue())
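
Assuming hop credentials are configured (the workflow changes above create them from repository secrets), this test can also be run on its own with `python -m poetry run pytest snews_cs/test/test_01_connection_to_server.py`; it captures stdout and asserts that the confirmation string printed by `snews_pt.remote_commands.test_connection` appears.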