-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathbundle.py
206 lines (178 loc) · 7.45 KB
/
bundle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# Bundle classes and useful functions
# Two bundle classes made here:
# 1. Bundle - a FHIR transaction bundle with resources as entries
# 2. ErrBundle - When a bundle fails, it is summarized as an ErrBundle and written out to an error file
#
# The useful functions are primarily used to get links and resources for patient bundles
import logging
import requests
import json
import os
import pandas as pd
import numpy as np
from datetime import datetime
from uuid import uuid4
from google.cloud import pubsub_v1
from py_mimic_fhir.lookup import MIMIC_BUNDLES_NO_SPLIT_LIST
logger = logging.getLogger(__name__)
# The ErrBundle captures any failed bundles and writes them out to a logfile
class ErrBundle():
    """Summary of a bundle that the server rejected.

    Stores the issue reported for the bundle, the bundle's patient id and
    name, and one ``{'fhir_profile': ..., 'id': ...}`` item per resource
    that was part of the failed bundle.
    """

    def __init__(self, issue, bundle):
        """Build the summary from the reported issue and the failed bundle.

        Args:
            issue: Issue information to record as-is (the caller in this
                file passes the ``issue`` field of the server response).
            bundle: Object exposing ``get_patient_id()``,
                ``get_bundle_name()`` and an ``entry`` list whose items
                each hold a ``'resource'`` dict.
        """
        self.issue = issue
        self.bundle_list = []
        self.patient_id = bundle.get_patient_id()
        self.bundle_name = bundle.get_bundle_name()
        self.set_id_list(bundle)

    def json(self):
        """Return the instance attributes as a dict (not a copy)."""
        return self.__dict__

    # Capture all fhir ids from resources that were part of the failed bundle
    def set_id_list(self, bundle):
        """Append one profile/id item per bundle entry to ``bundle_list``.

        The profile is the last path segment of the first
        ``meta.profile`` URL. When the resource has no ``meta``, or its
        ``meta`` carries no profile, the ``resourceType`` is used instead.

        Raises:
            KeyError: If a resource has no ``id`` (or no ``resourceType``
                when the fallback is needed).
        """
        for entry in bundle.entry:
            resource = entry['resource']
            # 'meta' may be present without a (non-empty) 'profile' list
            profiles = (resource.get('meta') or {}).get('profile')
            if profiles:
                profile = profiles[0].split('/')[-1]
            else:
                profile = resource['resourceType']
            self.bundle_list.append(
                {'fhir_profile': profile, 'id': resource['id']}
            )

    # Write err bundle to file
    def write(self, err_path):
        """Append this summary as one JSON line to a per-weekday file.

        Args:
            err_path: Directory for the error files. It is created
                (including parents) when missing; a trailing path
                separator is optional.
        """
        os.makedirs(err_path, exist_ok=True)

        # One file per weekday name. The file is opened in append mode, so
        # nothing here truncates it: entries from earlier weeks remain.
        day_of_week = datetime.now().strftime('%A').lower()
        err_file = os.path.join(err_path, f'err-bundles-{day_of_week}.json')
        with open(err_file, 'a') as errfile:
            json.dump(self.json(), errfile)
            errfile.write('\n')
