Updated identifier naming conventions in GroupedDataSubscriber to better conform with Python norms
ritchiecarroll committed Jul 30, 2024
1 parent 2a99b35 commit ce8d0a9
Showing 1 changed file with 78 additions and 78 deletions.
156 changes: 78 additions & 78 deletions examples/groupeddatasubscribe/main.py
@@ -53,11 +53,11 @@ class GroupedDataSubscriber(Subscriber):
     If incoming frame rate is higher than the samples per second, or timestamp alignment does not accurately coincide with
     the subsecond distribution, some data will be downsampled. Downsampled data count is tracked and reported through the
-    downsampled_count property.
+    downsampledcount property.

     Only a single one-second data buffer will be published at a time. If data cannot be processed within the one-second
     window, a warning message will be displayed and the data will be skipped. The number of skipped data sets is tracked
-    and reported through the process_missed_count property.
+    and reported through the processmissedcount property.

     This example depends on a semi-accurate system clock to group data by timestamp. If the system clock is not accurate,
     data may not be grouped as expected.
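
To make the grouping described in this docstring concrete, here is a minimal standalone sketch (not part of the commit) of the two-level buffer the subscriber maintains; the tick values are illustrative and assume STTP's 100 ns tick resolution, i.e., 10,000,000 ticks per second:

```python
from uuid import uuid4

# Sketch of the grouping structure: top-of-second timestamp -> subsecond
# frame timestamp -> signal ID -> latest measurement for that frame.
# Tick values are illustrative; 10,000,000 ticks per second is assumed.
groupeddata: dict[int, dict[int, dict[object, float]]] = {}

second = 638_600_000_000_000_000  # hypothetical top-of-second tick value
subsecond = second + 333_333      # first frame boundary at 30 samples/s
signalid = uuid4()

bucket = groupeddata.setdefault(second, {}).setdefault(subsecond, {})
if signalid in bucket:
    pass  # a second value for the same frame would be counted as downsampled
bucket[signalid] = 59.98          # e.g., a frequency value in Hz
```

When a frame already holds a value for a signal, the newer value overwrites it and the downsampled counter is incremented, which is exactly the downsampling the docstring describes.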
@@ -70,133 +70,133 @@ def __init__(self):
         self.config = Config()
         self.settings = Settings()

-        self.measurement_window_size = 1
+        self.measurement_windowsize = 1
         """
         Defines measurement window size, in whole seconds, for data grouping.
         """

-        self.lag_time = 5.0
+        self.lagtime = 5.0
         """
         Defines the lag time, in seconds, for data grouping. Data received outside
         of this past time limit, relative to local clock, will be discarded.
         """

-        self.lead_time = 5.0
+        self.leadtime = 5.0
         """
         Defines the lead time, in seconds, for data grouping. Data received outside
         this future time limit, relative to local clock, will be discarded.
         """

-        self.samples_per_second = 30
+        self.samplespersecond = 30
         """
         Defines the number of samples per second for the data in the stream.
         """

-        self.display_measurement_summary = False
+        self.display_measurementsummary = False
         """
         Defines if the subscriber should display a summary of received measurements
         every few seconds.
         """

-        self._grouped_data: Dict[np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]] = {}
-        self._grouped_data_receiver: Optional[Callable[[GroupedDataSubscriber, np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]], None]] = None
+        self._groupeddata: Dict[np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]] = {}
+        self._groupeddata_receiver: Optional[Callable[[GroupedDataSubscriber, np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]], None]] = None

        self._lastmessage = 0.0

-        self._downsampled_count_lock = threading.Lock()
-        self._downsampled_count = 0
+        self._downsampledcount_lock = threading.Lock()
+        self._downsampledcount = 0

         self._process_lock = threading.Lock()

-        self._process_missed_count_lock = threading.Lock()
-        self._process_missed_count = 0
+        self._processmissedcount_lock = threading.Lock()
+        self._processmissedcount = 0

         # Set up event handlers for STTP API
         self.set_subscriptionupdated_receiver(self._subscription_updated)
         self.set_newmeasurements_receiver(self._new_measurements)
         self.set_connectionterminated_receiver(self._connection_terminated)
     @property
-    def downsampled_count(self) -> int:
+    def downsampledcount(self) -> int:
         """
         Gets the count of downsampled measurements.
         """

-        with self._downsampled_count_lock:
-            return self._downsampled_count
+        with self._downsampledcount_lock:
+            return self._downsampledcount

-    @downsampled_count.setter
-    def downsampled_count(self, value: np.int32):
+    @downsampledcount.setter
+    def downsampledcount(self, value: np.int32):
         """
         Sets the count of downsampled measurements.
         """

-        with self._downsampled_count_lock:
-            self._downsampled_count = value
+        with self._downsampledcount_lock:
+            self._downsampledcount = value

     @property
-    def process_missed_count(self) -> int:
+    def processmissedcount(self) -> int:
         """
         Gets the count of missed data processing.
         """

-        with self._process_missed_count_lock:
-            return self._process_missed_count
+        with self._processmissedcount_lock:
+            return self._processmissedcount

-    @process_missed_count.setter
-    def process_missed_count(self, value: np.int32):
+    @processmissedcount.setter
+    def processmissedcount(self, value: np.int32):
         """
         Sets the count of missed data processing.
         """

-        with self._process_missed_count_lock:
-            self._process_missed_count = value
+        with self._processmissedcount_lock:
+            self._processmissedcount = value

-    def set_grouped_data_receiver(self, callback: Optional[Callable[[GroupedDataSubscriber, np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]], None]]):
+    def set_groupeddata_receiver(self, callback: Optional[Callable[[GroupedDataSubscriber, np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]], None]]):
         """
-        Defines the callback function that handles grouped data that has been received.
+        Defines the callback function that processes grouped data that has been received.

         Function signature:
-            def handle_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[UUID, Measurement]]):
+            def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, databuffer: Dict[np.uint64, Dict[UUID, Measurement]]):
                 pass
         """

-        self._grouped_data_receiver = callback
+        self._groupeddata_receiver = callback

-    def _time_is_valid(self, timestamp: np.uint64) -> bool:
+    def _timeisvalid(self, timestamp: np.uint64) -> bool:
         """
         Determines if the given timestamp is within the valid time range for data grouping.
         """

         distance = Ticks.utcnow() - timestamp
-        lead_time = self.lead_time * Ticks.PERSECOND
-        lag_time = self.lag_time * Ticks.PERSECOND
+        leadtime = self.leadtime * Ticks.PERSECOND
+        lagtime = self.lagtime * Ticks.PERSECOND

-        return distance >= -lead_time and distance <= lag_time
+        return distance >= -leadtime and distance <= lagtime
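
A quick numeric illustration of the validity window implemented above (a standalone sketch, not part of the commit; it assumes 100 ns ticks, so one second is 10,000,000 ticks):

```python
# Standalone illustration of the lag/lead validity window, assuming
# 100 ns ticks and the example's 5-second lag and lead limits.
PERSECOND = 10_000_000
LAG_TIME = 5.0   # seconds into the past still accepted
LEAD_TIME = 5.0  # seconds into the future still accepted

def time_is_valid(now: int, timestamp: int) -> bool:
    distance = now - timestamp  # positive when the timestamp is in the past
    return -LEAD_TIME * PERSECOND <= distance <= LAG_TIME * PERSECOND

now = 1_000_000_000
assert time_is_valid(now, now - 3 * PERSECOND)      # 3 s old: kept
assert not time_is_valid(now, now - 6 * PERSECOND)  # 6 s old: discarded
assert not time_is_valid(now, now + 6 * PERSECOND)  # 6 s ahead: discarded
```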

-    def _round_to_nearest_second(self, timestamp: np.uint64) -> np.uint64:
+    def _round_to_nearestsecond(self, timestamp: np.uint64) -> np.uint64:
         """
         Rounds the timestamp down to the nearest second.
         """

         return timestamp - timestamp % Ticks.PERSECOND

-    def _round_to_subsecond_distribution(self, timestamp: np.uint64) -> np.uint64:
+    def _round_to_subseconddistribution(self, timestamp: np.uint64) -> np.uint64:
         """
         Rounds the timestamp to the nearest subsecond distribution based on the number of samples per second.
         """

         # Baseline timestamp to the top of the second
-        base_ticks = self._round_to_nearest_second(timestamp)
+        base_ticks = self._round_to_nearestsecond(timestamp)

         # Remove the seconds from ticks
         ticks_beyond_second = timestamp - base_ticks

         # Calculate a frame index between 0 and samplespersecond - 1,
         # corresponding to ticks rounded to the nearest frame
-        frame_index = np.round(ticks_beyond_second / (Ticks.PERSECOND / self.samples_per_second))
+        frame_index = np.round(ticks_beyond_second / (Ticks.PERSECOND / self.samplespersecond))

         # Calculate the timestamp of the nearest frame
-        destination_ticks = np.uint64(frame_index * Ticks.PERSECOND / self.samples_per_second)
+        destination_ticks = np.uint64(frame_index * Ticks.PERSECOND / self.samplespersecond)

         # Recover the seconds that were removed
         destination_ticks += base_ticks
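
As a sanity check on the rounding math above, here is a standalone version (not part of the commit) with a couple of worked values, again assuming 10,000,000 ticks per second and 30 samples per second, so frames fall every ~333,333.3 ticks:

```python
# Standalone check of the subsecond-distribution rounding. At 30 samples
# per second, frame boundaries land at 0, ~33.3 ms, ~66.7 ms, and so on.
PERSECOND = 10_000_000
SAMPLES_PER_SECOND = 30

def round_to_subseconddistribution(timestamp: int) -> int:
    base_ticks = timestamp - timestamp % PERSECOND
    ticks_beyond_second = timestamp - base_ticks
    frame_index = round(ticks_beyond_second / (PERSECOND / SAMPLES_PER_SECOND))
    return base_ticks + int(frame_index * PERSECOND / SAMPLES_PER_SECOND)

# 35 ms past the second rounds to the 33.3 ms frame (frame index 1)
assert round_to_subseconddistribution(350_000) == 333_333
# 50 ms is halfway between frames 1 and 2; round-half-to-even picks frame 2
assert round_to_subseconddistribution(500_000) == 666_666
```

Note that np.round, like Python's built-in round, rounds halfway cases to the nearest even integer, which is what the second assertion exercises.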
@@ -210,42 +210,42 @@ def _new_measurements(self, measurements: List[Measurement]):
         # Collect data into a map grouped by timestamps to the nearest second
         for measurement in measurements:
             # Get timestamp rounded to the nearest second
-            timestamp_second = self._round_to_nearest_second(measurement.timestamp)
+            timestamp_second = self._round_to_nearestsecond(measurement.timestamp)

-            if self._time_is_valid(timestamp_second):
+            if self._timeisvalid(timestamp_second):
                 # Create a new one-second timestamp map if it doesn't exist
-                if timestamp_second not in self._grouped_data:
-                    self._grouped_data[timestamp_second] = {}
+                if timestamp_second not in self._groupeddata:
+                    self._groupeddata[timestamp_second] = {}

                 # Get timestamp rounded to the nearest subsecond distribution, e.g., 000, 033, 066, 100 ms
-                timestamp_subsecond = self._round_to_subsecond_distribution(measurement.timestamp)
+                timestamp_subsecond = self._round_to_subseconddistribution(measurement.timestamp)

                 # Create a new subsecond timestamp map if it doesn't exist
-                if timestamp_subsecond not in self._grouped_data[timestamp_second]:
-                    self._grouped_data[timestamp_second][timestamp_subsecond] = {}
+                if timestamp_subsecond not in self._groupeddata[timestamp_second]:
+                    self._groupeddata[timestamp_second][timestamp_subsecond] = {}

                 # Add measurement to subsecond timestamp map, tracking downsampled measurements
-                if measurement.signalid in self._grouped_data[timestamp_second][timestamp_subsecond]:
-                    with self._downsampled_count_lock:
-                        self._downsampled_count += 1
+                if measurement.signalid in self._groupeddata[timestamp_second][timestamp_subsecond]:
+                    with self._downsampledcount_lock:
+                        self._downsampledcount += 1

-                self._grouped_data[timestamp_second][timestamp_subsecond][measurement.signalid] = measurement
+                self._groupeddata[timestamp_second][timestamp_subsecond][measurement.signalid] = measurement

         # Check if it's time to publish grouped data, waiting for measurement_windowsize to elapse. Note
         # that this implementation depends on continuous data reception to trigger data publication. A more
         # robust implementation would use a precision timer to trigger data publication.
-        current_time = Ticks.utcnow()
-        window_size = np.uint64(self.measurement_window_size * Ticks.PERSECOND)
+        currenttime = Ticks.utcnow()
+        windowsize = np.uint64(self.measurement_windowsize * Ticks.PERSECOND)

-        for timestamp in list(self._grouped_data.keys()):
-            if current_time - timestamp >= window_size:
-                grouped_data = self._grouped_data.pop(timestamp)
+        for timestamp in list(self._groupeddata.keys()):
+            if currenttime - timestamp >= windowsize:
+                groupeddata = self._groupeddata.pop(timestamp)

                 # Call user defined data function handler with one-second grouped data buffer on a separate thread
-                threading.Thread(target=self._publish_data, args=(timestamp, grouped_data), name="PublishDataThread").start()
+                threading.Thread(target=self._publish_data, args=(timestamp, groupeddata), name="PublishDataThread").start()

         # Provide user feedback on data reception
-        if not self.display_measurement_summary or time() - self._lastmessage < 5.0:
+        if not self.display_measurementsummary or time() - self._lastmessage < 5.0:
             return

         try:
@@ -267,25 +267,25 @@ def _new_measurements(self, measurements: List[Measurement]):
         finally:
             self._lastmessage = time()

-    def _publish_data(self, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[UUID, Measurement]]):
-        data_buffer_time_str = Ticks.to_shortstring(timestamp).split(".")[0]
+    def _publish_data(self, timestamp: np.uint64, databuffer: Dict[np.uint64, Dict[UUID, Measurement]]):
+        databuffer_timestr = Ticks.to_shortstring(timestamp).split(".")[0]

         if self._process_lock.acquire(False):
             try:
-                process_started = time()
+                processstarted = time()

-                if self._grouped_data_receiver is not None:
-                    self._grouped_data_receiver(self, timestamp, data_buffer)
+                if self._groupeddata_receiver is not None:
+                    self._groupeddata_receiver(self, timestamp, databuffer)

-                self.statusmessage(f"Data publication for buffer at {data_buffer_time_str} processed in {self._get_elapsed_time_str(time() - process_started)}.")
+                self.statusmessage(f"Data publication for buffer at {databuffer_timestr} processed in {self._elapsed_timestr(time() - processstarted)}.")
             finally:
                 self._process_lock.release()
         else:
-            with self._process_missed_count_lock:
-                self._process_missed_count += 1
+            with self._processmissedcount_lock:
+                self._processmissedcount += 1

-            self.errormessage(f"WARNING: Data publication missed for buffer at {data_buffer_time_str}, a previous data buffer is still processing. {self._process_missed_count:,} data sets missed so far...")
+            self.errormessage(f"WARNING: Data publication missed for buffer at {databuffer_timestr}, a previous data buffer is still processing. {self._processmissedcount:,} data sets missed so far...")

-    def _get_elapsed_time_str(self, elapsed: float) -> str:
+    def _elapsed_timestr(self, elapsed: float) -> str:
         hours, rem = divmod(elapsed, 3600)
         minutes, seconds = divmod(rem, 60)
         milliseconds = (elapsed - int(elapsed)) * 1000
@@ -309,10 +309,10 @@ def _connection_terminated(self):
         self._lastmessage = 0.0

         # Reset downsampled count on disconnect
-        self.downsampled_count = 0
+        self.downsampledcount = 0

         # Reset process missed count on disconnect
-        self.process_missed_count = 0
+        self.processmissedcount = 0

 def main():
     parser = argparse.ArgumentParser()
@@ -327,7 +327,7 @@ def main():
     subscriber = GroupedDataSubscriber()

     # Set user defined callback function to handle grouped data:
-    subscriber.set_grouped_data_receiver(process_data)
+    subscriber.set_groupeddata_receiver(process_data)

     try:
         subscriber.subscribe("FILTER ActiveMeasurements WHERE SignalType = 'FREQ'", subscriber.settings)
@@ -338,15 +338,15 @@ def main():
     finally:
         subscriber.dispose()

-def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[UUID, Measurement]]):
+def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, databuffer: Dict[np.uint64, Dict[UUID, Measurement]]):
     """
     User defined callback function that handles grouped data that has been received.

     Note: This function is called by the subscriber when grouped data is available for processing.
     The function will only be called once per second with a buffer of grouped data for the second.
     If the function processing time exceeds the one second window, a warning message will be displayed
-    and the data will be skipped. The number of skipped data sets is tracked and reported through the
-    process_missed_count property.
+    and new data will be skipped. The number of skipped data sets is tracked and reported through the
+    processmissedcount property.

     Parameters:
         timestamp: The timestamp, at top of second, for the grouped data
@@ -360,7 +360,7 @@ def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[UUID, Measurement]]):
     frequency_count = 0

     # Loop through each set of measurement groups in the one second buffer
-    for measurements in data_buffer.values():
+    for measurements in databuffer.values():
         # To use subsecond timestamp values, you can use the following loop instead:
         # for subsecond_timestamp, measurements in databuffer.items():

@@ -403,9 +403,9 @@ def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[UUID, Measurement]]):

subscriber.statusmessage(f"\nAverage frequency for {frequency_count:,} values in second {Ticks.to_datetime(timestamp).second}: {average_frequency:.6f} Hz")

if subscriber.downsampled_count > 0:
subscriber.statusmessage(f" Downsampled {subscriber.downsampled_count:,} measurements in last measurement set...")
subscriber.downsampled_count = 0
if subscriber.downsampledcount > 0:
subscriber.statusmessage(f" WARNING: {subscriber.downsampledcount:,} measurements downsampled in last measurement set...")
subscriber.downsampledcount = 0
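
The diff view elides the loop body that accumulates average_frequency; a plausible standalone sketch of that step follows (hypothetical, not the commit's actual code; it assumes each Measurement exposes its floating-point reading as .value):

```python
# Hypothetical sketch of the elided averaging step: accumulate frequency
# values across every subsecond group in the one-second buffer.
frequency_total = 0.0
frequency_count = 0

for measurements in databuffer.values():
    for measurement in measurements.values():
        value = measurement.value  # assumed Measurement attribute
        # Skip obviously bad readings; the 57-63 Hz band is illustrative
        # for a nominal 60 Hz system, not taken from the example.
        if 57.0 <= value <= 63.0:
            frequency_total += value
            frequency_count += 1

average_frequency = frequency_total / max(frequency_count, 1)
```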

 if __name__ == "__main__":
     main()
