WIP Issue #29 - Add batch capability for running analytics over historical results (e.g. after uploading data from SD cards) #30

Open · wants to merge 5 commits into base: main
73 changes: 69 additions & 4 deletions src/GasExposureAnalytics.py
@@ -507,7 +507,9 @@ def _calculate_TWA_and_gauge_for_all_firefighters(self, sensor_log_chunk_df, ff_
# current_utc_timestamp : The UTC datetime for which to calculate sensor analytics. Defaults to 'now' (UTC).
# commit : Utility flag for unit testing - defaults to committing analytic results to
# the database. Setting commit=False prevents unit tests from writing to the database.
def run_analytics (self, current_utc_timestamp=None, commit=True) :
# log_mins : How often to log an informational message stating which minute is currently being processed. Defaults
# to every '1' min. Can be set to (e.g.) every '15' mins when running batches, to keep the logs readable.
def run_analytics (self, current_utc_timestamp=None, commit=True, log_mins=1) :

# Get the desired timeframe for the analytics run and standardise it to UTC.
if current_utc_timestamp is None:
@@ -528,9 +530,11 @@ def run_analytics (self, current_utc_timestamp=None, commit=True) :
# buffer for the data.
timestamp_key = current_utc_timestamp.floor(freq='min') - pd.Timedelta(minutes = 1)

message = ("Running Prometeo Analytics for minute key '%s'" % (timestamp_key.isoformat()))
if not self._from_db : message += " (local CSV file mode)"
self.logger.info(message)
# Log progress regularly (e.g. by default, log_mins is 'every 1 min', but could be set to 'every 15 mins').
if (timestamp_key == timestamp_key.floor(str(log_mins) + 'T')) :
message = ("Running Prometeo Analytics for minute key '%s'" % (timestamp_key.floor(str(log_mins) + 'T')))
if not self._from_db : message += " (local CSV file mode)"
self.logger.info(message)

# Read a block of sensor logs from the DB, covering the longest window we're calculating over (usually 8hrs).
# Note: This has the advantage of always including all known sensor data, even when that data was delayed due
@@ -554,3 +558,64 @@ def run_analytics (self, current_utc_timestamp=None, commit=True) :
analytics_df.to_sql(ANALYTICS_TABLE, self._db_engine, if_exists='append', dtype={FIREFIGHTER_ID_COL:FIREFIGHTER_ID_COL_TYPE})

return analytics_df
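
A minimal, standalone sketch of how the new log_mins gate behaves (pandas only; the timestamps are hypothetical): a minute key passes the gate, and so triggers a log line, only when it falls on a log_mins boundary.

```python
import pandas as pd

# With log_mins=15, only minute keys on a 15-minute boundary equal their own
# floor, so run_analytics would log for 10:00 and 10:15 but not 10:05/10:10.
log_mins = 15
for key in pd.date_range('2021-03-01 10:00', periods=4, freq='5min'):
    fires = (key == key.floor(str(log_mins) + 'T'))
    print(key.isoformat(), 'logs' if fires else 'silent')
```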


# This is the batched version of 'main' - given a minute-by-minute playback schedule, it runs all of the core
# analytics for Prometeo for each of those minutes.
# playback_schedule : A list of UTC datetimes for which to calculate sensor analytics.
# commit : Utility flag for unit testing - defaults to committing analytic results to
# the database. Setting commit=False prevents unit tests from writing to the database.
def batch_run_analytics (self, playback_schedule=[], commit=True) :

# Calculate exposure for every minute in the playback schedule
all_results = []
for time in playback_schedule :

# Calculate exposure
result = self.run_analytics(time, commit, log_mins=15) # only log every 15 mins, so logs remain readable
if result is not None :
all_results.append(result)

if all_results :
return pd.concat(all_results)
else :
return None
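
A minimal sketch of driving batch_run_analytics directly; `analytics` is assumed to be an initialised GasExposureAnalytics instance, and commit=False keeps the dry run out of the database.

```python
import pandas as pd

# Build a short minute-by-minute schedule and replay it (no DB writes).
schedule = pd.date_range(start='2021-03-01 09:00', end='2021-03-01 09:10',
                         freq='min').to_list()
results_df = analytics.batch_run_analytics(schedule, commit=False)
if results_df is not None:
    print(results_df.shape)
```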


# This is a variant of the batched version - given a date, it runs the core analytics for Prometeo for all available
# sensor data on that date.
# date : A UTC date (e.g. '2021-03-01') over which to calculate sensor analytics. Must not include time.
# commit : Utility flag for unit testing - defaults to committing analytic results to
# the database. Setting commit=False prevents unit tests from writing to the database.
def batch_run_analytics_by_date (self, date, commit=True) :

# Normalise and validate the date up-front. pd.Timestamp raises on malformed input,
# so an unvalidated string never reaches the SQL query below.
date = pd.Timestamp(date).strftime('%Y-%m-%d')

# Given a date, find the start and end times for the sensor records on that date
start_time, end_time = None, None
if self._from_db :
sql = ("SELECT MIN("+TIMESTAMP_COL+") AS start_time, MAX("+TIMESTAMP_COL+") AS end_time FROM "+SENSOR_LOG_TABLE+" WHERE DATE("+TIMESTAMP_COL+") = '"+date+"'")
start_time, end_time = pd.read_sql_query(sql, self._db_engine).iloc[0,:]
else :
index = self._sensor_log_from_csv_df.sort_index().loc[date:date].index
start_time = index.min()
end_time = index.max()

# If there's no data for the requested day, return None
if (start_time is None) or (end_time is None) :
return None

# Extend the end time so that the longest time-weighted averaging window can be fully reported.
longest_window_in_mins = max([window['mins'] for window in self.WINDOWS_AND_LIMITS])
end_time = end_time + pd.Timedelta(minutes=longest_window_in_mins)

# Log the overall batch period being processed.
message = ("Running Prometeo Analytics over batch period '%s' - '%s'" % (start_time.isoformat(), end_time.isoformat()))
if not self._from_db : message += " (local CSV file mode)"
self.logger.info(message)

# Get a minute-by-minute playback schedule over which to run the analytics
playback_schedule = pd.date_range(start=start_time.floor('min'), end=end_time.floor('min'), freq='min').to_list()

# Calculate exposure for every minute in the playback schedule
all_results_df = self.batch_run_analytics(playback_schedule, commit)

return all_results_df
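
And a minimal sketch of the whole-day variant (same assumed `analytics` instance; the date string is hypothetical). The schedule it builds internally is padded by the longest averaging window, so time-weighted averages late in the day are fully reported.

```python
# Replay every available minute of sensor data for one date (no DB writes).
day_results_df = analytics.batch_run_analytics_by_date('2021-03-01', commit=False)
if day_results_df is None:
    print('No sensor data found for that date')
```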
33 changes: 33 additions & 0 deletions src/core_decision_flask_app.py
@@ -45,6 +45,7 @@
FIREFIGHTER_ID_COL = 'firefighter_id'
TIMESTAMP_COL = 'timestamp_mins'
STATUS_LED_COL = 'analytics_status_LED'
DATE_PARAMETER = 'date'

# We initialize the prometeo Analytics engine.
perMinuteAnalytics = GasExposureAnalytics()
@@ -167,6 +168,38 @@ def getStatusDetails():
logger.error(f'Internal Server Error: {e}')
abort(500)


@app.route('/batch_run_analytics_by_date', methods=['GET'])
def batch_run_analytics_by_date():

try:
date = request.args.get(DATE_PARAMETER)
[Review comment - Member] @JSegrave-IBM should we validate the date here? We just check for None.


# Return 404 (Not Found) if the date parameter is missing
if (date is None) :
logger.error('Missing parameters : '+DATE_PARAMETER+' : '+str(date))
abort(404)
[Review comment - Member] @Gaurav-Ramakrishna @upkarlidder
Should this error be
FYI @JSegrave-IBM


# Calculate exposure for every minute in the selected day
batchAnalytics = GasExposureAnalytics()
batch_results_df = batchAnalytics.batch_run_analytics_by_date(date)

# Return 404 (Not Found) if no record is found
if (batch_results_df is None) or (batch_results_df.empty):
logger.error('No analytic results were produced for : ' + DATE_PARAMETER + ' : ' + str(date))
abort(404)
else:
# Flask views must return a response, so return an empty 204 (No Content) on success
return ('', 204)

except HTTPException as e:
logger.error(f'{e}')
raise e
except Exception as e:
# Return 500 (Internal Server Error) if there's any unexpected errors.
logger.error(f'Internal Server Error: {e}')
abort(500)
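
A minimal sketch of exercising the new endpoint over HTTP (the host and port are assumptions; adjust them to wherever the Flask app is served):

```python
import requests

response = requests.get('http://localhost:8080/batch_run_analytics_by_date',
                        params={'date': '2021-03-01'})
# A successful batch returns an empty 204 response; 404 means the date
# parameter was missing or produced no analytic results.
print(response.status_code)
```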


@app.route('/get_configuration', methods=['GET'])
def getConfiguration():