# FHIR bundle class with options to add entries and send requests to the server
class Bundle():
    """FHIR transaction bundle whose entries are PUT requests for resources.

    Besides the FHIR fields (``id``, ``resourceType``, ``type``, ``entry``)
    the instance keeps ``bundle_name``, ``table_list`` and ``patient_id``
    as bookkeeping; :meth:`json` strips those before the bundle is sent.
    """

    def __init__(self, name, table_list=None, patient_id='no_pat_id'):
        """Create an empty bundle.

        Args:
            name: Bundle name; also used as the prefix of the generated id.
            table_list: Table names that :meth:`generate` pulls resources
                from. Defaults to a fresh empty list per instance.
            patient_id: Patient the bundle belongs to.
        """
        self.bundle_name = name
        self.id = f'{name}-{str(uuid4())}'
        # A None default avoids sharing one list object between instances
        self.table_list = [] if table_list is None else table_list
        self.resourceType = 'Bundle'
        self.type = 'transaction'
        self.entry = []
        self.patient_id = patient_id

    def get_patient_id(self):
        """Return the patient id of this bundle."""
        return self.patient_id

    def get_bundle_name(self):
        """Return the name of this bundle."""
        return self.bundle_name

    # Create bundle entry with given resources
    def add_entry(self, resources):
        """Append one PUT entry per resource to the bundle.

        A resource without a ``resourceType`` is logged and skipped,
        because no request url can be built for it.

        Args:
            resources: Iterable of resource dicts, each with an ``id``.

        Raises:
            KeyError: If a resource has no ``id``.
        """
        for resource in resources:
            if 'resourceType' not in resource:
                logger.error(f'Resource has no resourceType: {resource}')
                continue

            self.entry.append({
                'fullUrl': resource['id'],
                'request': {
                    'method': 'PUT',
                    'url': resource['resourceType'] + '/' + resource['id'],
                },
                'resource': resource,
            })

    def generate(self, patient_id, db_conn):
        """Fill the bundle with a patient's resources from every table.

        Args:
            patient_id: Patient to collect resources for; also stored on
                the bundle.
            db_conn: Object providing
                ``get_resources_by_pat(table_name, patient_id)``; its
                result is passed straight to :meth:`add_entry`.
        """
        self.patient_id = patient_id
        for table_name in self.table_list:
            resources = db_conn.get_resources_by_pat(table_name, patient_id)
            self.add_entry(resources)

    def json(self):
        """Return the bundle as a dict without the bookkeeping attributes."""
        bundle_json = self.__dict__.copy()
        for internal_key in ('bundle_name', 'table_list', 'patient_id'):
            bundle_json.pop(internal_key)
        return bundle_json

    def _split_bundles(self, bundle_size):
        """Yield new bundles that together hold all entries of this one.

        Entries are spread as evenly as possible, in order, over the
        smallest number of bundles for which none exceeds ``bundle_size``.
        A bundle without entries still yields one (empty) bundle, so
        callers always send exactly what they would have sent unsplit.

        Raises:
            ValueError: If ``bundle_size`` is smaller than 1.
        """
        if bundle_size < 1:
            raise ValueError(f'bundle_size must be >= 1, got {bundle_size}')

        total = len(self.entry)
        # Ceiling division keeps every group at or below bundle_size
        group_count = max(1, -(-total // bundle_size))
        base_size, remainder = divmod(total, group_count)

        start = 0
        for group_index in range(group_count):
            # The first `remainder` groups take one extra entry
            stop = start + base_size + (1 if group_index < remainder else 0)
            sub_bundle = Bundle(
                self.bundle_name,
                self.table_list,
                patient_id=self.patient_id
            )
            sub_bundle.add_entry(
                [entry['resource'] for entry in self.entry[start:stop]]
            )
            yield sub_bundle
            start = stop

    def publish(self, gcp_args, split_flag=True, bundle_size=200):
        """Publish the bundle through ``gcp_args.publisher``.

        Args:
            gcp_args: Object providing ``publisher``, ``topic_path``,
                ``bundle_run``, ``project``, ``location``, ``bucket``,
                ``dataset`` and ``fhirstore``.
            split_flag: Publish as several smaller bundles when True.
                Ignored (treated as False) for bundle names listed in
                ``MIMIC_BUNDLES_NO_SPLIT_LIST``.
            bundle_size: Maximum number of entries per split bundle.

        Returns:
            True when every publish was judged successful, else False.
        """
        if self.bundle_name in MIMIC_BUNDLES_NO_SPLIT_LIST:
            split_flag = False

        # Split the entry into smaller bundles to speed up posting
        if split_flag:
            output = True  # True until proven false
            for sub_bundle in self._split_bundles(bundle_size):
                if not sub_bundle.publish(gcp_args, split_flag=False):
                    output = False
            return output

        # Post full bundle, no restriction on bundle size
        bundle_to_send = json.dumps(self.json()).encode('utf-8')
        pub_response = gcp_args.publisher.publish(
            gcp_args.topic_path,
            bundle_to_send,
            patient_id=self.patient_id,
            bundle_run=gcp_args.bundle_run,
            bundle_group=self.bundle_name,
            gcp_project=gcp_args.project,
            gcp_location=gcp_args.location,
            gcp_bucket=gcp_args.bucket,
            gcp_dataset=gcp_args.dataset,
            gcp_fhirstore=gcp_args.fhirstore
        )

        # getting the response from the publisher does not tell us much but takes a lot of time to wait
        try:
            # NOTE(review): success is judged by the result having length 16;
            # confirm that the publisher guarantees that length.
            return len(pub_response.result()) == 16
        except Exception as e:
            # A failed publish must not be reported as success
            logger.error(e)
            return False

    # Send request out to HAPI server, validates on the server
    def request(
        self,
        fhir_server,
        err_path=None,
        split_flag=True,
        bundle_size=60,  # optimal based on testing, seems small but if no links is quick!
    ):
        """POST the bundle to ``fhir_server``.

        Args:
            fhir_server: URL the bundle is posted to.
            err_path: Directory for error-bundle files. When None, a
                rejected bundle is only logged, not written to disk.
            split_flag: Post as several smaller bundles when True.
                Ignored (treated as False) for bundle names listed in
                ``MIMIC_BUNDLES_NO_SPLIT_LIST``.
            bundle_size: Maximum number of entries per split bundle.

        Returns:
            False when any response is an ``OperationOutcome``, else True.
        """
        if self.bundle_name in MIMIC_BUNDLES_NO_SPLIT_LIST:
            split_flag = False

        # Split the entry into smaller bundles to speed up posting
        if split_flag:
            output = True  # True until proven false
            for sub_bundle in self._split_bundles(bundle_size):
                posted = sub_bundle.request(
                    fhir_server, err_path=err_path, split_flag=False
                )
                if not posted:
                    output = False
            return output

        # Post full bundle, no restriction on bundle size
        resp = requests.post(
            fhir_server,
            json=self.json(),
            headers={"Content-Type": "application/fhir+json"}
        )
        resp_json = resp.json()  # parse the body once
        if resp_json['resourceType'] == 'OperationOutcome':
            issue = resp_json['issue']
            # write out error bundles!
            if err_path is not None:
                ErrBundle(issue, self).write(err_path)
            logger.error(f'------------ bundle_name: {self.bundle_name}')
            logger.error(issue)
            return False
        return True