diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 0b3499e3c..4e80683a5 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -233,23 +233,22 @@ def _get_and_store_range(user_id, trip_key): end_ts = None if end_ts == -1 else end_ts logging.debug(f"End timestamp: {end_ts}") - # Initialize counters - total_trips = 0 - labeled_trips = 0 - - # Retrieve trip entries as an iterator - trip_entries = ts.find_entries([trip_key], time_query=None) - - # Iterate through trip_entries once to count total_trips and labeled_trips - for trip in trip_entries: - total_trips += 1 - if trip.get('data', {}).get('user_input'): - labeled_trips += 1 - logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") + # Retrieve trip entries + total_trips = ts.find_entries_count( + key_list=["analysis/confirmed_trip"], + ) + + labeled_trips = ts.find_entries_count( + key_list=["analysis/confirmed_trip"], + extra_query_list=[{'data.user_input': {'$ne': {}}}] + ) + logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") + logging.info(type(user_id)) # Retrieve last GET and PUT calls from stats/server_api_time docs_cursor = edb.get_timeseries_db().find({ "metadata.key": "stats/server_api_time", + "user_id" : user_id }) logging.debug("Fetched API call statistics.")