Skip to content

Commit

Permalink
Refine timing measurement by removing timers around low-significance functions
Browse files Browse the repository at this point in the history
- Removed timing instrumentation for additional functions identified as low-impact during local testing on iOS and Android.
- Retained timing for the key contributors to overall execution time.
- Broader retesting on staging, production, or different datasets is recommended to validate these changes.
  • Loading branch information
TeachMeTW committed Nov 23, 2024
1 parent 0d2a2ed commit 87dad3c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,8 @@ def segment_into_trips(self, timeseries, time_query):
# segmentation_points.append(currPoint)

if just_ended:
with ect.Timer() as t_continue_just_ended:
continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/continue_just_ended",
time.time(),
t_continue_just_ended.elapsed
)

continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df)

if continue_flag:
# We have "processed" the currPoint by deciding to glom it
Expand Down Expand Up @@ -126,38 +120,28 @@ def segment_into_trips(self, timeseries, time_query):
)

if trip_ended:
with ect.Timer() as t_get_last_trip_end_point:
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx - 1))
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_last_trip_end_point",
time.time(),
t_get_last_trip_end_point.elapsed
)

with ect.Timer() as t_handle_trip_end:
just_ended = True
# Now, we have finished processing the previous point as a trip
# end or not. But we still need to process this point by seeing
# whether it should represent a new trip start, or a glom to the
# previous trip
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/handle_trip_end",
time.time(),
t_handle_trip_end.elapsed
)

last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx - 1))
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts



just_ended = True
# Now, we have finished processing the previous point as a trip
# end or not. But we still need to process this point by seeing
# whether it should represent a new trip start, or a glom to the
# previous trip
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False

esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/loop",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,40 +65,20 @@ def segment_into_trips(self, timeseries, time_query):
data that they want from the sensor streams in order to determine the
segmentation points.
"""
with ect.Timer() as t_get_filtered_points_pre:
filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query)
user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0]
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_points_pre_ts_diff_df",
time.time(),
t_get_filtered_points_pre.elapsed
)

with ect.Timer() as t_filter_bogus_points:
# Sometimes, we can get bogus points because data.ts and
# metadata.write_ts are off by a lot. If we don't do this, we end up
# appearing to travel back in time
# https://github.com/e-mission/e-mission-server/issues/457
filtered_points_df = filtered_points_pre_ts_diff_df[
(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000
]
filtered_points_df.reset_index(inplace=True)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points",
time.time(),
t_filter_bogus_points.elapsed
)
filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query)
user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0]

with ect.Timer() as t_get_transition_df:
transition_df = timeseries.get_data_df("statemachine/transition", time_query)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df",
time.time(),
t_get_transition_df.elapsed
)
# Sometimes, we can get bogus points because data.ts and
# metadata.write_ts are off by a lot. If we don't do this, we end up
# appearing to travel back in time
# https://github.com/e-mission/e-mission-server/issues/457
filtered_points_df = filtered_points_pre_ts_diff_df[
(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000
]
filtered_points_df.reset_index(inplace=True)

transition_df = timeseries.get_data_df("statemachine/transition", time_query)

if len(transition_df) > 0:
logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
Expand Down Expand Up @@ -216,31 +196,26 @@ def segment_into_trips(self, timeseries, time_query):
t_loop.elapsed
)

with ect.Timer() as t_post_loop:
logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
(just_ended, len(transition_df)))
if not just_ended and len(transition_df) > 0:
stopped_moving_after_last = transition_df[
(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)
]
logging.debug("looking after %s, found transitions %s" %
(currPoint.ts, stopped_moving_after_last))
if len(stopped_moving_after_last) > 0:
(unused, last_trip_end_point) = self.get_last_trip_end_point(
filtered_points_df,
last10Points_df,
None
)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/post_loop",
time.time(),
t_post_loop.elapsed
)

logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
(just_ended, len(transition_df)))
if not just_ended and len(transition_df) > 0:
stopped_moving_after_last = transition_df[
(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)
]
logging.debug("looking after %s, found transitions %s" %
(currPoint.ts, stopped_moving_after_last))
if len(stopped_moving_after_last) > 0:
(unused, last_trip_end_point) = self.get_last_trip_end_point(
filtered_points_df,
last10Points_df,
None
)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts


return segmentation_points

Expand Down

0 comments on commit 87dad3c

Please sign in to comment.