Skip to content

Commit

Permalink
Merge pull request #73 from dbmi-pitt/karlburke/CommonsS3WorkerRevision
Browse files Browse the repository at this point in the history
Rework usage of S3 code from hubmap-commons for introduction of stash…
  • Loading branch information
yuanzhou authored Aug 14, 2024
2 parents dacac43 + 601d5cd commit 93dea91
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 93 deletions.
92 changes: 47 additions & 45 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ def __init__(self, config, translator_module, blueprint=None, ubkg_instance=None
,'aws_object_url_expiration_in_secs': self.AWS_OBJECT_URL_EXPIRATION_IN_SECS
,'service_configured_obj_prefix': self.AWS_S3_OBJECT_PREFIX}

## Initialize an S3Worker from hubmap-commons
try:
self.anS3Worker = S3Worker( ACCESS_KEY_ID=self.S3_settings_dict['aws_access_key_id']
, SECRET_ACCESS_KEY=self.S3_settings_dict['aws_secret_access_key']
, S3_BUCKET_NAME=self.S3_settings_dict['aws_s3_bucket_name']
, S3_OBJECT_URL_EXPIRATION_IN_SECS=self.S3_settings_dict['aws_object_url_expiration_in_secs']
, LARGE_RESPONSE_THRESHOLD=self.S3_settings_dict['large_response_threshold']
, SERVICE_S3_OBJ_PREFIX=self.S3_settings_dict['service_configured_obj_prefix'])
logger.info("anS3Worker initialized")
except Exception as s3exception:
logger.critical(s3exception, exc_info=True)

# Specify the absolute path of the instance folder and use the config file relative to the instance path
self.app = Flask(__name__, instance_path=os.path.join(os.path.abspath(os.path.dirname(__file__))))

Expand Down Expand Up @@ -279,19 +291,13 @@ def search(self):
es_url = self.INDICES['indices'][self.DEFAULT_INDEX_WITHOUT_PREFIX]['elasticsearch']['url'].strip(
'/')

# Set a prefix used for naming any objects that end up in S3 which is
# specific to this service and this function.
function_name = inspect.currentframe().f_code.co_name
self.S3_settings_dict['service_configured_obj_prefix'] = \
f"{self.AWS_S3_OBJECT_PREFIX.replace('unspecified-function',function_name)}"

response = execute_query( query_against='_search'
,request=request
,index=target_index
,es_url=es_url
,query=None
,request_params=None
,large_response_settings_dict=self.S3_settings_dict)
, request=request
, index=target_index
, es_url=es_url
, s3_worker=self.anS3Worker
, query=None
, request_params=None)
generate_manifest = False

if bool(request.args):
Expand Down Expand Up @@ -461,12 +467,6 @@ def param_search_index(self, plural_entity_type):
,f"Parameterized query of OpenSearch using composite_index={composite_index}"
f" target_index={target_index}, entity_type={entity_type}, oss_base_url={oss_base_url}")

# Set a prefix used for naming any objects that end up in S3 which is
# specific to this service and this function.
function_name = inspect.currentframe().f_code.co_name
self.S3_settings_dict['service_configured_obj_prefix'] = \
f"{self.AWS_S3_OBJECT_PREFIX.replace('unspecified-function', function_name)}"

generate_manifest = False
if bool(request.args):
produce_manifest = request.args.get('produce-clt-manifest')
Expand All @@ -475,8 +475,8 @@ def param_search_index(self, plural_entity_type):
if produce_manifest and produce_manifest.lower() == "true":
generate_manifest = True

# The following usage of execute_opensearch_query() followed by size_response_for_gateway() replaces
# the functionality of execute_query(), so that the JSON can be manipulated between those calls.
# The following usage of execute_opensearch_query() followed by self.anS3Worker.stash_response_body_if_big()
# replaces the functionality of execute_query(), so that the JSON can be manipulated between those calls.

