diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8e03175..732fc7a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -52,8 +52,38 @@ jobs: - name: Install SNEWS_Coincidence_System run: | python -m poetry install - + + - name: Install hop-client + run: | + pip install setuptools wheel + wget https://files.pythonhosted.org/packages/64/d1/108cea042128c7ea7790e15e12e3e5ed595bfcf4b051c34fe1064924beba/hop-client-0.9.0.tar.gz + tar -xzf hop-client-0.9.0.tar.gz + cd hop-client-0.9.0 + python setup.py install + cd /home/runner/work/SNEWS_Coincidence_System/SNEWS_Coincidence_System + + - shell: bash + env: + USERNAME: ${{ secrets.hop_user_name }} + PASSWORD: ${{ secrets.hop_user_password }} + run: | + sudo apt-get install -y expect + which expect + /usr/bin/expect << HOP + spawn hop auth add + expect "Username:" + send "$USERNAME\n" + expect "Password:" + send "$PASSWORD\n" + expect "Hostname (may be empty):" + send "kafka.scimma.org\n" + expect "Token endpoint (empty if not applicable):" + send "\n" + expect eof + HOP + hop auth locate + # Run the unit tests - name: Test with pytest run: | - python -m poetry run pytest + python -m poetry run pytest snews_cs diff --git a/logs/.gitignore b/logs/.gitignore deleted file mode 100644 index 8b13789..0000000 --- a/logs/.gitignore +++ /dev/null @@ -1 +0,0 @@ - diff --git a/snews_cs/alert_pub.py b/snews_cs/alert_pub.py index 6391ca5..d8a912d 100644 --- a/snews_cs/alert_pub.py +++ b/snews_cs/alert_pub.py @@ -15,7 +15,7 @@ class AlertPublisher: """ Class to publish SNEWS SuperNova Alerts based on coincidence """ - def __init__(self, env_path=None, verbose=True, auth=True, firedrill_mode=True): + def __init__(self, env_path=None, verbose=True, auth=True, firedrill_mode=True, is_test=False): """ Alert publisher constructor Parameters @@ -36,6 +36,11 @@ def __init__(self, env_path=None, verbose=True, auth=True, firedrill_mode=True): self.alert_topic = os.getenv("ALERT_TOPIC") self.verbose = verbose + if is_test: + # use a test topic + self.alert_topic = os.getenv("CONNECTION_TEST_TOPIC") + + def __enter__(self): self.stream = Stream(until_eos=True, auth=self.auth).open(self.alert_topic, 'w') return self diff --git a/snews_cs/cs_alert_schema.py b/snews_cs/cs_alert_schema.py index 1030568..13cb7dc 100644 --- a/snews_cs/cs_alert_schema.py +++ b/snews_cs/cs_alert_schema.py @@ -34,7 +34,7 @@ def id_format(self, num_detectors): else: return f'SNEWS_Coincidence_ALERT-UPDATE {date_time}' - def get_cs_alert_schema(self, data): + def get_cs_alert_schema(self, data, which_cache_to_use='main'): """ Create a message schema for alert. Internally called in hop_pub @@ -50,10 +50,15 @@ def get_cs_alert_schema(self, data): """ id = self.id_format(len(data['detector_names'])) + alert_type = "TEST "+ data['alert_type'] if which_cache_to_use=='test' else data['alert_type'] + try: + far = f"Would happen every {data['false_alarm_prob']:.2e} year" + except: + far = data['false_alarm_prob'] return {"_id": id, - "alert_type":data['alert_type'], + "alert_type":alert_type, "server_tag": data['server_tag'], - "False Alarm Prob": f"Would happen every {data['false_alarm_prob']:.2e} year", + "False Alarm Prob": far, "detector_names": data['detector_names'], "sent_time": id.split(' ')[1], "p_values": data['p_vals'], diff --git a/snews_cs/snews_coinc.py b/snews_cs/snews_coinc.py index d5c55e7..6483e41 100644 --- a/snews_cs/snews_coinc.py +++ b/snews_cs/snews_coinc.py @@ -409,10 +409,13 @@ def __init__(self, env_path=None, drop_db=False, firedrill_mode=True, hb_path=No self.exit_on_error = False # True self.initial_set = False self.alert = AlertPublisher(env_path=env_path, firedrill_mode=firedrill_mode) + self.test_alert = AlertPublisher(env_path=env_path, is_test=True) # overwrites with connection test topic if firedrill_mode: self.observation_topic = os.getenv("FIREDRILL_OBSERVATION_TOPIC") else: self.observation_topic = os.getenv("OBSERVATION_TOPIC") + # for testing, the alerts will be sent to this topic + self.test_topic = os.getenv("CONNECTION_TEST_TOPIC") self.alert_schema = CoincidenceTierAlert(env_path) # handle heartbeat self.store_heartbeat = bool(os.getenv("STORE_HEARTBEAT", "True")) @@ -420,7 +423,10 @@ def __init__(self, env_path=None, drop_db=False, firedrill_mode=True, hb_path=No self.stash_time = 86400 self.coinc_data = CacheManager() + self.test_coinc_data = CacheManager() # a separate cache for testing self.message_count = {} + self.test_message_count = {} + ## don't use a storage for the test cache def clear_cache(self): """ When a reset cache is passed, recreate the @@ -432,7 +438,7 @@ def clear_cache(self): self.coinc_data = CacheManager() # ---------------------------------------------------------------------------------------------------------------- - def display_table(self): + def display_table(self, which_cache_to_use): """ Display each sub list individually using a markdown table. @@ -440,21 +446,35 @@ def display_table(self): click.secho( f'Here is the current coincident table\n', fg='magenta', bold=True, ) - for sub_list in self.coinc_data.cache['sub_group'].unique(): - sub_df = self.coinc_data.cache.query(f'sub_group=={sub_list}') + if which_cache_to_use == 'main': + cache_data = self.coinc_data + else: + cache_data = self.test_coinc_data + for sub_list in cache_data.cache['sub_group'].unique(): + sub_df = cache_data.cache.query(f'sub_group=={sub_list}') sub_df = sub_df.drop(columns=['meta', 'machine_time', 'schema_version', 'neutrino_time_as_datetime']) sub_df = sub_df.sort_values(by=['neutrino_time']) # snews_bot.send_table(sub_df) # no need to print the table on the server. Logs have the full content print(sub_df.to_markdown()) print('=' * 168) - def send_alert(self, sub_group_tag, alert_type): - sub_df = self.coinc_data.cache.query('sub_group==@sub_group_tag') + def send_alert(self, sub_group_tag, alert_type, which_cache_to_use="main"): + if which_cache_to_use == 'main': + sub_df = self.coinc_data.cache.query('sub_group==@sub_group_tag') + try: + false_alarm_prob = cache_false_alarm_rate(cache_sub_list=sub_df, hb_cache=self.heartbeat.cache_df) + except: + false_alarm_prob = "(couldn't compute)" + alert_publisher = self.alert + else: + sub_df = self.test_coinc_data.cache.query('sub_group==@sub_group_tag') + false_alarm_prob = "N/A" + alert_publisher = self.test_alert + p_vals = sub_df['p_val'].to_list() p_vals_avg = np.round(sub_df['p_val'].mean(), decimals=5) nu_times = sub_df['neutrino_time'].to_list() detector_names = sub_df['detector_name'].to_list() - false_alarm_prob = cache_false_alarm_rate(cache_sub_list=sub_df, hb_cache=self.heartbeat.cache_df) alert_data = dict(p_vals=p_vals, p_val_avg=p_vals_avg, sub_list_num=int(sub_group_tag), @@ -464,19 +484,22 @@ def send_alert(self, sub_group_tag, alert_type): server_tag=self.server_tag, alert_type=alert_type) - with self.alert as pub: - alert = self.alert_schema.get_cs_alert_schema(data=alert_data) + + with alert_publisher as pub: + alert = self.alert_schema.get_cs_alert_schema(data=alert_data, which_cache_to_use=which_cache_to_use) pub.send(alert) - if self.send_email: - send_email(alert) - if self.send_slack: - snews_bot.send_table(alert_data, - alert, - is_test=True, - topic=self.observation_topic) + # only check to see if email or slack should be sent if the alert is not a test alert + if which_cache_to_use == 'main': + if self.send_email: + send_email(alert) + if self.send_slack: + snews_bot.send_table(alert_data, + alert, + is_test=True, + topic=self.observation_topic) # ------------------------------------------------------------------------------------------------------------------ - def alert_decider(self): + def alert_decider(self, which_cache_to_use="main"): """ This method will publish an alert every time a new detector submits an observation message @@ -484,10 +507,16 @@ def alert_decider(self): """ # mkae a pretty terminal output click.secho(f'{"=" * 100}', fg='bright_red') - # loop through the sub group tag and state - # print(f'TEST {self.coinc_data.sub_group_state}') - for sub_group_tag, state in self.coinc_data.sub_group_state.items(): + # decide which cache to use + if which_cache_to_use == 'main': + cache_data = self.coinc_data + _message_count = self.message_count + else: + cache_data = self.test_coinc_data + _message_count = self.test_message_count + + for sub_group_tag, state in cache_data.sub_group_state.items(): print('CHECKING FOR ALERTS IN SUB GROUP: ', sub_group_tag) # if state is none skip the sub group if state is None: @@ -505,7 +534,7 @@ def alert_decider(self): self.send_alert(sub_group_tag=sub_group_tag, alert_type=state) continue # publish a retraction alert for the sub group is its state is RETRACTION - elif state == 'RETRACTION' and len(self.coinc_data.cache.query('sub_group==@sub_group_tag')) < self.message_count[sub_group_tag]: + elif state == 'RETRACTION' and len(cache_data.cache.query('sub_group==@sub_group_tag')) < _message_count[sub_group_tag]: # yet another pretty terminal output click.secho(f'SUB GROUP {sub_group_tag}:{"RETRACTION HAS BEEN MADE".upper():^100}', bg='bright_green', fg='red') @@ -522,20 +551,20 @@ def alert_decider(self): fg='red') click.secho(f'{"=" * 100}', fg='bright_red') continue - elif state == 'UPDATE' and len(self.coinc_data.cache.query('sub_group==@sub_group_tag')) == self.message_count[sub_group_tag]: + elif state == 'UPDATE' and len(cache_data.cache.query('sub_group==@sub_group_tag')) == _message_count[sub_group_tag]: # yet another pretty terminal output click.secho(f'SUB GROUP {sub_group_tag}:{"A MESSAGE HAS BEEN UPDATED".upper():^100}', bg='bright_green', fg='red') log.debug('\t> An UPDATE message is received') # only publish an alert if the sub group has more than 1 message - if len(self.coinc_data.cache.query('sub_group==@sub_group_tag')) > 1: + if len(cache_data.cache.query('sub_group==@sub_group_tag')) > 1: click.secho(f'{"Publishing an updated Alert!!!".upper():^100}', bg='bright_green', fg='red') click.secho(f'{"=" * 100}', fg='bright_red') # publish update alert self.send_alert(sub_group_tag=sub_group_tag, alert_type=state) log.debug('\t> An alert is updated!') continue - elif state == 'COINC_MSG' and len(self.coinc_data.cache.query('sub_group==@sub_group_tag')) > self.message_count[sub_group_tag]: + elif state == 'COINC_MSG' and len(cache_data.cache.query('sub_group==@sub_group_tag')) > _message_count[sub_group_tag]: # yet another pretty terminal output click.secho(f'SUB GROUP {sub_group_tag}:{"NEW COINCIDENT DETECTOR.. ".upper():^100}', bg='bright_green', fg='red') click.secho(f'{"Published an Alert!!!".upper():^100}', bg='bright_green', fg='red') @@ -546,6 +575,60 @@ def alert_decider(self): continue # ------------------------------------------------------------------------------------------------------------------ + def deal_with_the_cache(self, snews_message): + """ Check if the message is a test or not, then add it to the cache and run the alert decider + + Parameters + ---------- + snews_message: dict read from the Kafka stream. + + Returns + ------- + adds messages to cache and runs the coincidence decider + """ + if "meta" in snews_message.keys(): + is_test = snews_message['meta'].get('is_test', False) + else: + if "is_test" in snews_message.keys(): + is_test = snews_message['is_test'] + else: + is_test = False + + if not is_test: + which_cache_to_use = 'main' + self.coinc_data.add_to_cache(message=snews_message) + # run the search + self.alert_decider(which_cache_to_use) + # update message count + for sub_group_tag in self.coinc_data.cache['sub_group'].unique(): + self.message_count[sub_group_tag] = len( + self.coinc_data.cache.query('sub_group==@sub_group_tag')) + self.coinc_data.sub_group_state[sub_group_tag] = None + + self.coinc_data.updated = [] + # do not have a storage for the tests + if not is_test: + self.storage.insert_coinc_cache(self.coinc_data.cache) + sys.stdout.flush() + self.coinc_data.updated = [] + else: + which_cache_to_use = 'test' + self.test_coinc_data.add_to_cache(message=snews_message) + # run the search + self.alert_decider(which_cache_to_use) + # update message count + for sub_group_tag in self.test_coinc_data.cache['sub_group'].unique(): + self.test_message_count[sub_group_tag] = len( + self.test_coinc_data.cache.query('sub_group==@sub_group_tag')) + self.test_coinc_data.sub_group_state[sub_group_tag] = None + self.test_coinc_data.updated = [] + # do not have a storage for the tests + sys.stdout.flush() + + if self.show_table: + self.display_table(which_cache_to_use) ## don't display on the server + + #------------------------------------------------------------------------------------------------------------------- def run_coincidence(self): """ As the name states this method runs the coincidence system. @@ -586,22 +669,8 @@ def run_coincidence(self): terminal_output += click.style(f"\t>{snews_message['detector_name']}, {snews_message['received_time']}", fg='bright_blue') click.secho(terminal_output) # add to cache - self.coinc_data.add_to_cache(message=snews_message) - if self.show_table: - self.display_table() ## don't display on the server - self.alert_decider() - # update message count - for sub_group_tag in self.coinc_data.cache['sub_group'].unique(): - self.message_count[sub_group_tag] = len( - self.coinc_data.cache.query('sub_group==@sub_group_tag')) - self.coinc_data.sub_group_state[sub_group_tag] = None - - self.coinc_data.updated = [] - - self.storage.insert_coinc_cache(self.coinc_data.cache) - sys.stdout.flush() - - self.coinc_data.updated = [] + ### if actual observation, use coincidence cache, else if testing use test cache + self.deal_with_the_cache(snews_message) # for each read message reduce the retriable err count if self.retriable_error_count > 1: diff --git a/snews_cs/test/test_01_connection_to_server.py b/snews_cs/test/test_01_connection_to_server.py new file mode 100644 index 0000000..b9ea696 --- /dev/null +++ b/snews_cs/test/test_01_connection_to_server.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +"""Initialization unit tests for the snews_cs module. +""" +import unittest + +import snews_cs +import snews_pt.remote_commands as sptrc + +import io +import contextlib + +class TestServer(unittest.TestCase): + def test_connection(self): + f = io.StringIO() + with contextlib.redirect_stdout(f): + #- Connect to server + sptrc.test_connection(detector_name='XENONnT', firedrill=False, start_at='LATEST', patience=8) + + #- Check the output message; it should say "You (XENONnT) have a connection" + confirm_msg = 'You (XENONnT) have a connection to the server' + self.assertTrue(confirm_msg in f.getvalue())