From 110bbb122aa7ac8ce9c8acb33dd083df674270eb Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 20 Aug 2024 14:56:25 -0500 Subject: [PATCH 1/8] feat(block_analysis): new function to extract `pellet_times` WIP --- aeon/dj_pipeline/analysis/block_analysis.py | 62 +++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index e05c70d8..36d6b0a9 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -798,3 +798,65 @@ class AnalysisNote(dj.Manual): note_type='': varchar(64) note: varchar(3000) """ + +# ---- Helper Functions ---- + + +def get_pellets(patch_key, block_start, block_end): + """ + 1. Get all patch state update timestamps: let's call these events "A" + 2. Remove all "A" events near manual pellet delivery events (so we don't include manual pellet delivery events in downstream analysis) + 3. For the remaining "A" events, find the nearest delivery event within 1s: for this delivery event, check if there are any repeat delivery events within 0.5 seconds - take the last of these as the pellet delivery timestamp (discard all "A" events that don't have such a corresponding delivery event) + 4. Now for these 'clean' "A" events, go back in time to the SECOND preceding pellet threshold value: this is the threshold value for this pellet delivery (as seen in this image we discussed before) + """ + # key = {'experiment_name': 'social0.3-aeon3', 'block_start': '2024-06-26 10:52:10.001984', + # 'block_end': '2024-06-26 12:57:18'} + chunk_restriction = acquisition.create_chunk_restriction( + patch_key["experiment_name"], block_start, block_end + ) + # pellet delivery and patch threshold data + beam_break_df = fetch_stream( + streams.UndergroundFeederBeamBreak & patch_key & chunk_restriction + )[block_start:block_end] + depletion_state_df = fetch_stream( + streams.UndergroundFeederDepletionState & patch_key & chunk_restriction + )[block_start:block_end] + # remove NaNs from threshold column + depletion_state_df = depletion_state_df.dropna(subset=["threshold"]) + # identify & remove invalid indices where the time difference is less than 1 second + invalid_indices = np.where(depletion_state_df.index.to_series().diff().dt.total_seconds() < 1)[0] + depletion_state_df = depletion_state_df.drop(depletion_state_df.index[invalid_indices]) + + # find pellet times approximately coincide with each threshold update (nearest pellet delivery within 1s) + delivered_pellet_ts = beam_break_df.index + pellet_ts_threshold_df = depletion_state_df.copy() + pellet_ts_threshold_df["pellet_timestamp"] = pd.NaT + for threshold_idx in range(len(pellet_ts_threshold_df)): + threshold_time = pellet_ts_threshold_df.index[threshold_idx] + pellet_time = delivered_pellet_ts[np.searchsorted(delivered_pellet_ts, threshold_time)] + if abs(pellet_time - threshold_time) > pd.Timedelta(seconds=1): + # no pellet delivery within 1s of threshold update (this is unexpected) + continue + pellet_ts_threshold_df.pellet_timestamp.iloc[threshold_idx] = pellet_time + # remove NaNs from pellet_timestamp column (last row) + pellet_ts_threshold_df = pellet_ts_threshold_df.dropna(subset=["pellet_timestamp"]) + + + # find pellet times associated with each threshold update + # for each threshold, find the time of the next threshold update, + # find the closest beam break after this update time, + # and use this beam break time as the delivery time for the initial threshold + pellet_ts_threshold_df = depletion_state_df.copy() + pellet_ts_threshold_df["pellet_timestamp"] = pd.NaT + for threshold_idx in range(len(pellet_ts_threshold_df) - 1): + if np.isnan(pellet_ts_threshold_df.threshold.iloc[threshold_idx]): + continue + next_threshold_time = pellet_ts_threshold_df.index[threshold_idx + 1] + post_thresh_pellet_ts = beam_break_df.index[beam_break_df.index > next_threshold_time] + if post_thresh_pellet_ts.empty: + break + next_beam_break = post_thresh_pellet_ts[np.searchsorted(post_thresh_pellet_ts, next_threshold_time)] + pellet_ts_threshold_df.pellet_timestamp.iloc[threshold_idx] = next_beam_break + # remove NaNs from pellet_timestamp column (last row) + pellet_ts_threshold_df = pellet_ts_threshold_df.dropna(subset=["pellet_timestamp"]) + From 2371dc37cc89291e2a15d0f5e0a7f4b549f9a498 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 21 Aug 2024 16:51:36 -0500 Subject: [PATCH 2/8] feat: new logic to identify delivered pellet associated with each threshold update --- aeon/dj_pipeline/analysis/block_analysis.py | 78 ++++++--------------- 1 file changed, 21 insertions(+), 57 deletions(-) diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index 36d6b0a9..d0d7a362 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -190,35 +190,11 @@ def make(self, key): for patch_key, patch_name in zip(patch_keys, patch_names): # pellet delivery and patch threshold data - beam_break_df = fetch_stream( - streams.UndergroundFeederBeamBreak & patch_key & chunk_restriction - )[block_start:block_end] depletion_state_df = fetch_stream( streams.UndergroundFeederDepletionState & patch_key & chunk_restriction )[block_start:block_end] - # remove NaNs from threshold column - depletion_state_df = depletion_state_df.dropna(subset=["threshold"]) - # identify & remove invalid indices where the time difference is less than 1 second - invalid_indices = np.where(depletion_state_df.index.to_series().diff().dt.total_seconds() < 1)[0] - depletion_state_df = depletion_state_df.drop(depletion_state_df.index[invalid_indices]) - - # find pellet times associated with each threshold update - # for each threshold, find the time of the next threshold update, - # find the closest beam break after this update time, - # and use this beam break time as the delivery time for the initial threshold - pellet_ts_threshold_df = depletion_state_df.copy() - pellet_ts_threshold_df["pellet_timestamp"] = pd.NaT - for threshold_idx in range(len(pellet_ts_threshold_df) - 1): - if np.isnan(pellet_ts_threshold_df.threshold.iloc[threshold_idx]): - continue - next_threshold_time = pellet_ts_threshold_df.index[threshold_idx + 1] - post_thresh_pellet_ts = beam_break_df.index[beam_break_df.index > next_threshold_time] - if post_thresh_pellet_ts.empty: - break - next_beam_break = post_thresh_pellet_ts[np.searchsorted(post_thresh_pellet_ts, next_threshold_time)] - pellet_ts_threshold_df.pellet_timestamp.iloc[threshold_idx] = next_beam_break - # remove NaNs from pellet_timestamp column (last row) - pellet_ts_threshold_df = pellet_ts_threshold_df.dropna(subset=["pellet_timestamp"]) + + pellet_ts_threshold_df = get_threshold_associated_pellets(patch_key, block_start, block_end) # wheel encoder data encoder_df = fetch_stream(streams.UndergroundFeederEncoder & patch_key & chunk_restriction)[ @@ -802,61 +778,49 @@ class AnalysisNote(dj.Manual): # ---- Helper Functions ---- -def get_pellets(patch_key, block_start, block_end): +def get_threshold_associated_pellets(patch_key, start, end): """ + Retrieve the pellet delivery timestamps associated with each patch threshold update within the specified start-end time. 1. Get all patch state update timestamps: let's call these events "A" 2. Remove all "A" events near manual pellet delivery events (so we don't include manual pellet delivery events in downstream analysis) 3. For the remaining "A" events, find the nearest delivery event within 1s: for this delivery event, check if there are any repeat delivery events within 0.5 seconds - take the last of these as the pellet delivery timestamp (discard all "A" events that don't have such a corresponding delivery event) 4. Now for these 'clean' "A" events, go back in time to the SECOND preceding pellet threshold value: this is the threshold value for this pellet delivery (as seen in this image we discussed before) """ - # key = {'experiment_name': 'social0.3-aeon3', 'block_start': '2024-06-26 10:52:10.001984', - # 'block_end': '2024-06-26 12:57:18'} chunk_restriction = acquisition.create_chunk_restriction( - patch_key["experiment_name"], block_start, block_end + patch_key["experiment_name"], start, end ) # pellet delivery and patch threshold data - beam_break_df = fetch_stream( - streams.UndergroundFeederBeamBreak & patch_key & chunk_restriction - )[block_start:block_end] + delivered_pellet_df = fetch_stream( + streams.UndergroundFeederDeliverPellet & patch_key & chunk_restriction + )[start:end] depletion_state_df = fetch_stream( streams.UndergroundFeederDepletionState & patch_key & chunk_restriction - )[block_start:block_end] + )[start:end] # remove NaNs from threshold column depletion_state_df = depletion_state_df.dropna(subset=["threshold"]) # identify & remove invalid indices where the time difference is less than 1 second invalid_indices = np.where(depletion_state_df.index.to_series().diff().dt.total_seconds() < 1)[0] depletion_state_df = depletion_state_df.drop(depletion_state_df.index[invalid_indices]) - # find pellet times approximately coincide with each threshold update (nearest pellet delivery within 1s) - delivered_pellet_ts = beam_break_df.index + # find pellet times approximately coincide with each threshold update + # i.e. nearest pellet delivery within 100ms before or after threshold update + delivered_pellet_ts = delivered_pellet_df.index pellet_ts_threshold_df = depletion_state_df.copy() pellet_ts_threshold_df["pellet_timestamp"] = pd.NaT for threshold_idx in range(len(pellet_ts_threshold_df)): threshold_time = pellet_ts_threshold_df.index[threshold_idx] - pellet_time = delivered_pellet_ts[np.searchsorted(delivered_pellet_ts, threshold_time)] - if abs(pellet_time - threshold_time) > pd.Timedelta(seconds=1): - # no pellet delivery within 1s of threshold update (this is unexpected) + within_range_pellet_ts = np.logical_and(delivered_pellet_ts >= threshold_time - pd.Timedelta(milliseconds=100), + delivered_pellet_ts <= threshold_time + pd.Timedelta(milliseconds=100)) + if not within_range_pellet_ts.any(): continue + pellet_time = delivered_pellet_ts[within_range_pellet_ts][-1] pellet_ts_threshold_df.pellet_timestamp.iloc[threshold_idx] = pellet_time - # remove NaNs from pellet_timestamp column (last row) - pellet_ts_threshold_df = pellet_ts_threshold_df.dropna(subset=["pellet_timestamp"]) - - # find pellet times associated with each threshold update - # for each threshold, find the time of the next threshold update, - # find the closest beam break after this update time, - # and use this beam break time as the delivery time for the initial threshold - pellet_ts_threshold_df = depletion_state_df.copy() - pellet_ts_threshold_df["pellet_timestamp"] = pd.NaT - for threshold_idx in range(len(pellet_ts_threshold_df) - 1): - if np.isnan(pellet_ts_threshold_df.threshold.iloc[threshold_idx]): - continue - next_threshold_time = pellet_ts_threshold_df.index[threshold_idx + 1] - post_thresh_pellet_ts = beam_break_df.index[beam_break_df.index > next_threshold_time] - if post_thresh_pellet_ts.empty: - break - next_beam_break = post_thresh_pellet_ts[np.searchsorted(post_thresh_pellet_ts, next_threshold_time)] - pellet_ts_threshold_df.pellet_timestamp.iloc[threshold_idx] = next_beam_break + # remove rows of threshold updates without corresponding pellet times from i.e. pellet_timestamp is NaN + pellet_ts_threshold_df = pellet_ts_threshold_df.dropna(subset=["pellet_timestamp"]) + # shift back the pellet_timestamp values by 1 to match the pellet_timestamp with the previous threshold update + pellet_ts_threshold_df.pellet_timestamp = pellet_ts_threshold_df.pellet_timestamp.shift(-1) # remove NaNs from pellet_timestamp column (last row) pellet_ts_threshold_df = pellet_ts_threshold_df.dropna(subset=["pellet_timestamp"]) + return pellet_ts_threshold_df From 764fdb018d97314f4df53cb2abc16f348fc282c0 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 22 Aug 2024 09:09:58 -0500 Subject: [PATCH 3/8] fix: minor bugfix --- aeon/dj_pipeline/analysis/block_analysis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index d0d7a362..30e8b258 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -162,7 +162,7 @@ def make(self, key): chunk_keys = (acquisition.Chunk & key & chunk_restriction).fetch("KEY") streams_tables = ( streams.UndergroundFeederDepletionState, - streams.UndergroundFeederBeamBreak, + streams.UndergroundFeederDeliverPellet, streams.UndergroundFeederEncoder, tracking.SLEAPTracking, ) From 352a41dc99a16f5724dc9a548b1c8a9e385e54b1 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 22 Aug 2024 17:26:43 -0500 Subject: [PATCH 4/8] feat(acquisition): add `EnvironmentActiveConfig` --- aeon/dj_pipeline/acquisition.py | 41 +++++++++++++++++++++++++++++++++ aeon/schema/schemas.py | 4 ++-- aeon/schema/social_03.py | 22 ++++++++++++++++++ 3 files changed, 65 insertions(+), 2 deletions(-) diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py index b20c1a0c..e7f0b66d 100644 --- a/aeon/dj_pipeline/acquisition.py +++ b/aeon/dj_pipeline/acquisition.py @@ -563,6 +563,47 @@ def make(self, key): ) +@schema +class EnvironmentActiveConfig(dj.Imported): + definition = """ # Environment Active Configuration + -> Chunk + """ + + class Name(dj.Part): + definition = """ + -> master + time: datetime(6) # time when the configuration is applied to the environment + --- + name: varchar(32) # name of the environment configuration + value: longblob # dictionary of the configuration + """ + + def make(self, key): + chunk_start, chunk_end = (Chunk & key).fetch1("chunk_start", "chunk_end") + data_dirs = Experiment.get_data_directories(key) + devices_schema = getattr( + aeon_schemas, + (Experiment.DevicesSchema & {"experiment_name": key["experiment_name"]}).fetch1( + "devices_schema_name" + ), + ) + device = devices_schema.Environment + stream_reader = device.ActiveConfiguration # expecting columns: time, name, value + stream_data = io_api.load( + root=data_dirs, + reader=stream_reader, + start=pd.Timestamp(chunk_start), + end=pd.Timestamp(chunk_end), + ) + + stream_data.reset_index(inplace=True) + for k, v in key.items(): + stream_data[k] = v + + self.insert1(key) + self.Name.insert(stream_data) + + # ---- HELPERS ---- diff --git a/aeon/schema/schemas.py b/aeon/schema/schemas.py index 2738d522..a1d26f75 100644 --- a/aeon/schema/schemas.py +++ b/aeon/schema/schemas.py @@ -116,7 +116,7 @@ social03 = DotMap( [ Device("Metadata", stream.Metadata), - Device("Environment", social_02.Environment, social_02.SubjectData), + Device("Environment", social_02.Environment, social_02.SubjectData, social_03.ActiveConfiguration), Device("CameraTop", stream.Video, social_03.Pose), Device("CameraNorth", stream.Video), Device("CameraSouth", stream.Video), @@ -147,7 +147,7 @@ social04 = DotMap( [ Device("Metadata", stream.Metadata), - Device("Environment", social_02.Environment, social_02.SubjectData), + Device("Environment", social_02.Environment, social_02.SubjectData, social_03.ActiveConfiguration), Device("CameraTop", stream.Video, social_03.Pose), Device("CameraNorth", stream.Video), Device("CameraSouth", stream.Video), diff --git a/aeon/schema/social_03.py b/aeon/schema/social_03.py index 558b39c9..483a7c86 100644 --- a/aeon/schema/social_03.py +++ b/aeon/schema/social_03.py @@ -1,3 +1,5 @@ +import json +import pandas as pd import aeon.io.reader as _reader from aeon.schema.streams import Stream @@ -6,3 +8,23 @@ class Pose(Stream): def __init__(self, path): super().__init__(_reader.Pose(f"{path}_202_*")) + + +class EnvActiveConfigReader(_reader.Reader): + def __init__(self, pattern, columns=["name", "value"], extension="jsonl"): + super().__init__(pattern, columns, extension) + + def read(self, file): + """Reads data from the specified jsonl file.""" + with open(file, "r") as f: + df = pd.read_json(f, lines=True) + df["name"] = df["value"].apply(lambda x: x["name"]) + df["time"] = pd.to_datetime(df["seconds"], unit="s") + df.set_index("time", inplace=True) + return df[self.columns] + + +class ActiveConfiguration(Stream): + + def __init__(self, path): + super().__init__(EnvActiveConfigReader(f"{path}_ActiveConfiguration_*")) From aa03752a0c063d0a04cef6d9592c1dfe99ee4d7e Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 29 Aug 2024 11:08:28 -0500 Subject: [PATCH 5/8] feat(reader): add a generic JsonList reader class --- aeon/io/reader.py | 16 ++++++++++++++++ aeon/schema/social_03.py | 16 ++++++---------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/aeon/io/reader.py b/aeon/io/reader.py index e5c86d5d..b82dee2d 100644 --- a/aeon/io/reader.py +++ b/aeon/io/reader.py @@ -135,6 +135,22 @@ def read(self, file): ) +class JsonList(Reader): + """Extracts data from json list (.jsonl) files, where the key "seconds" + stores the Aeon timestamp, in seconds. + """ + + def __init__(self, pattern, columns=None, extension="jsonl"): + super().__init__(pattern, columns, extension) + + def read(self, file): + """Reads data from the specified jsonl file.""" + with open(file, "r") as f: + df = pd.read_json(f, lines=True) + df.set_index("seconds", inplace=True) + return df + + class Subject(Csv): """Extracts metadata for subjects entering and exiting the environment. diff --git a/aeon/schema/social_03.py b/aeon/schema/social_03.py index 483a7c86..20b8544b 100644 --- a/aeon/schema/social_03.py +++ b/aeon/schema/social_03.py @@ -10,18 +10,14 @@ def __init__(self, path): super().__init__(_reader.Pose(f"{path}_202_*")) -class EnvActiveConfigReader(_reader.Reader): - def __init__(self, pattern, columns=["name", "value"], extension="jsonl"): - super().__init__(pattern, columns, extension) +class EnvActiveConfigReader(_reader.JsonList): + def __init__(self, pattern): + super().__init__(pattern) def read(self, file): - """Reads data from the specified jsonl file.""" - with open(file, "r") as f: - df = pd.read_json(f, lines=True) - df["name"] = df["value"].apply(lambda x: x["name"]) - df["time"] = pd.to_datetime(df["seconds"], unit="s") - df.set_index("time", inplace=True) - return df[self.columns] + data = super().read(file) + data["name"] = data["value"].apply(lambda x: x["name"]) + return data class ActiveConfiguration(Stream): From 55e4b14049b854a627f1b337a598f7127e2da88b Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 12 Sep 2024 09:18:58 -0500 Subject: [PATCH 6/8] fix(env_active_config): rename, apply PR review suggestions --- aeon/dj_pipeline/acquisition.py | 4 ++-- aeon/schema/schemas.py | 4 ++-- aeon/schema/social_03.py | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py index e7f0b66d..dd83c7d4 100644 --- a/aeon/dj_pipeline/acquisition.py +++ b/aeon/dj_pipeline/acquisition.py @@ -564,7 +564,7 @@ def make(self, key): @schema -class EnvironmentActiveConfig(dj.Imported): +class EnvironmentActiveConfiguration(dj.Imported): definition = """ # Environment Active Configuration -> Chunk """ @@ -588,7 +588,7 @@ def make(self, key): ), ) device = devices_schema.Environment - stream_reader = device.ActiveConfiguration # expecting columns: time, name, value + stream_reader = device.EnvActiveConfiguration # expecting columns: time, name, value stream_data = io_api.load( root=data_dirs, reader=stream_reader, diff --git a/aeon/schema/schemas.py b/aeon/schema/schemas.py index a1d26f75..acbdfea8 100644 --- a/aeon/schema/schemas.py +++ b/aeon/schema/schemas.py @@ -116,7 +116,7 @@ social03 = DotMap( [ Device("Metadata", stream.Metadata), - Device("Environment", social_02.Environment, social_02.SubjectData, social_03.ActiveConfiguration), + Device("Environment", social_02.Environment, social_02.SubjectData, social_03.EnvActiveConfiguration), Device("CameraTop", stream.Video, social_03.Pose), Device("CameraNorth", stream.Video), Device("CameraSouth", stream.Video), @@ -147,7 +147,7 @@ social04 = DotMap( [ Device("Metadata", stream.Metadata), - Device("Environment", social_02.Environment, social_02.SubjectData, social_03.ActiveConfiguration), + Device("Environment", social_02.Environment, social_02.SubjectData, social_03.EnvActiveConfiguration), Device("CameraTop", stream.Video, social_03.Pose), Device("CameraNorth", stream.Video), Device("CameraSouth", stream.Video), diff --git a/aeon/schema/social_03.py b/aeon/schema/social_03.py index 20b8544b..a2f80580 100644 --- a/aeon/schema/social_03.py +++ b/aeon/schema/social_03.py @@ -10,7 +10,7 @@ def __init__(self, path): super().__init__(_reader.Pose(f"{path}_202_*")) -class EnvActiveConfigReader(_reader.JsonList): +class EnvActiveConfigurationReader(_reader.JsonList): def __init__(self, pattern): super().__init__(pattern) @@ -20,7 +20,7 @@ def read(self, file): return data -class ActiveConfiguration(Stream): +class EnvActiveConfiguration(Stream): def __init__(self, path): - super().__init__(EnvActiveConfigReader(f"{path}_ActiveConfiguration_*")) + super().__init__(EnvActiveConfigurationReader(f"{path}_ActiveConfiguration_*")) From a1f46009b2f176d422b6713c5a2f49268d94818a Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 12 Sep 2024 14:30:31 -0500 Subject: [PATCH 7/8] feat(reader): handle `columns` in `JsonList` reader --- aeon/io/reader.py | 6 +++++- aeon/schema/social_03.py | 12 +----------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/aeon/io/reader.py b/aeon/io/reader.py index b82dee2d..4d455432 100644 --- a/aeon/io/reader.py +++ b/aeon/io/reader.py @@ -140,14 +140,18 @@ class JsonList(Reader): stores the Aeon timestamp, in seconds. """ - def __init__(self, pattern, columns=None, extension="jsonl"): + def __init__(self, pattern, columns=(), root_key="value", extension="jsonl"): super().__init__(pattern, columns, extension) + self.columns = columns + self.root_key = root_key def read(self, file): """Reads data from the specified jsonl file.""" with open(file, "r") as f: df = pd.read_json(f, lines=True) df.set_index("seconds", inplace=True) + for column in self.columns: + df[column] = df[self.root_key].apply(lambda x: x[column]) return df diff --git a/aeon/schema/social_03.py b/aeon/schema/social_03.py index a2f80580..2102c6a0 100644 --- a/aeon/schema/social_03.py +++ b/aeon/schema/social_03.py @@ -10,17 +10,7 @@ def __init__(self, path): super().__init__(_reader.Pose(f"{path}_202_*")) -class EnvActiveConfigurationReader(_reader.JsonList): - def __init__(self, pattern): - super().__init__(pattern) - - def read(self, file): - data = super().read(file) - data["name"] = data["value"].apply(lambda x: x["name"]) - return data - - class EnvActiveConfiguration(Stream): def __init__(self, path): - super().__init__(EnvActiveConfigurationReader(f"{path}_ActiveConfiguration_*")) + super().__init__(_reader.JsonList(f"{path}_ActiveConfiguration_*", columns=["name"])) From 9880eb681aa041f42629ff3bf717125894a07714 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 18 Sep 2024 09:19:30 -0500 Subject: [PATCH 8/8] fix(environment_active_config): rename stream class name --- aeon/dj_pipeline/acquisition.py | 2 +- aeon/schema/schemas.py | 4 ++-- aeon/schema/social_03.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py index dd83c7d4..eac5b0e2 100644 --- a/aeon/dj_pipeline/acquisition.py +++ b/aeon/dj_pipeline/acquisition.py @@ -588,7 +588,7 @@ def make(self, key): ), ) device = devices_schema.Environment - stream_reader = device.EnvActiveConfiguration # expecting columns: time, name, value + stream_reader = device.EnvironmentActiveConfiguration # expecting columns: time, name, value stream_data = io_api.load( root=data_dirs, reader=stream_reader, diff --git a/aeon/schema/schemas.py b/aeon/schema/schemas.py index acbdfea8..a843b097 100644 --- a/aeon/schema/schemas.py +++ b/aeon/schema/schemas.py @@ -116,7 +116,7 @@ social03 = DotMap( [ Device("Metadata", stream.Metadata), - Device("Environment", social_02.Environment, social_02.SubjectData, social_03.EnvActiveConfiguration), + Device("Environment", social_02.Environment, social_02.SubjectData, social_03.EnvironmentActiveConfiguration), Device("CameraTop", stream.Video, social_03.Pose), Device("CameraNorth", stream.Video), Device("CameraSouth", stream.Video), @@ -147,7 +147,7 @@ social04 = DotMap( [ Device("Metadata", stream.Metadata), - Device("Environment", social_02.Environment, social_02.SubjectData, social_03.EnvActiveConfiguration), + Device("Environment", social_02.Environment, social_02.SubjectData, social_03.EnvironmentActiveConfiguration), Device("CameraTop", stream.Video, social_03.Pose), Device("CameraNorth", stream.Video), Device("CameraSouth", stream.Video), diff --git a/aeon/schema/social_03.py b/aeon/schema/social_03.py index 2102c6a0..a3bc2cdf 100644 --- a/aeon/schema/social_03.py +++ b/aeon/schema/social_03.py @@ -10,7 +10,7 @@ def __init__(self, path): super().__init__(_reader.Pose(f"{path}_202_*")) -class EnvActiveConfiguration(Stream): +class EnvironmentActiveConfiguration(Stream): def __init__(self, path): super().__init__(_reader.JsonList(f"{path}_ActiveConfiguration_*", columns=["name"]))