# Return the elasticsearch resulting json data as json string
opensearch_response = execute_opensearch_query(query_against='_search'
Expand Down Expand Up @@ -535,15 +535,23 @@ def param_search_index(self, plural_entity_type):
resp_json = convenience_json

# Check the size of what is to be returned through the AWS Gateway, and replace it with
# a response that links to AWS S3, if appropriate.
s3_response = size_response_for_gateway(response_json=json.dumps(resp_json)
, large_response_settings_dict=self.S3_settings_dict)
if s3_response is not None:
return s3_response
else:
return Response(response=json.dumps(resp_json)
, status=opensearch_response.status_code
, mimetype='application/json')
# a response that links to an Object in the AWS S3 Bucket, if appropriate.
try:
s3_url = self.anS3Worker.stash_response_body_if_big(json.dumps(resp_json).encode('utf-8'))
if s3_url is not None:
return Response(response=s3_url
, status=303) # See Other
except Exception as s3exception:
logger.error(f"Error using anS3Worker to handle len(json.dumps(sample_prov_list).encode('utf-8'))="
f"{len(json.dumps(sample_prov_list).encode('utf-8'))}.")
logger.error(s3exception, exc_info=True)
return Response(response=f"Unexpected error storing large results in S3. See logs."
, status=500)

# Return a regular response through the AWS Gateway
return Response(response=json.dumps(resp_json)
, status=opensearch_response.status_code
, mimetype='application/json')
else:
logger.error(f"Unable to return ['hits']['hits'] content of opensearch_response with"
f" status_code={opensearch_response.status_code}"
Expand Down Expand Up @@ -641,20 +649,14 @@ def search_by_index(self, index_without_prefix):
# get URL for that index
es_url = self.INDICES['indices'][index_without_prefix]['elasticsearch']['url'].strip('/')

# Set a prefix used for naming any objects that end up in S3 which is
# specific to this service and this function.
function_name = inspect.currentframe().f_code.co_name
self.S3_settings_dict['service_configured_obj_prefix'] = \
f"{self.AWS_S3_OBJECT_PREFIX.replace('unspecified-function',function_name)}"

# Return the elasticsearch resulting json data as json string
response = execute_query( query_against='_search'
,request=request
,index=target_index
,es_url=es_url
,query=None
,request_params=None
,large_response_settings_dict=self.S3_settings_dict)
, request=request
, index=target_index
, es_url=es_url
, s3_worker=self.anS3Worker
, query=None
, request_params=None)
generate_manifest = False

if bool(request.args):
Expand Down Expand Up @@ -691,7 +693,7 @@ def mget(self):
'/')

# Return the elasticsearch resulting json data as json string
return execute_query('_mget', request, target_index, es_url)
return execute_query('_mget', request, target_index, es_url, self.anS3Worker)

# Info
def mget_by_index(self, index_without_prefix):
Expand All @@ -716,7 +718,7 @@ def mget_by_index(self, index_without_prefix):
es_url = self.INDICES['indices'][index_without_prefix]['elasticsearch']['url'].strip('/')

# Return the elasticsearch resulting json data as json string
return execute_query('_mget', request, target_index, es_url)
return execute_query('_mget', request, target_index, es_url, self.anS3Worker)

# HTTP GET can be used to execute search with body against ElasticSearch REST API.
def count(self):
Expand All @@ -732,7 +734,7 @@ def count(self):
es_url = self.INDICES['indices'][self.DEFAULT_INDEX_WITHOUT_PREFIX]['elasticsearch']['url'].strip('/')

# Return the elasticsearch resulting json data as json string
return execute_query('_count', request, target_index, es_url)
return execute_query('_count', request, target_index, es_url, self.anS3Worker)

# HTTP GET can be used to execute search with body against ElasticSearch REST API.
# Note: the index in URL is not the real index in Elasticsearch, it's that index without prefix
Expand All @@ -753,7 +755,7 @@ def count_by_index(self, index_without_prefix):
es_url = self.INDICES['indices'][index_without_prefix]['elasticsearch']['url'].strip('/')

# Return the elasticsearch resulting json data as json string
return execute_query('_count', request, target_index, es_url)
return execute_query('_count', request, target_index, es_url, self.anS3Worker)

# Get a list of indices
def indices(self):
Expand Down
74 changes: 26 additions & 48 deletions src/opensearch_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ def get_uuids_from_es(index, es_url):
logger.debug("Searching ES for uuids...")
logger.debug(es_url)

resp = execute_query('_search', None, index, es_url, query)
resp = execute_opensearch_query( query_against='_search'
, request=None
, index=index
, es_url=es_url
, query=query)

logger.debug(f"OpenSearch '_search' query response returned resp.status_code={resp.status_code}.")
# @TODO-If a 303 response is returned, retrieve the JSON from an AWS bucket (not currently anticipated to happen.)

