From 7bb26f5e6f0e33826d856a4ccf330d8d6f00cc77 Mon Sep 17 00:00:00 2001 From: John Segrave Date: Tue, 23 Mar 2021 17:47:08 +0000 Subject: [PATCH 1/5] Issue #29 - Add batch capability for running analytics over historical results (e.g. after uploading data from SD cards) --- src/GasExposureAnalytics.py | 69 ++++++++++++++++++++++++++++++++-- src/core_decision_flask_app.py | 33 ++++++++++++++++ 2 files changed, 98 insertions(+), 4 deletions(-) diff --git a/src/GasExposureAnalytics.py b/src/GasExposureAnalytics.py index ace0e6b..caddec2 100644 --- a/src/GasExposureAnalytics.py +++ b/src/GasExposureAnalytics.py @@ -507,7 +507,9 @@ def _calculate_TWA_and_gauge_for_all_firefighters(self, sensor_log_chunk_df, ff_ # current_utc_timestamp : The UTC datetime for which to calculate sensor analytics. Defaults to 'now' (UTC). # commit : Utility flag for unit testing - defaults to committing analytic results to # the database. Setting commit=False prevents unit tests from writing to the database. - def run_analytics (self, current_utc_timestamp=None, commit=True) : + # log_mins : How often to log an informational message stating which minute is currently being processed. Defaults + # to every '1' min. Can be set to (eg) every '15' mins when running batches, so the logs are readable. + def run_analytics (self, current_utc_timestamp=None, commit=True, log_mins=1) : # Get the desired timeframe for the analytics run and standardise it to UTC. if current_utc_timestamp is None: @@ -528,9 +530,11 @@ def run_analytics (self, current_utc_timestamp=None, commit=True) : # buffer for the data. timestamp_key = current_utc_timestamp.floor(freq='min') - pd.Timedelta(minutes = 1) - message = ("Running Prometeo Analytics for minute key '%s'" % (timestamp_key.isoformat())) - if not self._from_db : message += " (local CSV file mode)" - self.logger.info(message) + # Log progress regularly (e.g. by default, log_mins is 'every 1 min', but could be set to 'every 15 mins'). + if (timestamp_key == timestamp_key.floor(str(log_mins) + 'T')) : + message = ("Running Prometeo Analytics for minute key '%s'" % (timestamp_key.floor(str(log_mins) + 'T'))) + if not self._from_db : message += " (local CSV file mode)" + self.logger.info(message) # Read a block of sensor logs from the DB, covering the longest window we're calculating over (usually 8hrs). # Note: This has the advantage of always including all known sensor data, even when that data was delayed due @@ -554,3 +558,60 @@ def run_analytics (self, current_utc_timestamp=None, commit=True) : analytics_df.to_sql(ANALYTICS_TABLE, self._db_engine, if_exists='append', dtype={FIREFIGHTER_ID_COL:FIREFIGHTER_ID_COL_TYPE}) return analytics_df + + + # This is the batched version of 'main' - given a minute-by-minute playback schedule, it runs all of the core + # analytics for Prometeo for each of those minutes. + # playback_schedule : A list of UTC datetimes for which to calculate sensor analytics. + # commit : Utility flag for unit testing - defaults to committing analytic results to + # the database. Setting commit=False prevents unit tests from writing to the database. 
+ def batch_run_analytics (self, playback_schedule=[], commit=True) : + + # Calculate exposure for every minute in the playback schedule + all_results = [] + for time in playback_schedule : + + # Calculate exposure + result = self.run_analytics(time, commit, log_mins=15) # only log every 15 mins, so logs remain readable + if result is not None : + all_results.append(result) + + if all_results : + return pd.concat(all_results) + else : + return None + + + # This is a variant of the batched version - given a date, it runs the core analytics for Prometeo for all available + # sensor data on that date. + # date : A UTC date (e.g. '2021-03-01') over which to calculate sensor analytics. Must not include time. + # commit : Utility flag for unit testing - defaults to committing analytic results to + # the database. Setting commit=False prevents unit tests from writing to the database. + def batch_run_analytics_by_date (self, date, commit=True) : + + # Given a date, find the start and end times for the sensor records on that date + start_time, end_time = None, None + if self._from_db : + sql = ("SELECT MIN("+TIMESTAMP_COL+") AS start_time, MAX("+TIMESTAMP_COL+") AS end_time FROM "+SENSOR_LOG_TABLE+" WHERE DATE("+TIMESTAMP_COL+") = '"+date+"'") + start_time, end_time = pd.read_sql_query(sql, self._db_engine).iloc[0,:] + else : + index = self._sensor_log_from_csv_df.sort_index().loc[date:date].index + start_time = index.min() + end_time = index.max() + + # Adjust the end - should allow for the longest time-weighted averaging window to be fully-reported. + longest_window_in_mins = max([window['mins'] for window in self.WINDOWS_AND_LIMITS]) + end_time = end_time + pd.Timedelta(minutes=longest_window_in_mins) + + # Log progress regularly (e.g. by default, log_mins is 'every 1 min', but could be set to 'every 15 mins'). + message = ("Running Prometeo Analytics over batch period '%s' - '%s'" % (start_time.isoformat(), end_time.isoformat())) + if not self._from_db : message += " (local CSV file mode)" + self.logger.info(message) + + # Get a minute-by-minute playback schedule over which to run the analytics + playback_schedule = pd.date_range(start=start_time.floor('min'), end=end_time.floor('min'), freq='min').to_list() + + # Calculate exposure for every minute in the playback schedule + all_results_df = self.batch_run_analytics(playback_schedule, commit) + + return all_results_df diff --git a/src/core_decision_flask_app.py b/src/core_decision_flask_app.py index 54a323f..facb8b3 100644 --- a/src/core_decision_flask_app.py +++ b/src/core_decision_flask_app.py @@ -45,6 +45,7 @@ FIREFIGHTER_ID_COL = 'firefighter_id' TIMESTAMP_COL = 'timestamp_mins' STATUS_LED_COL = 'analytics_status_LED' +DATE_PARAMETER = 'date' # We initialize the prometeo Analytics engine. 
perMinuteAnalytics = GasExposureAnalytics() @@ -167,6 +168,38 @@ def getStatusDetails(): logger.error(f'Internal Server Error: {e}') abort(500) + +@app.route('/batch_run_analytics_by_date', methods=['GET']) +def batch_run_analytics_by_date(): + + try: + date = request.args.get(DATE_PARAMETER) + + # Return 404 (Not Found) if the record IDs are invalid + if (date is None) : + logger.error('Missing parameters : '+DATE_PARAMETER+' : '+str(date)) + abort(404) + + # Calculate exposure for every minute in the selected day + batchAnalytics = GasExposureAnalytics() + batch_results_df = batchAnalytics.batch_run_analytics_by_date(date) + + # Return 404 (Not Found) if no record is found + if (batch_results_df is None) or (batch_results_df.empty): + logger.error('No analytic results were produced for : ' + DATE_PARAMETER + ' : ' + str(date)) + abort(404) + else: + return + + except HTTPException as e: + logger.error(f'{e}') + raise e + except Exception as e: + # Return 500 (Internal Server Error) if there's any unexpected errors. + logger.error(f'Internal Server Error: {e}') + abort(500) + + @app.route('/get_configuration', methods=['GET']) def getConfiguration(): From 158a3f0da384854553dbac80ed26c053ecc51830 Mon Sep 17 00:00:00 2001 From: John Segrave Date: Tue, 23 Mar 2021 18:57:45 +0000 Subject: [PATCH 2/5] Issue #29 - Add batch capability for running analytics over historical results (e.g. after uploading data from SD cards) --- src/GasExposureAnalytics.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/GasExposureAnalytics.py b/src/GasExposureAnalytics.py index caddec2..fd3165d 100644 --- a/src/GasExposureAnalytics.py +++ b/src/GasExposureAnalytics.py @@ -598,6 +598,10 @@ def batch_run_analytics_by_date (self, date, commit=True) : index = self._sensor_log_from_csv_df.sort_index().loc[date:date].index start_time = index.min() end_time = index.max() + + # If there's no data for the requested day, return None + if (start_time is None) or (end_time is None) : + return None # Adjust the end - should allow for the longest time-weighted averaging window to be fully-reported. longest_window_in_mins = max([window['mins'] for window in self.WINDOWS_AND_LIMITS]) From c7b01dc03cb2855cdb9113446f70aa86af01d5c3 Mon Sep 17 00:00:00 2001 From: John Segrave Date: Thu, 1 Apr 2021 10:13:55 +0100 Subject: [PATCH 3/5] Issue #29 - Fixed review comments - date parameter type-checking and 400 return code. --- src/core_decision_flask_app.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/core_decision_flask_app.py b/src/core_decision_flask_app.py index facb8b3..a74b294 100644 --- a/src/core_decision_flask_app.py +++ b/src/core_decision_flask_app.py @@ -7,6 +7,7 @@ from GasExposureAnalytics import GasExposureAnalytics from dotenv import load_dotenv import time +from datetime import date import atexit from apscheduler.schedulers.background import BackgroundScheduler import logging @@ -173,20 +174,27 @@ def getStatusDetails(): def batch_run_analytics_by_date(): try: - date = request.args.get(DATE_PARAMETER) + date_str = request.args.get(DATE_PARAMETER) + + # Return 400 (Bad Request) if the supplied date is invalid. + if (date_str is None) : + logger.error('Missing parameters : '+DATE_PARAMETER+' : '+date_str) + abort(400) + else : + try : + date.fromisoformat(date_str) + except ValueError as e: + logger.error("Invalid '"+DATE_PARAMETER+"' parameter. 
'"+date_str+"' should be an ISO-formatted YYYY-MM-DD date string (like 2000-12-31)") + abort(400) - # Return 404 (Not Found) if the record IDs are invalid - if (date is None) : - logger.error('Missing parameters : '+DATE_PARAMETER+' : '+str(date)) - abort(404) # Calculate exposure for every minute in the selected day batchAnalytics = GasExposureAnalytics() - batch_results_df = batchAnalytics.batch_run_analytics_by_date(date) + batch_results_df = batchAnalytics.batch_run_analytics_by_date(date_str) # Return 404 (Not Found) if no record is found if (batch_results_df is None) or (batch_results_df.empty): - logger.error('No analytic results were produced for : ' + DATE_PARAMETER + ' : ' + str(date)) + logger.error('No analytic results were produced for : ' + DATE_PARAMETER + ' : ' + date_str) abort(404) else: return From 9a9131d2d2d848f9226d92e2ea801be982178bda Mon Sep 17 00:00:00 2001 From: John Segrave Date: Thu, 1 Apr 2021 10:25:29 +0100 Subject: [PATCH 4/5] Issue #29 - documented local test command in readme --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e1c7e98..b53efb9 100644 --- a/README.md +++ b/README.md @@ -47,11 +47,15 @@ You can run this solution locally in docker as follows ``` source python3/bin/activate ``` -5. Run the application +5. Test the application + ``` + python manage.py test + ``` +6. Run the application ``` python src/core_decision_flask_app.py 8080 ``` -6. You should see the following output +7. You should see the following output ``` starting application * Serving Flask app "core_decision_flask_app" (lazy loading) From 104264e982227d10eb2e5e7dc26262827935f1a1 Mon Sep 17 00:00:00 2001 From: John Segrave Date: Thu, 1 Apr 2021 15:53:28 +0100 Subject: [PATCH 5/5] Issue #29 - batch can now overwrite existing analytics records in the batch window (also refactored batch_run_analytics to be a little clearer - it takes a start & end time now (rather than a schedule of individual minutes) --- src/GasExposureAnalytics.py | 50 ++++++++++++++++++++-------------- src/core_decision_flask_app.py | 2 +- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/src/GasExposureAnalytics.py b/src/GasExposureAnalytics.py index fd3165d..b2fda7a 100644 --- a/src/GasExposureAnalytics.py +++ b/src/GasExposureAnalytics.py @@ -507,9 +507,10 @@ def _calculate_TWA_and_gauge_for_all_firefighters(self, sensor_log_chunk_df, ff_ # current_utc_timestamp : The UTC datetime for which to calculate sensor analytics. Defaults to 'now' (UTC). # commit : Utility flag for unit testing - defaults to committing analytic results to # the database. Setting commit=False prevents unit tests from writing to the database. + # overwrite : Enables batch execution to overwrite any existing result in the time period being processed. # log_mins : How often to log an informational message stating which minute is currently being processed. Defaults # to every '1' min. Can be set to (eg) every '15' mins when running batches, so the logs are readable. - def run_analytics (self, current_utc_timestamp=None, commit=True, log_mins=1) : + def run_analytics (self, current_utc_timestamp=None, commit=True, overwrite=False, log_mins=1) : # Get the desired timeframe for the analytics run and standardise it to UTC. 
        if current_utc_timestamp is None:
@@ -554,25 +555,42 @@ def run_analytics (self, current_utc_timestamp=None, commit=True, log_mins=1) :
         # Work out all the time-weighted averages and corresponding limit gauges for all firefighters, all limits and all gases.
         analytics_df = self._calculate_TWA_and_gauge_for_all_firefighters(sensor_log_df, ff_time_spans_df, timestamp_key)
 
+        # Write the analytic results to the DB
         if commit :
+
+            # Remove any pre-existing analytic results before writing new ones.
+            if overwrite :
+                with self._db_engine.connect() as connection: # 'with' auto-closes the connection
+                    connection.execute("DELETE FROM " + ANALYTICS_TABLE + " where " + TIMESTAMP_COL + " = '" + timestamp_key.isoformat() + "'")
+
             analytics_df.to_sql(ANALYTICS_TABLE, self._db_engine, if_exists='append', dtype={FIREFIGHTER_ID_COL:FIREFIGHTER_ID_COL_TYPE})
 
         return analytics_df
 
 
-    # This is the batched version of 'main' - given a minute-by-minute playback schedule, it runs all of the core
-    # analytics for Prometeo for each of those minutes.
-    # playback_schedule : A list of UTC datetimes for which to calculate sensor analytics.
-    # commit            : Utility flag for unit testing - defaults to committing analytic results to
-    #                     the database. Setting commit=False prevents unit tests from writing to the database.
-    def batch_run_analytics (self, playback_schedule=[], commit=True) :
+    # This is the batched version of 'main' - given a start time and an end time, it generates a minute-by-minute
+    # playback schedule and runs all of the core analytics for Prometeo for each of those minutes.
+    # start_time : The date & time at which to start calculating sensor analytics (UTC datetime).
+    # end_time   : The date & time at which to stop calculating sensor analytics (UTC datetime).
+    # commit     : Utility flag for unit testing. Defaults to committing analytic results to the database for production.
+    #              Setting commit=False prevents unit tests from writing to the database.
+    def batch_run_analytics (self, start_time=None, end_time=None, commit=True) :
+
+        # Log that a batch process is starting, the period it covers and that existing results will be overwritten.
+        message = (("Running Prometeo Analytics over batch period '%s' - '%s'. "
+                    + "*** Any existing analytic results in this period will be overwritten. ***")
+                   % (start_time.isoformat(), end_time.isoformat()))
+        if not self._from_db : message += " (local CSV file mode)"
+        self.logger.info(message)
+
+        # Get a minute-by-minute playback schedule over which to run the analytics
+        playback_schedule = pd.date_range(start=start_time.floor('min'), end=end_time.floor('min'), freq='min').to_list()
 
         # Calculate exposure for every minute in the playback schedule
         all_results = []
         for time in playback_schedule :
-
-            # Calculate exposure
-            result = self.run_analytics(time, commit, log_mins=15) # only log every 15 mins, so logs remain readable
+            # Calculate exposure, overwriting any pre-existing analytic results before new ones are written.
+            result = self.run_analytics(time, commit, overwrite=True, log_mins=15) # only log every 15 mins, so logs remain readable
             if result is not None :
                 all_results.append(result)
 
@@ -603,19 +621,11 @@ def batch_run_analytics_by_date (self, date, commit=True) :
         if (start_time is None) or (end_time is None) :
             return None
 
-        # Adjust the end - should allow for the longest time-weighted averaging window to be fully-reported.
+        # Adjust the end - allow time for the longest time-weighted averaging window to be fully-reported.
longest_window_in_mins = max([window['mins'] for window in self.WINDOWS_AND_LIMITS]) end_time = end_time + pd.Timedelta(minutes=longest_window_in_mins) - # Log progress regularly (e.g. by default, log_mins is 'every 1 min', but could be set to 'every 15 mins'). - message = ("Running Prometeo Analytics over batch period '%s' - '%s'" % (start_time.isoformat(), end_time.isoformat())) - if not self._from_db : message += " (local CSV file mode)" - self.logger.info(message) - - # Get a minute-by-minute playback schedule over which to run the analytics - playback_schedule = pd.date_range(start=start_time.floor('min'), end=end_time.floor('min'), freq='min').to_list() - # Calculate exposure for every minute in the playback schedule - all_results_df = self.batch_run_analytics(playback_schedule, commit) + all_results_df = self.batch_run_analytics(start_time, end_time, commit) return all_results_df diff --git a/src/core_decision_flask_app.py b/src/core_decision_flask_app.py index a74b294..5123147 100644 --- a/src/core_decision_flask_app.py +++ b/src/core_decision_flask_app.py @@ -197,7 +197,7 @@ def batch_run_analytics_by_date(): logger.error('No analytic results were produced for : ' + DATE_PARAMETER + ' : ' + date_str) abort(404) else: - return + return "batch run complete" except HTTPException as e: logger.error(f'{e}')
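
As a usage sketch of the batch capability added in this series (assumptions: the Flask service is listening on localhost:8080 as in the README run step, and the illustrative date 2021-03-01 has sensor data loaded), the back-fill can be triggered either over HTTP or directly against the analytics engine:

    # Over HTTP - the 'date' parameter must be an ISO YYYY-MM-DD string:
    #   curl "http://localhost:8080/batch_run_analytics_by_date?date=2021-03-01"

    # Or directly from Python, e.g. in a unit test. commit=False keeps the run read-only;
    # the default commit=True writes each minute's results to the analytics table,
    # overwriting any rows already present in the batch window.
    from GasExposureAnalytics import GasExposureAnalytics

    analytics = GasExposureAnalytics()
    results_df = analytics.batch_run_analytics_by_date('2021-03-01', commit=False)
    if (results_df is None) or results_df.empty :
        print('No sensor data was found for 2021-03-01')
    else :
        print(results_df.head())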