diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 424d85e32..0b3499e3c 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -13,6 +13,7 @@ from uuid import UUID import time import pymongo +from datetime import datetime import emission.core.get_database as edb import emission.core.timer as ect @@ -207,14 +208,92 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): time.time(), gsr.elapsed) def _get_and_store_range(user_id, trip_key): - ts = esta.TimeSeries.get_time_series(user_id) - start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) - if start_ts == -1: - start_ts = None - end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) - if end_ts == -1: - end_ts = None - - user = ecwu.User(user_id) - user.update({"pipeline_range": {"start_ts": start_ts, "end_ts": end_ts}}) - logging.debug("After updating, new profiles is %s" % user.getProfile()) + """ + Extends the user profile with pipeline_range, total_trips, labeled_trips, and last_call. + + Parameters: + - user_id (str): The UUID of the user. + - trip_key (str): The key representing the trip data in the time series. + """ + + try: + logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}") + + # Fetch the time series for the user + ts = esta.TimeSeries.get_time_series(user_id) + logging.debug("Fetched time series data.") + + # Get start timestamp + start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) + start_ts = None if start_ts == -1 else start_ts + logging.debug(f"Start timestamp: {start_ts}") + + # Get end timestamp + end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) + end_ts = None if end_ts == -1 else end_ts + logging.debug(f"End timestamp: {end_ts}") + + # Initialize counters + total_trips = 0 + labeled_trips = 0 + + # Retrieve trip entries as an iterator + trip_entries = ts.find_entries([trip_key], time_query=None) + + # Iterate through trip_entries once to count total_trips and labeled_trips + for trip in trip_entries: + total_trips += 1 + if trip.get('data', {}).get('user_input'): + labeled_trips += 1 + logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") + + # Retrieve last GET and PUT calls from stats/server_api_time + docs_cursor = edb.get_timeseries_db().find({ + "metadata.key": "stats/server_api_time", + }) + logging.debug("Fetched API call statistics.") + + last_get = None + last_put = None + + for doc in docs_cursor: + api_call_name = doc.get("data", {}).get("name", "") + api_call_ts = doc.get("data", {}).get("ts") + + if not api_call_ts: + logging.warning(f"Missing 'ts' in document: {doc}") + continue + + if api_call_name.startswith("GET_"): + if not last_get or api_call_ts > last_get: + last_get = api_call_ts + logging.debug(f"Updated last_get to: {last_get}") + elif api_call_name.startswith("PUT_"): + if not last_put or api_call_ts > last_put: + last_put = api_call_ts + logging.debug(f"Updated last_put to: {last_put}") + + # Determine the most recent call + if last_get and last_put: + last_call_ts = max(last_get, last_put) + else: + last_call_ts = last_get or last_put + + logging.info(f"Last call timestamp: {last_call_ts}") + + # Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call + user = ecwu.User.fromUUID(user_id) + user.update({ + "pipeline_range": { + "start_ts": start_ts, + "end_ts": end_ts + }, + "total_trips": total_trips, + "labeled_trips": labeled_trips, + "last_call": last_call_ts + }) + logging.debug("User profile updated successfully.") + logging.debug("After updating, new profile is %s", user.getProfile()) + + except Exception as e: + logging.error(f"Error in _get_and_store_range for user_id {user_id}: {e}") \ No newline at end of file