Skip to content

Commit

Permalink
Modified last call to mirror op-admin implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
TeachMeTW committed Nov 8, 2024
1 parent 96682d7 commit f1d2eb2
Showing 1 changed file with 17 additions and 31 deletions.
48 changes: 17 additions & 31 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def _get_and_store_range(user_id, trip_key):
- user_id (str): The UUID of the user.
- trip_key (str): The key representing the trip data in the time series.
"""

time_format = 'YYYY-MM-DD HH:mm:ss'
try:
logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}")

Expand Down Expand Up @@ -245,43 +245,29 @@ def _get_and_store_range(user_id, trip_key):

logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
logging.info(type(user_id))
# Retrieve last GET and PUT calls from stats/server_api_time
docs_cursor = edb.get_timeseries_db().find({
"metadata.key": "stats/server_api_time",
"user_id" : user_id
})
logging.debug("Fetched API call statistics.")

last_get = None
last_put = None

for doc in docs_cursor:
api_call_name = doc.get("data", {}).get("name", "")
api_call_ts = doc.get("data", {}).get("ts")

if not api_call_ts:
logging.warning(f"Missing 'ts' in document: {doc}")
continue

if api_call_name.startswith("GET_"):
if not last_get or api_call_ts > last_get:
last_get = api_call_ts
logging.debug(f"Updated last_get to: {last_get}")
elif api_call_name.startswith("PUT_"):
if not last_put or api_call_ts > last_put:
last_put = api_call_ts
logging.debug(f"Updated last_put to: {last_put}")

# Determine the most recent call
if last_get and last_put:
last_call_ts = max(last_get, last_put)
else:
last_call_ts = last_get or last_put
last_call_ts = ts.get_first_value_for_field(
key='stats/server_api_time',
field='data.ts',
sort_order=pymongo.DESCENDING
)

logging.info(f"Last call timestamp: {last_call_ts}")

# Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call
user = ecwu.User.fromUUID(user_id)
if last_call_ts != -1:
# Format the timestamp using arrow
formatted_last_call = arrow.get(last_call_ts).format(time_format)
# Assign using attribute access or the update method
# Option 1: Attribute Assignment (if supported)
# user.last_call = formatted_last_call

# Option 2: Using the update method
user.update({
"last_call": formatted_last_call
})
user.update({
"pipeline_range": {
"start_ts": start_ts,
Expand Down

0 comments on commit f1d2eb2

Please sign in to comment.