[#64] Queue and Storage triggers for AWS, GCP and Azure #201

Open: wants to merge 21 commits into base: master
Changes from 9 commits

19 changes: 16 additions & 3 deletions benchmarks/wrappers/aws/python/handler.py
@@ -1,18 +1,31 @@

import datetime, io, json, os, sys, uuid

# Add current directory to allow location of packages
sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))

# TODO: usual trigger
# implement support for S3 and others
def handler(event, context):

    income_timestamp = datetime.datetime.now().timestamp()

    # Queue trigger
    if ("Records" in event and event["Records"][0]["eventSource"] == 'aws:sqs'):
Review comment (Collaborator):

One question here: are we certain we always receive a single event? Do we need to add a loop here?
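
On the loop question: Lambda's SQS integration delivers messages in batches, so `Records` can hold more than one entry depending on the batch size configured for the event source mapping. A minimal sketch of iterating over every record instead of indexing `[0]`; the helper name `extract_sqs_payloads` is hypothetical and not part of this PR:

```python
import json


def extract_sqs_payloads(event):
    """Return one decoded JSON payload per SQS record in a Lambda event.

    Hypothetical helper: records from other sources are skipped.
    """
    payloads = []
    for record in event.get("Records", []):
        if record.get("eventSource") == "aws:sqs":
            payloads.append(json.loads(record["body"]))
    return payloads
```

The wrapper would then have to decide whether to invoke the benchmark once per payload or to reject batches larger than one; that choice is left open here.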

        event = json.loads(event["Records"][0]["body"])

    # Storage trigger
    if ("Records" in event and "s3" in event["Records"][0]):
        bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
        file_name = event["Records"][0]["s3"]["object"]["key"]

        from function import storage
        storage_inst = storage.storage.get_instance()

        obj = storage_inst.get_object(bucket_name, file_name)
        event = json.loads(obj['Body'].read())

    # HTTP trigger with API Gateway
    if 'body' in event:
        event = json.loads(event['body'])

    req_id = context.aws_request_id
    event['request-id'] = req_id
    event['income-timestamp'] = income_timestamp
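
The storage-trigger branch above reads the bucket and key from the first S3 record. A hedged sketch of that lookup as a standalone helper follows; `s3_object_location` is a hypothetical name. The `unquote_plus` step reflects that S3 event notifications URL-encode object keys, which the handler in this diff does not account for:

```python
from urllib.parse import unquote_plus


def s3_object_location(event):
    """Return (bucket, key) for the first S3 record, or None for non-S3 events.

    Hypothetical helper; only the first record is inspected, as in the diff.
    """
    records = event.get("Records") or []
    if not records or "s3" not in records[0]:
        return None
    s3_info = records[0]["s3"]
    bucket = s3_info["bucket"]["name"]
    # Keys arrive URL-encoded, e.g. a space is delivered as '+'.
    key = unquote_plus(s3_info["object"]["key"])
    return bucket, key
```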
3 changes: 3 additions & 0 deletions benchmarks/wrappers/aws/python/storage.py
@@ -46,6 +46,9 @@ def download_stream(self, bucket, file):
        data = io.BytesIO()
        self.client.download_fileobj(bucket, file, data)
        return data.getbuffer()

    def get_object(self, bucket, file):
        return self.client.get_object(Bucket=bucket, Key=file)

    def get_instance():
        if storage.instance is None:
24 changes: 20 additions & 4 deletions benchmarks/wrappers/azure/python/handler.py
@@ -1,12 +1,11 @@

-import datetime, io, json, os, uuid
+import base64
Review comment:

Remove unused import.

The base64 import is unused and should be removed.

- import base64

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- import base64

Tools: Ruff

2-2: base64 imported but unused. Remove unused import: base64 (F401)

+import datetime, io, json, logging, os, uuid

 import azure.functions as func


-# TODO: usual trigger
-# implement support for blob and others
-def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
+def handler_http(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
     income_timestamp = datetime.datetime.now().timestamp()
     req_json = req.get_json()
     if 'connection_string' in req_json:
@@ -73,3 +72,20 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
         mimetype="application/json"
     )

+def handler_queue(msg: func.QueueMessage):
+    logging.info('Python queue trigger function processed a queue item.')
+    payload = msg.get_body().decode('utf-8')
+
+    from . import function
+    ret = function.handler(payload)
Review comment:

Remove unused variable ret.

The ret variable is assigned but never used.

- ret = function.handler(payload)

Committable suggestion

Suggested change
- ret = function.handler(payload)
+ function.handler(payload)

Tools: Ruff

80-80: Local variable ret is assigned to but never used. Remove assignment to unused variable ret (F841)


+# TODO(oana)

Review comment (Collaborator):

Are we missing something here?

Review comment (Collaborator):

It looks like this is missing the core part: making measurements and returning values. Maybe we can restructure a bit so that a single invocation + measurement routine is called from the three trigger interfaces?

Reply (Collaborator, Author):

Just pushed the measurements infrastructure; all invocations are now complete end to end.
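
The restructuring suggested in this thread could look roughly like the sketch below: one routine that invokes the benchmark and records timing, with thin per-trigger adapters that only decode their input. All names (`run_and_measure`, `on_http`, `on_queue`, `on_storage`) are hypothetical, and the returned fields are an assumption, not the measurement format actually pushed to the PR:

```python
import datetime
import json


def run_and_measure(benchmark_handler, payload):
    """Invoke the benchmark once and return timing data next to its result."""
    begin = datetime.datetime.now()
    result = benchmark_handler(payload)
    end = datetime.datetime.now()
    return {
        "begin": begin.timestamp(),
        "end": end.timestamp(),
        "compute_time_us": (end - begin) / datetime.timedelta(microseconds=1),
        "result": result,
    }


def on_http(body_text, benchmark_handler):
    # HTTP adapter: the request body is JSON text.
    return run_and_measure(benchmark_handler, json.loads(body_text))


def on_queue(message_bytes, benchmark_handler):
    # Queue adapter: the message body is UTF-8 encoded JSON.
    return run_and_measure(benchmark_handler, json.loads(message_bytes.decode("utf-8")))


def on_storage(blob_bytes, benchmark_handler):
    # Storage adapter: the uploaded object holds UTF-8 encoded JSON.
    return run_and_measure(benchmark_handler, json.loads(blob_bytes.decode("utf-8")))
```

Keeping the adapters free of measurement logic means a fourth trigger type would only need a new decoder.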


+def handler_storage(blob: func.InputStream):
+    logging.info('Python Blob trigger function processed %s', blob.name)
+    payload = blob.readline().decode('utf-8') # TODO(oana)
+
+    from . import function
+    ret = function.handler(payload)
Review comment:

Remove unused variable ret.

The ret variable is assigned but never used.

- ret = function.handler(payload)

Committable suggestion

Suggested change
- ret = function.handler(payload)
+ function.handler(payload)

Tools: Ruff

89-89: Local variable ret is assigned to but never used. Remove assignment to unused variable ret (F841)


+# TODO(oana)
oanarosca marked this conversation as resolved.
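
On the `blob.readline()` TODO in `handler_storage`: `readline()` stops at the first newline, so a pretty-printed JSON blob would be cut short. The sketch below uses `io.BytesIO` as a stand-in for `func.InputStream`, assuming both expose `read()`; `load_json_stream` is a hypothetical helper, and parsing the payload as JSON is an assumption carried over from the other wrappers:

```python
import io
import json


def load_json_stream(stream):
    """Read the whole binary stream and parse it as UTF-8 encoded JSON."""
    return json.loads(stream.read().decode("utf-8"))


# A JSON document that spans several lines.
blob = io.BytesIO(b'{\n  "size": "test"\n}')

first_line = blob.readline()  # stops after the opening brace and newline
blob.seek(0)
payload = load_json_stream(blob)  # parses the complete document
```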
35 changes: 32 additions & 3 deletions benchmarks/wrappers/gcp/python/handler.py
@@ -1,9 +1,10 @@
-import datetime, io, json, os, uuid, sys
+import base64, datetime, io, json, os, uuid, sys

-sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))
+from google.cloud import storage as gcp_storage
Review comment:

Remove unused import.

The google.cloud.storage import is unused and should be removed.

- from google.cloud import storage as gcp_storage

Committable suggestion

Suggested change
- from google.cloud import storage as gcp_storage

Tools: Ruff

3-3: google.cloud.storage imported but unused. Remove unused import: google.cloud.storage (F401)


+sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))

-def handler(req):
+def handler_http(req):
     income_timestamp = datetime.datetime.now().timestamp()
     req_id = req.headers.get('Function-Execution-Id')

@@ -62,3 +63,31 @@ def handler(req):
         'cold_start_var': cold_start_var,
         'container_id': container_id,
     }), 200, {'ContentType': 'application/json'}
+
+def handler_queue(data, context):
+    serialized_payload = data.get('data')
+    payload = json.loads(base64.b64decode(serialized_payload).decode("utf-8"))
+
+    from function import function
+    ret = function.handler(payload)
Review comment:

Remove unused variable ret.

The ret variable is assigned but never used.

- ret = function.handler(payload)

Committable suggestion

Suggested change
- ret = function.handler(payload)

Tools: Ruff

72-72: Local variable ret is assigned to but never used. Remove assignment to unused variable ret (F841)


+# TODO(oana)
oanarosca marked this conversation as resolved.
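
The GCP `handler_queue` above expects the message in `data['data']` as base64-encoded JSON. A small round-trip sketch of that decoding follows; `decode_queue_payload` is a hypothetical name, and the empty-dict fallback for a missing field is an assumption rather than behavior from this PR:

```python
import base64
import json


def decode_queue_payload(data):
    """Decode the base64-encoded JSON carried in the event's 'data' field."""
    serialized = data.get("data")
    if serialized is None:
        # Assumption: treat a message without a body as an empty payload.
        return {}
    return json.loads(base64.b64decode(serialized).decode("utf-8"))


# Build a message the same way a publisher would encode it.
encoded = base64.b64encode(json.dumps({"size": 10}).encode("utf-8")).decode("ascii")
message = {"data": encoded}
```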

+def handler_storage(data, context):
+    bucket_name = data.get('bucket')
+    name = data.get('name')
+    filepath = '/tmp/bucket_contents'
+
+    from function import storage
+    storage_inst = storage.storage.get_instance()
+    storage_inst.download(bucket_name, name, filepath)
+
+    payload = {}
+
+    with open(filepath, 'r') as fp:
+        payload = json.load(fp)
+
+    from function import function
+    ret = function.handler(payload)
Review comment:

Remove unused variable ret.

The ret variable is assigned but never used.

- ret = function.handler(payload)

Committable suggestion

Suggested change
- ret = function.handler(payload)
+ function.handler(payload)

Tools: Ruff

91-91: Local variable ret is assigned to but never used. Remove assignment to unused variable ret (F841)


+# TODO(oana)
2 changes: 1 addition & 1 deletion config/example.json
@@ -6,7 +6,7 @@
     "download_results": false,
     "runtime": {
       "language": "python",
-      "version": "3.7"
+      "version": "3.9"
     },
     "type": "invocation-overhead",
     "perf-cost": {
3 changes: 2 additions & 1 deletion config/systems.json
@@ -18,7 +18,8 @@
     "python": {
       "base_images": {
         "3.7": "python:3.7-slim",
-        "3.8": "python:3.8-slim"
+        "3.8": "python:3.8-slim",
+        "3.9": "python:3.9-slim"
oanarosca marked this conversation as resolved.
       },
       "images": [
         "run",
1 change: 1 addition & 0 deletions docs/modularity.md
@@ -303,6 +303,7 @@ Implement this step in the following function:
     language_version: str,
     benchmark: str,
     is_cached: bool,
+    trigger: Optional[Trigger.TriggerType],
Review comment:

Review the addition of the trigger parameter in the function signature.

The new trigger parameter lets the function handle an optional trigger type, which can change its behavior depending on whether the parameter is present.

Verify that all references to this function in the documentation and codebase have been updated to reflect the new parameter. Additionally, provide examples or scenarios where this parameter would be used.
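
One scenario where `trigger` could matter at packaging time is choosing which wrapper entry point to ship. The sketch below is illustrative only: `TriggerType` is a stand-in enum for `Trigger.TriggerType`, `select_entry_point` is hypothetical, the entry-point names are taken from the handlers added in this PR, and falling back to the HTTP handler is an assumption:

```python
from enum import Enum
from typing import Optional


class TriggerType(Enum):
    """Stand-in for Trigger.TriggerType; values mirror the --trigger choices."""
    LIBRARY = "library"
    HTTP = "http"
    QUEUE = "queue"
    STORAGE = "storage"


def select_entry_point(trigger: Optional[TriggerType]) -> str:
    """Pick the wrapper function to register for the given trigger."""
    entry_points = {
        TriggerType.HTTP: "handler_http",
        TriggerType.QUEUE: "handler_queue",
        TriggerType.STORAGE: "handler_storage",
    }
    if trigger is None:
        # Assumption: no trigger given means the HTTP default.
        return "handler_http"
    # Assumption: triggers without a dedicated wrapper reuse the HTTP one.
    return entry_points.get(trigger, "handler_http")
```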

) -> Tuple[str, int]
```

6 changes: 3 additions & 3 deletions docs/platforms.md
@@ -85,9 +85,9 @@ AZURE_SECRET_PASSWORD = XXXXXXXXXXXXX
 You can pass the credentials either using the environment variables:

 ```
-export AZURE_SECRET_APPLICATION_ID = XXXXXXXXXXXXXXXX
-export AZURE_SECRET_TENANT = XXXXXXXXXXXX
-export AZURE_SECRET_PASSWORD = XXXXXXXXXXXXX
+export AZURE_SECRET_APPLICATION_ID=XXXXXXXXXXXXXXXX
oanarosca marked this conversation as resolved.
+export AZURE_SECRET_TENANT=XXXXXXXXXXXX
+export AZURE_SECRET_PASSWORD=XXXXXXXXXXXXX
 ```

 or in the JSON input configuration:
2 changes: 2 additions & 0 deletions requirements.azure.txt
@@ -1 +1,3 @@
 azure-storage-blob==12.10.0
+azure-storage-queue==12.9.0
+azure-identity==1.16.0
17 changes: 15 additions & 2 deletions sebs.py
@@ -173,7 +173,7 @@ def benchmark():
 @click.option("--repetitions", default=5, type=int, help="Number of experimental repetitions.")
 @click.option(
     "--trigger",
-    type=click.Choice(["library", "http"]),
+    type=click.Choice(["library", "http", "queue", "storage"]),
     default="http",
     help="Function trigger to be used.",
 )
@@ -224,6 +224,10 @@ def invoke(
     if image_tag_prefix is not None:
         sebs_client.config.image_tag_prefix = image_tag_prefix

+    # Insert trigger into (experiment) config. Required by Azure when packaging.
+    trigger = trigger if trigger is not None else "http"
+    update_nested_dict(config, ["experiments", "trigger"], trigger)
+
     experiment_config = sebs_client.get_experiment_config(config["experiments"])
     update_nested_dict(config, ["experiments", "benchmark"], benchmark)
     benchmark_obj = sebs_client.get_benchmark(
@@ -237,9 +241,18 @@ def invoke(
     if timeout is not None:
         benchmark_obj.benchmark_config.timeout = timeout

+    function_name = function_name if function_name else deployment_client.default_function_name(benchmark_obj)
+
+    # GCP and Azure only allow one trigger per function, so augment function name with
+    # trigger type: _http, _queue etc.
+    #
+    # Additionally, Azure requires for the trigger to be defined at deployment time.
+    if deployment_client.name() == "gcp" or deployment_client.name() == "azure":
oanarosca marked this conversation as resolved.
+        function_name = "{}-{}".format(function_name, trigger)
+
     func = deployment_client.get_function(
         benchmark_obj,
-        function_name if function_name else deployment_client.default_function_name(benchmark_obj),
+        function_name,
     )
     storage = deployment_client.get_storage(replace_existing=experiment_config.update_storage)
     input_config = benchmark_obj.prepare_input(storage=storage, size=benchmark_input_size)
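
The naming rule in the last hunk can be sketched in isolation. `triggered_function_name` and its `platform` argument are hypothetical; the logic mirrors the diff, which joins the name and trigger with a hyphen even though the code comment mentions `_http` and `_queue`:

```python
def triggered_function_name(base_name, platform, trigger=None):
    """Append the trigger type to the function name on GCP and Azure.

    Hypothetical helper mirroring the logic added to invoke() in sebs.py.
    """
    # Same default as in the diff: a missing trigger falls back to "http".
    trigger = trigger if trigger is not None else "http"
    if platform in ("gcp", "azure"):
        return "{}-{}".format(base_name, trigger)
    return base_name
```

With this rule the same benchmark deployed with two triggers on GCP or Azure yields two distinct functions, while on AWS a single function name is shared across triggers.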