ret_obj = resp.get_json()
uuids.extend(hit['_id'] for hit in ret_obj.get('hits').get('hits'))
Expand Down Expand Up @@ -139,40 +142,7 @@ def execute_opensearch_query(query_against, request, index, es_url, query=None,

return requests.post(url=target_url, json=json_data)

def size_response_for_gateway(response_json=None, large_response_settings_dict=None):
    """
    Decide whether a response body is too large to pass back through the AWS API
    Gateway and, if so, stash it in an AWS S3 Bucket instead.

    Parameters
    ----------
    response_json : str
        The JSON-serialized response body intended for the caller.
    large_response_settings_dict : dict or None
        AWS S3 settings supplied by the calling service: 'aws_access_key_id',
        'aws_secret_access_key', 'aws_s3_bucket_name',
        'aws_object_url_expiration_in_secs', 'large_response_threshold', and
        'service_configured_obj_prefix'.  None disables S3 stashing.

    Returns
    -------
    flask Response or None
        A 303 "See Other" Response whose body is a presigned S3 URL when the
        payload met the size threshold and was stashed; a 500 Response if the
        S3 stash failed; None when the caller should return the payload itself.

    Notes
    -----
    NOTE(review): when large_response_settings_dict is provided but the payload
    is under 'large_response_threshold', no size check occurs and None is
    returned — check_response_payload_size() runs only on the else branch.
    """
    if large_response_settings_dict is not None:
        # Since the calling service passed in a dictionary of settings for AWS S3, stash
        # any large responses there. Otherwise, allow the response to be returned directly
        # as this function exits.
        # Size is measured on the UTF-8 byte encoding, matching what crosses the gateway.
        if len(response_json.encode('utf-8')) >= large_response_settings_dict['large_response_threshold']:
            anS3Worker = None
            try:
                # A new S3Worker is constructed per call (rather than reused) with the
                # service-supplied credentials and bucket configuration.
                anS3Worker = S3Worker( large_response_settings_dict['aws_access_key_id']
                                      ,large_response_settings_dict['aws_secret_access_key']
                                      ,large_response_settings_dict['aws_s3_bucket_name']
                                      ,large_response_settings_dict['aws_object_url_expiration_in_secs'])
                logger.info("anS3Worker initialized")
                # Store the payload as an S3 object named with the service/function prefix,
                # then generate a presigned URL the caller can fetch it from.
                obj_key = anS3Worker.stash_text_as_object( response_json
                                                          ,large_response_settings_dict['service_configured_obj_prefix'])
                aws_presigned_url = anS3Worker.create_URL_for_object(obj_key)
                # 303 redirects the client to retrieve the stashed object from S3.
                return Response( response=aws_presigned_url
                                , status=303) # See Other
            except Exception as s3exception:
                logger.error( f"Error getting anS3Worker to handle len(results)="
                              f"{len(response_json.encode('utf-8'))}.")
                logger.error(s3exception, exc_info=True)
                return Response( response=f"Unexpected error storing large results in S3. See logs."
                                ,status=500)
    else:
        # Since the calling service did not pass in a dictionary of settings for AWS S3, execute the
        # traditional handling to check for responses over 10MB and return more useful message instead
        # of AWS API Gateway's default 500 message.
        # Note Content-length header is not always provided, we have to calculate
        check_response_payload_size(response_json)
    return None

def execute_query(query_against, request, index, es_url, query=None, request_params=None, large_response_settings_dict=None):
def execute_query(query_against, request, index, es_url, s3_worker, query=None, request_params=None):
opensearch_response = execute_opensearch_query(query_against=query_against
,request=request
,index=index
Expand All @@ -182,18 +152,26 @@ def execute_query(query_against, request, index, es_url, query=None, request_par

# Continue on using the exact JSON returned by the OpenSearch query. Use cases which need to
# manipulate the JSON for their response should do their own execute_opensearch_query() and
# size_response_for_gateway(), with the manipulation between the calls.

s3_response = size_response_for_gateway(response_json=json.dumps(opensearch_response.json())
, large_response_settings_dict=large_response_settings_dict)
if s3_response is not None:
return s3_response
else:
# Convert the requests.models.Response to a flask.wrappers.Response to
# return results through the view.
return Response(response=json.dumps(opensearch_response.json())
, status=opensearch_response.status_code
, mimetype='application/json')
# S3Worker.stash_response_body_if_big(), with the manipulation between the calls.

# Check the size of what is to be returned through the AWS Gateway, and replace it with
# a response that links to an Object in the AWS S3 Bucket, if appropriate.
resp_body = json.dumps(opensearch_response.json())
try:
s3_url = s3_worker.stash_response_body_if_big(resp_body)
if s3_url is not None:
return Response(response=s3_url
, status=303) # See Other
except Exception as s3exception:
logger.error(f"Error using s3_worker to handle len(resp_body)="
f"{len(resp_body)}.")
logger.error(s3exception, exc_info=True)
return Response(response=f"Unexpected error storing large results in S3. See logs."
, status=500)
# Return a regular response through the AWS Gateway
return Response(response=json.dumps(opensearch_response.json())
, status=opensearch_response.status_code
, mimetype='application/json')

# Get the query string from the original request
def get_query_string(url):
Expand Down

0 comments on commit 93dea91

Please sign in to comment.