
Added validate_order method #97

Open · wants to merge 14 commits into base: main
68 changes: 68 additions & 0 deletions event_model/__init__.py
@@ -1354,3 +1354,71 @@ def default(self, obj):
return obj.item()
return obj.tolist()
return json.JSONEncoder.default(self, obj)


def validate_order(run_iterable):
"""
Validates the order of a Bluesky Run.
[Review comment from a maintainer]
Can you copy over the content from #98 here to explain what constraints this is enforcing?


    Parameters
    ----------
    run_iterable : iterable
        A Bluesky run in the form of an iterable of (name, doc) pairs.
    """
    # NB: relies on collections.defaultdict and on this module's
    # unpack_datum_page/unpack_event_page helpers.
    datum_cache = {}
    resource_cache = {}
    descriptor_cache = {}
    event_cache = defaultdict(list)
    start = None
    stop = None
    last_index = 0

    for i, (name, doc) in enumerate(run_iterable):
        last_index = i

        if name == 'start':
            start = (i, doc)
        elif name == 'stop':
            stop = (i, doc)
        elif name == 'resource':
            resource_cache[doc['uid']] = (i, doc)
        elif name == 'descriptor':
            descriptor_cache[doc['uid']] = (i, doc)
        elif name == 'datum':
            datum_cache[doc['datum_id']] = (i, doc)
        elif name == 'datum_page':
            for datum in unpack_datum_page(doc):
                datum_cache[datum['datum_id']] = (i, datum)
        elif name == 'event':
            event_cache[doc['descriptor']].append((i, doc))
        elif name == 'event_page':
            for event in unpack_event_page(doc):
[Review comment from a maintainer]
This does not address: Event[Page]s across streams are in time order up to the time resolution of a Page. That is, if we denote event_page['time'][0] as a_i ("a initial") and event_page['time'][-1] as a_f ("a final") for a given EventPage a, then if b follows a, b_f >= a_i. In English, each EventPage's highest time must be greater than or equal to the preceding EventPages' lowest times.
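The reviewer's constraint can be sketched as a standalone check. This is a minimal illustration, not part of the diff; the function name `check_page_time_order` and its `pages` argument (EventPage-like dicts with a `'time'` list, in document order) are hypothetical:

```python
def check_page_time_order(pages):
    """Assert each page's final time >= every earlier page's initial time."""
    max_initial = float('-inf')  # highest first-timestamp seen so far
    for page in pages:
        # This page's final time must not precede the initial time of
        # any earlier page, regardless of which stream it belongs to.
        assert page['time'][-1] >= max_initial
        max_initial = max(max_initial, page['time'][0])
    return True
```

Tracking only the running maximum of initial times is enough, because b_f >= a_i for every preceding a is equivalent to b_f >= max(a_i).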

                event_cache[event['descriptor']].append((i, event))

    # Check that the start document is the first document.
    assert start is not None
    assert start[0] == 0

    # Check that the stop document is the last document.
    assert stop is not None
    assert stop[0] == last_index

    # For each stream, check that events are in timestamp order.
    for descriptor_id, event_stream in event_cache.items():
        t0 = None
        for index, event in event_stream:
            t1 = event['time']
            if t0 is not None:
                assert t1 > t0
            t0 = t1

    # Check that the descriptor document is received before the first
    # event of its stream.
    for descriptor_id, event_stream in event_cache.items():
        assert event_stream[0][0] > descriptor_cache[descriptor_id][0]

    # For each event, check that referenced datum documents are received
    # first.
    for descriptor_id, event_stream in event_cache.items():
        descriptor = descriptor_cache[descriptor_id][1]
        external_keys = {key for key, spec in descriptor['data_keys'].items()
                         if 'external' in spec}
        for i, event in event_stream:
            # Check that the filled keys match the external keys defined
            # in the descriptor.
            assert external_keys == set(event['filled'].keys())
            for key, value in event['data'].items():
                if key in external_keys:
                    assert datum_cache[value][0] < i

    # For each datum, check that the referenced resource is received first.
    for i, datum in datum_cache.values():
        assert resource_cache[datum['resource']][0] < i
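To illustrate the ordering that validate_order enforces without importing the library, here is a toy run with made-up uids; the inline asserts mirror two of the function's checks (first/last document and per-stream timestamp order):

```python
# Hypothetical toy run (made-up uids and minimal fields) that satisfies
# the ordering rules: start first, descriptor before its events, event
# timestamps strictly increasing, stop last.
run = [
    ('start', {'uid': 'start-1', 'time': 0.0}),
    ('descriptor', {'uid': 'desc-1', 'run_start': 'start-1'}),
    ('event', {'descriptor': 'desc-1', 'time': 1.0, 'data': {}, 'filled': {}}),
    ('event', {'descriptor': 'desc-1', 'time': 2.0, 'data': {}, 'filled': {}}),
    ('stop', {'uid': 'stop-1', 'run_start': 'start-1'}),
]

names = [name for name, _ in run]
# Mirrors the first/last-document checks.
assert names[0] == 'start' and names[-1] == 'stop'

# Mirrors the per-stream timestamp-order check.
times = [doc['time'] for name, doc in run if name == 'event']
assert all(earlier < later for earlier, later in zip(times, times[1:]))
```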