diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 4e80683a5..2d85294ea 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -215,7 +215,7 @@ def _get_and_store_range(user_id, trip_key): - user_id (str): The UUID of the user. - trip_key (str): The key representing the trip data in the time series. """ - + time_format = 'YYYY-MM-DD HH:mm:ss' try: logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}") @@ -245,43 +245,29 @@ def _get_and_store_range(user_id, trip_key): logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") logging.info(type(user_id)) - # Retrieve last GET and PUT calls from stats/server_api_time - docs_cursor = edb.get_timeseries_db().find({ - "metadata.key": "stats/server_api_time", - "user_id" : user_id - }) logging.debug("Fetched API call statistics.") - last_get = None - last_put = None - - for doc in docs_cursor: - api_call_name = doc.get("data", {}).get("name", "") - api_call_ts = doc.get("data", {}).get("ts") - - if not api_call_ts: - logging.warning(f"Missing 'ts' in document: {doc}") - continue - - if api_call_name.startswith("GET_"): - if not last_get or api_call_ts > last_get: - last_get = api_call_ts - logging.debug(f"Updated last_get to: {last_get}") - elif api_call_name.startswith("PUT_"): - if not last_put or api_call_ts > last_put: - last_put = api_call_ts - logging.debug(f"Updated last_put to: {last_put}") - - # Determine the most recent call - if last_get and last_put: - last_call_ts = max(last_get, last_put) - else: - last_call_ts = last_get or last_put + last_call_ts = ts.get_first_value_for_field( + key='stats/server_api_time', + field='data.ts', + sort_order=pymongo.DESCENDING + ) logging.info(f"Last call timestamp: {last_call_ts}") # Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call user = ecwu.User.fromUUID(user_id) + if last_call_ts != -1: + # Format the timestamp using arrow + formatted_last_call = arrow.get(last_call_ts).format(time_format) + # Assign using attribute access or the update method + # Option 1: Attribute Assignment (if supported) + # user.last_call = formatted_last_call + + # Option 2: Using the update method + user.update({ + "last_call": formatted_last_call + }) user.update({ "pipeline_range": { "start_ts": start_ts,