From ce8d0a94608fdc8e7b745ff06bbc7d3fac0ff8d6 Mon Sep 17 00:00:00 2001 From: "J. Ritchie Carroll" Date: Tue, 30 Jul 2024 18:46:12 -0500 Subject: [PATCH] Updated identifier naming conventions in GroupedDataSubscriber to better conform with Python norms --- examples/groupeddatasubscribe/main.py | 156 +++++++++++++------------- 1 file changed, 78 insertions(+), 78 deletions(-) diff --git a/examples/groupeddatasubscribe/main.py b/examples/groupeddatasubscribe/main.py index 4c95211..46cd0f4 100644 --- a/examples/groupeddatasubscribe/main.py +++ b/examples/groupeddatasubscribe/main.py @@ -53,11 +53,11 @@ class GroupedDataSubscriber(Subscriber): If incoming frame rate is higher than the samples per second, or timestamp alignment does not accurately coinside with the subsecond distribution, some data will be downsampled. Downsampled data count is tracked and reported to through the - downsampled_count property. + downsampledcount property. Only a single one-second data buffer will be published at a time. If data cannot be processed within the one-second window, a warning message will be displayed and the data will be skipped. The number of skipped data sets is tracked - and reported through the process_missed_count property. + and reported through the processmissedcount property. This example depends on a semi-accurate system clock to group data by timestamp. If the system clock is not accurate, data may not be grouped as expected. @@ -70,46 +70,46 @@ def __init__(self): self.config = Config() self.settings = Settings() - self.measurement_window_size = 1 + self.measurement_windowsize = 1 """ Defines measurement window size, in whole seconds, for data grouping. """ - self.lag_time = 5.0 + self.lagtime = 5.0 """ Defines the lag time, in seconds, for data grouping. Data received outside of this past time limit, relative to local clock, will be discarded. """ - self.lead_time = 5.0 + self.leadtime = 5.0 """ Defines the lead time, in seconds, for data grouping. Data received outside this future time limit, relative to local clock, will be discarded. """ - self.samples_per_second = 30 + self.samplespersecond = 30 """ Defines the number of samples per second for the data in the stream. """ - self.display_measurement_summary = False + self.display_measurementsummary = False """ Defines if the subscriber should display a summary of received measurements every few seconds. """ - self._grouped_data: Dict[np.uint64, Dict[UUID, Measurement]] = {} - self._grouped_data_receiver: Optional[Callable[[GroupedDataSubscriber, np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]], None]] = None + self._groupeddata: Dict[np.uint64, Dict[UUID, Measurement]] = {} + self._groupeddata_receiver: Optional[Callable[[GroupedDataSubscriber, np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]], None]] = None self._lastmessage = 0.0 - self._downsampled_count_lock = threading.Lock() - self._downsampled_count = 0 + self._downsampledcount_lock = threading.Lock() + self._downsampledcount = 0 self._process_lock = threading.Lock() - self._process_missed_count_lock = threading.Lock() - self._process_missed_count = 0 + self._processmissedcount_lock = threading.Lock() + self._processmissedcount = 0 # Set up event handlers for STTP API self.set_subscriptionupdated_receiver(self._subscription_updated) @@ -117,86 +117,86 @@ def __init__(self): self.set_connectionterminated_receiver(self._connection_terminated) @property - def downsampled_count(self) -> int: + def downsampledcount(self) -> int: """ Gets the count of downsampled measurements. """ - with self._downsampled_count_lock: - return self._downsampled_count + with self._downsampledcount_lock: + return self._downsampledcount - @downsampled_count.setter - def downsampled_count(self, value: np.int32): + @downsampledcount.setter + def downsampledcount(self, value: np.int32): """ Sets the count of downsampled measurements. """ - with self._downsampled_count_lock: - self._downsampled_count = value + with self._downsampledcount_lock: + self._downsampledcount = value @property - def process_missed_count(self) -> int: + def processmissedcount(self) -> int: """ Gets the count of missed data processing. """ - with self._process_missed_count_lock: - return self._process_missed_count + with self._processmissedcount_lock: + return self._processmissedcount - @process_missed_count.setter - def process_missed_count(self, value: np.int32): + @processmissedcount.setter + def processmissedcount(self, value: np.int32): """ Sets the count of missed data processing. """ - with self._process_missed_count_lock: - self._process_missed_count = value + with self._processmissedcount_lock: + self._processmissedcount = value - def set_grouped_data_receiver(self, callback: Optional[Callable[[GroupedDataSubscriber, np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]], None]]): + def set_groupeddata_receiver(self, callback: Optional[Callable[[GroupedDataSubscriber, np.uint64, Dict[np.uint64, Dict[UUID, Measurement]]], None]]): """ - Defines the callback function that handles grouped data that has been received. + Defines the callback function that processes grouped data that has been received. Function signature: - def handle_data(GroupedDataSubscriber subscriber, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[UUID, Measurement]]): + def process_data(GroupedDataSubscriber subscriber, timestamp: np.uint64, databuffer: Dict[np.uint64, Dict[UUID, Measurement]]): pass """ - self._grouped_data_receiver = callback + self._groupeddata_receiver = callback - def _time_is_valid(self, timestamp: np.uint64) -> bool: + def _timeisvalid(self, timestamp: np.uint64) -> bool: """ Determines if the given timestamp is within the valid time range for data grouping. """ distance = Ticks.utcnow() - timestamp - lead_time = self.lead_time * Ticks.PERSECOND - lag_time = self.lag_time * Ticks.PERSECOND + leadtime = self.leadtime * Ticks.PERSECOND + lagtime = self.lagtime * Ticks.PERSECOND - return distance >= -lead_time and distance <= lag_time + return distance >= -leadtime and distance <= lagtime - def _round_to_nearest_second(self, timestamp: np.uint64) -> np.uint64: + def _round_to_nearestsecond(self, timestamp: np.uint64) -> np.uint64: """ Rounds the timestamp rounded to the nearest second. """ return timestamp - timestamp % Ticks.PERSECOND - def _round_to_subsecond_distribution(self, timestamp: np.uint64) -> np.uint64: + def _round_to_subseconddistribution(self, timestamp: np.uint64) -> np.uint64: """ Rounds the timestamp to the nearest subsecond distribution based on the number of samples per second. """ # Baseline timestamp to the top of the second - base_ticks = self._round_to_nearest_second(timestamp) + base_ticks = self._round_to_nearestsecond(timestamp) # Remove the seconds from ticks ticks_beyond_second = timestamp - base_ticks # Calculate a frame index between 0 and m_framesPerSecond - 1, # corresponding to ticks rounded to the nearest frame - frame_index = np.round(ticks_beyond_second / (Ticks.PERSECOND / self.samples_per_second)) + frame_index = np.round(ticks_beyond_second / (Ticks.PERSECOND / self.samplespersecond)) # Calculate the timestamp of the nearest frame - destination_ticks = np.uint64(frame_index * Ticks.PERSECOND / self.samples_per_second) + destination_ticks = np.uint64(frame_index * Ticks.PERSECOND / self.samplespersecond) # Recover the seconds that were removed destination_ticks += base_ticks @@ -210,42 +210,42 @@ def _new_measurements(self, measurements: List[Measurement]): # Collect data into a map grouped by timestamps to the nearest second for measurement in measurements: # Get timestamp rounded to the nearest second - timestamp_second = self._round_to_nearest_second(measurement.timestamp) + timestamp_second = self._round_to_nearestsecond(measurement.timestamp) - if self._time_is_valid(timestamp_second): + if self._timeisvalid(timestamp_second): # Create a new one-second timestamp map if it doesn't exist - if timestamp_second not in self._grouped_data: - self._grouped_data[timestamp_second] = {} + if timestamp_second not in self._groupeddata: + self._groupeddata[timestamp_second] = {} # Get timestamp rounded to the nearest subsecond distribution, e.g., 000, 033, 066, 100 ms - timestamp_subsecond = self._round_to_subsecond_distribution(measurement.timestamp) + timestamp_subsecond = self._round_to_subseconddistribution(measurement.timestamp) # Create a new subsecond timestamp map if it doesn't exist - if timestamp_subsecond not in self._grouped_data[timestamp_second]: - self._grouped_data[timestamp_second][timestamp_subsecond] = {} + if timestamp_subsecond not in self._groupeddata[timestamp_second]: + self._groupeddata[timestamp_second][timestamp_subsecond] = {} # Append measurement to subsecond timestamp list, tracking downsampled measurements - if measurement.signalid in self._grouped_data[timestamp_second][timestamp_subsecond]: - with self._downsampled_count_lock: - self._downsampled_count += 1 + if measurement.signalid in self._groupeddata[timestamp_second][timestamp_subsecond]: + with self._downsampledcount_lock: + self._downsampledcount += 1 - self._grouped_data[timestamp_second][timestamp_subsecond][measurement.signalid] = measurement + self._groupeddata[timestamp_second][timestamp_subsecond][measurement.signalid] = measurement # Check if it's time to publish grouped data, waiting for measurement_window_size to elapse. Note # that this implementation depends on continuous data reception to trigger data publication. A more # robust implementation would use a precision timer to trigger data publication. - current_time = Ticks.utcnow() - window_size = np.uint64(self.measurement_window_size * Ticks.PERSECOND) + currenttime = Ticks.utcnow() + windowsize = np.uint64(self.measurement_windowsize * Ticks.PERSECOND) - for timestamp in list(self._grouped_data.keys()): - if current_time - timestamp >= window_size: - grouped_data = self._grouped_data.pop(timestamp) + for timestamp in list(self._groupeddata.keys()): + if currenttime - timestamp >= windowsize: + groupeddata = self._groupeddata.pop(timestamp) # Call user defined data function handler with one-second grouped data buffer on a separate thread - threading.Thread(target=self._publish_data, args=(timestamp, grouped_data), name="PublishDataThread").start() + threading.Thread(target=self._publish_data, args=(timestamp, groupeddata), name="PublishDataThread").start() # Provide user feedback on data reception - if not self.display_measurement_summary or time() - self._lastmessage < 5.0: + if not self.display_measurementsummary or time() - self._lastmessage < 5.0: return try: @@ -267,25 +267,25 @@ def _new_measurements(self, measurements: List[Measurement]): finally: self._lastmessage = time() - def _publish_data(self, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[UUID, Measurement]]): - data_buffer_time_str = Ticks.to_shortstring(timestamp).split(".")[0] + def _publish_data(self, timestamp: np.uint64, databuffer: Dict[np.uint64, Dict[UUID, Measurement]]): + databuffer_timestr = Ticks.to_shortstring(timestamp).split(".")[0] if self._process_lock.acquire(False): try: - process_started = time() + processstarted = time() - if self._grouped_data_receiver is not None: - self._grouped_data_receiver(self, timestamp, data_buffer) + if self._groupeddata_receiver is not None: + self._groupeddata_receiver(self, timestamp, databuffer) - self.statusmessage(f"Data publication for buffer at {data_buffer_time_str} processed in {self._get_elapsed_time_str(time() - process_started)}.") + self.statusmessage(f"Data publication for buffer at {databuffer_timestr} processed in {self._elapsed_timestr(time() - processstarted)}.") finally: self._process_lock.release() else: - with self._process_missed_count_lock: - self._process_missed_count += 1 - self.errormessage(f"WARNING: Data publication missed for buffer at {data_buffer_time_str}, a previous data buffer is still processing. {self._process_missed_count:,} data sets missed so far...") + with self._processmissedcount_lock: + self._processmissedcount += 1 + self.errormessage(f"WARNING: Data publication missed for buffer at {databuffer_timestr}, a previous data buffer is still processing. {self._processmissedcount:,} data sets missed so far...") - def _get_elapsed_time_str(self, elapsed: float) -> str: + def _elapsed_timestr(self, elapsed: float) -> str: hours, rem = divmod(elapsed, 3600) minutes, seconds = divmod(rem, 60) milliseconds = (elapsed - int(elapsed)) * 1000 @@ -309,10 +309,10 @@ def _connection_terminated(self): self._lastmessage = 0.0 # Reset grouped data on disconnect - self.downsampled_count = 0 + self.downsampledcount = 0 # Reset process missed count on disconnect - self.process_missed_count = 0 + self.processmissedcount = 0 def main(): parser = argparse.ArgumentParser() @@ -327,7 +327,7 @@ def main(): subscriber = GroupedDataSubscriber() # Set user defined callback function to handle grouped data: - subscriber.set_grouped_data_receiver(process_data) + subscriber.set_groupeddata_receiver(process_data) try: subscriber.subscribe("FILTER ActiveMeasurements WHERE SignalType = 'FREQ'", subscriber.settings) @@ -338,15 +338,15 @@ def main(): finally: subscriber.dispose() -def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_buffer: Dict[np.uint64, Dict[UUID, Measurement]]): +def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, databuffer: Dict[np.uint64, Dict[UUID, Measurement]]): """ User defined callback function that handles grouped data that has been received. Note: This function is called by the subscriber when grouped data is available for processing. The function will only be called once per second with a buffer of grouped data for the second. If the function processing time exceeds the one second window, a warning message will be displayed - and the data will be skipped. The number of skipped data sets is tracked and reported through the - process_missed_count property. + and new data will be skipped. The number of skipped data sets is tracked and reported through the + processmissed count property. Parameters: timestamp: The timestamp, at top of second, for the grouped data @@ -360,7 +360,7 @@ def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_b frequency_count = 0 # Loop through each set of measurement groups in the one second buffer - for measurements in data_buffer.values(): + for measurements in databuffer.values(): # To use subsecond timestamp values, you can use the following loop instead: # for subsecond_timestamp, measurements in data_buffer.items(): @@ -403,9 +403,9 @@ def process_data(subscriber: GroupedDataSubscriber, timestamp: np.uint64, data_b subscriber.statusmessage(f"\nAverage frequency for {frequency_count:,} values in second {Ticks.to_datetime(timestamp).second}: {average_frequency:.6f} Hz") - if subscriber.downsampled_count > 0: - subscriber.statusmessage(f" Downsampled {subscriber.downsampled_count:,} measurements in last measurement set...") - subscriber.downsampled_count = 0 + if subscriber.downsampledcount > 0: + subscriber.statusmessage(f" WARNING: {subscriber.downsampledcount:,} measurements downsampled in last measurement set...") + subscriber.downsampledcount = 0 if __name__ == "__main__": main()