diff --git a/benchmarks/wrappers/aws/python/handler.py b/benchmarks/wrappers/aws/python/handler.py index 907b2c61..a100393a 100644 --- a/benchmarks/wrappers/aws/python/handler.py +++ b/benchmarks/wrappers/aws/python/handler.py @@ -1,18 +1,39 @@ - import datetime, io, json, os, sys, uuid # Add current directory to allow location of packages sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages')) -# TODO: usual trigger -# implement support for S3 and others def handler(event, context): income_timestamp = datetime.datetime.now().timestamp() + # Flag to indicate whether the measurements should be returned as an HTTP + # response or via a result queue. + return_http = True + + # Queue trigger + if ("Records" in event and event["Records"][0]["eventSource"] == 'aws:sqs'): + event = json.loads(event["Records"][0]["body"]) + + return_http = False + + # Storage trigger + if ("Records" in event and "s3" in event["Records"][0]): + bucket_name = event["Records"][0]["s3"]["bucket"]["name"] + file_name = event["Records"][0]["s3"]["object"]["key"] + + from function import storage + storage_inst = storage.storage.get_instance() + + obj = storage_inst.get_object(bucket_name, file_name) + event = json.loads(obj['Body'].read()) + + return_http = False + # HTTP trigger with API Gateaway if 'body' in event: event = json.loads(event['body']) + req_id = context.aws_request_id event['request-id'] = req_id event['income-timestamp'] = income_timestamp @@ -55,17 +76,30 @@ def handler(event, context): if "cold_start" in os.environ: cold_start_var = os.environ["cold_start"] - return { - 'statusCode': 200, - 'body': json.dumps({ - 'begin': begin.strftime('%s.%f'), - 'end': end.strftime('%s.%f'), - 'results_time': results_time, - 'is_cold': is_cold, - 'result': log_data, - 'request_id': context.aws_request_id, - 'cold_start_var': cold_start_var, - 'container_id': container_id, - }) - } + stats = json.dumps({ + 'begin': begin.strftime('%s.%f'), + 'end': end.strftime('%s.%f'), + 'results_time': results_time, + 'is_cold': is_cold, + 'result': log_data, + 'request_id': context.aws_request_id, + 'cold_start_var': cold_start_var, + 'container_id': container_id, + }) + + # HTTP or library trigger: return an HTTP response. + if (return_http): + return { + 'statusCode': 200, + 'body': stats + } + + # Queue or storage trigger: return via a result queue. 
+ arn = context.invoked_function_arn.split(":") + region = arn[3] + account_id = arn[4] + queue_name = f"{arn[6]}-result" + from function import queue + queue_client = queue.queue(queue_name, account_id, region) + queue_client.send_message(stats) diff --git a/benchmarks/wrappers/aws/python/queue.py b/benchmarks/wrappers/aws/python/queue.py new file mode 100644 index 00000000..95cde8a7 --- /dev/null +++ b/benchmarks/wrappers/aws/python/queue.py @@ -0,0 +1,14 @@ +import boto3 + +class queue: + client = None + + def __init__(self, queue_name: str, account_id: str, region: str): + self.client = boto3.client('sqs', region_name=region) + self.queue_url = f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}" + + def send_message(self, message: str): + self.client.send_message( + QueueUrl=self.queue_url, + MessageBody=message, + ) diff --git a/benchmarks/wrappers/aws/python/storage.py b/benchmarks/wrappers/aws/python/storage.py index 4be0025e..602319df 100644 --- a/benchmarks/wrappers/aws/python/storage.py +++ b/benchmarks/wrappers/aws/python/storage.py @@ -46,6 +46,9 @@ def download_stream(self, bucket, file): data = io.BytesIO() self.client.download_fileobj(bucket, file, data) return data.getbuffer() + + def get_object(self, bucket, file): + return self.client.get_object(Bucket=bucket, Key=file) def get_instance(): if storage.instance is None: diff --git a/benchmarks/wrappers/azure/python/handler.py b/benchmarks/wrappers/azure/python/handler.py index 5f7f14f2..70843b6e 100644 --- a/benchmarks/wrappers/azure/python/handler.py +++ b/benchmarks/wrappers/azure/python/handler.py @@ -1,18 +1,70 @@ -import datetime, io, json, os, uuid +import base64 +import datetime, io, json, logging, os, uuid + +from azure.identity import ManagedIdentityCredential +from azure.storage.queue import QueueClient import azure.functions as func -# TODO: usual trigger -# implement support for blob and others -def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse: +def handler_http(req: func.HttpRequest, context: func.Context) -> func.HttpResponse: income_timestamp = datetime.datetime.now().timestamp() + req_json = req.get_json() if 'connection_string' in req_json: os.environ['STORAGE_CONNECTION_STRING'] = req_json['connection_string'] + req_json['request-id'] = context.invocation_id req_json['income-timestamp'] = income_timestamp + + return func.HttpResponse(measure(req_json), mimetype="application/json") + +def handler_queue(msg: func.QueueMessage, context: func.Context): + income_timestamp = datetime.datetime.now().timestamp() + + logging.info('Python queue trigger function processed a queue item.') + payload = msg.get_json() + + payload['request-id'] = context.invocation_id + payload['income-timestamp'] = income_timestamp + + stats = measure(payload) + + queue_name = f"{os.getenv('WEBSITE_SITE_NAME')}-result" + storage_account = os.getenv('STORAGE_ACCOUNT') + logging.info(queue_name) + logging.info(storage_account) + + from . 
import queue + queue_client = queue.queue(queue_name, storage_account) + queue_client.send_message(stats) + +def handler_storage(blob: func.InputStream, context: func.Context): + income_timestamp = datetime.datetime.now().timestamp() + + logging.info('Python Blob trigger function processed %s', blob.name) + payload = json.loads(blob.readline().decode('utf-8')) + + payload['request-id'] = context.invocation_id + payload['income-timestamp'] = income_timestamp + + stats = measure(payload) + + queue_name = f"{os.getenv('WEBSITE_SITE_NAME')}-result" + storage_account = os.getenv('STORAGE_ACCOUNT') + logging.info(queue_name) + logging.info(storage_account) + + from . import queue + queue_client = queue.queue(queue_name, storage_account) + queue_client.send_message(stats) + +# Contains generic logic for gathering measurements for the function at hand, +# given a request JSON. Used by all handlers, regardless of the trigger. +def measure(req_json) -> str: + req_id = req_json['request-id'] + begin = datetime.datetime.now() # We are deployed in the same directory from . import function @@ -30,7 +82,6 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse: from . import storage storage_inst = storage.storage.get_instance() b = req_json.get('logs').get('bucket') - req_id = context.invocation_id storage_inst.upload_stream(b, '{}.json'.format(req_id), io.BytesIO(json.dumps(log_data).encode('utf-8'))) results_end = datetime.datetime.now() @@ -58,8 +109,7 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse: cold_marker = True is_cold_worker = True - return func.HttpResponse( - json.dumps({ + return json.dumps({ 'begin': begin.strftime('%s.%f'), 'end': end.strftime('%s.%f'), 'results_time': results_time, @@ -68,8 +118,5 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse: 'is_cold_worker': is_cold_worker, 'container_id': container_id, 'environ_container_id': os.environ['CONTAINER_NAME'], - 'request_id': context.invocation_id - }), - mimetype="application/json" - ) - + 'request_id': req_id + }) \ No newline at end of file diff --git a/benchmarks/wrappers/azure/python/queue.py b/benchmarks/wrappers/azure/python/queue.py new file mode 100644 index 00000000..93824181 --- /dev/null +++ b/benchmarks/wrappers/azure/python/queue.py @@ -0,0 +1,15 @@ +from azure.identity import ManagedIdentityCredential +from azure.storage.queue import QueueClient + +class queue: + client = None + + def __init__(self, queue_name: str, storage_account: str): + account_url = f"https://{storage_account}.queue.core.windows.net" + managed_credential = ManagedIdentityCredential() + self.client = QueueClient(account_url, + queue_name=queue_name, + credential=managed_credential) + + def send_message(self, message: str): + self.client.send_message(message) diff --git a/benchmarks/wrappers/gcp/python/handler.py b/benchmarks/wrappers/gcp/python/handler.py index b9017b52..51a9d604 100644 --- a/benchmarks/wrappers/gcp/python/handler.py +++ b/benchmarks/wrappers/gcp/python/handler.py @@ -1,16 +1,75 @@ -import datetime, io, json, os, uuid, sys +import base64, datetime, io, json, os, uuid, sys -sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages')) +from google.cloud import storage as gcp_storage +sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages')) -def handler(req): +def handler_http(req): income_timestamp = datetime.datetime.now().timestamp() req_id = 
req.headers.get('Function-Execution-Id') - req_json = req.get_json() req_json['request-id'] = req_id req_json['income-timestamp'] = income_timestamp + + return measure(req_json), 200, {'ContentType': 'application/json'} + +def handler_queue(data, context): + income_timestamp = datetime.datetime.now().timestamp() + + serialized_payload = data.get('data') + payload = json.loads(base64.b64decode(serialized_payload).decode("utf-8")) + + payload['request-id'] = context.event_id + payload['income-timestamp'] = income_timestamp + + stats = measure(payload) + + # Retrieve the project id and construct the result queue name. + project_id = context.resource.split("/")[1] + topic_name = f"{context.resource.split('/')[3]}-result" + + from function import queue + queue_client = queue.queue(topic_name, project_id) + queue_client.send_message(stats) + +def handler_storage(data, context): + income_timestamp = datetime.datetime.now().timestamp() + + bucket_name = data.get('bucket') + name = data.get('name') + filepath = '/tmp/bucket_contents' + + from function import storage + storage_inst = storage.storage.get_instance() + storage_inst.download(bucket_name, name, filepath) + + payload = {} + + with open(filepath, 'r') as fp: + payload = json.load(fp) + + payload['request-id'] = context.event_id + payload['income-timestamp'] = income_timestamp + + stats = measure(payload) + + # Retrieve the project id and construct the result queue name. + from google.auth import default + # Used to be an env var, now we need an additional request to the metadata + # server to retrieve it. + _, project_id = default() + topic_name = f"{context.resource['name'].split('/')[3]}-result" + + from function import queue + queue_client = queue.queue(topic_name, project_id) + queue_client.send_message(stats) + +# Contains generic logic for gathering measurements for the function at hand, +# given a request JSON. Used by all handlers, regardless of the trigger. 
+def measure(req_json) -> str: + req_id = req_json['request-id'] + begin = datetime.datetime.now() # We are deployed in the same directory from function import function @@ -61,4 +120,4 @@ def handler(req): 'request_id': req_id, 'cold_start_var': cold_start_var, 'container_id': container_id, - }), 200, {'ContentType': 'application/json'} + }) diff --git a/benchmarks/wrappers/gcp/python/queue.py b/benchmarks/wrappers/gcp/python/queue.py new file mode 100644 index 00000000..b6e009e7 --- /dev/null +++ b/benchmarks/wrappers/gcp/python/queue.py @@ -0,0 +1,14 @@ +from google.cloud import pubsub_v1 + +class queue: + client = None + + def __init__(self, topic_name: str, project_id: str): + self.client = pubsub_v1.PublisherClient() + self.topic_name = 'projects/{project_id}/topics/{topic}'.format( + project_id=project_id, + topic=topic_name, + ) + + def send_message(self, message: str): + self.client.publish(self.topic_name, message.encode("utf-8")) diff --git a/config/example.json b/config/example.json index dc4da9ad..f405a3be 100644 --- a/config/example.json +++ b/config/example.json @@ -6,7 +6,7 @@ "download_results": false, "runtime": { "language": "python", - "version": "3.7" + "version": "3.9" }, "type": "invocation-overhead", "perf-cost": { diff --git a/config/systems.json b/config/systems.json index bf095d3f..bcd1b1cf 100644 --- a/config/systems.json +++ b/config/systems.json @@ -71,7 +71,8 @@ "deployment": { "files": [ "handler.py", - "storage.py" + "storage.py", + "queue.py" ], "packages": [] } @@ -112,10 +113,13 @@ "deployment": { "files": [ "handler.py", - "storage.py" + "storage.py", + "queue.py" ], "packages": [ - "azure-storage-blob" + "azure-storage-blob", + "azure-storage-queue", + "azure-identity" ] } }, @@ -165,7 +169,8 @@ "deployment": { "files": [ "handler.py", - "storage.py" + "storage.py", + "queue.py" ], "packages": [ "google-cloud-storage" diff --git a/docs/modularity.md b/docs/modularity.md index 7e3c7fcc..f6015b8e 100644 --- a/docs/modularity.md +++ b/docs/modularity.md @@ -267,7 +267,8 @@ Check other platforms to see how configuration is defined, for example, for AWS: "deployment": { "files": [ "handler.py", - "storage.py" + "storage.py", + "queue.py" ], "packages": [] } @@ -303,6 +304,7 @@ Implement this step in the following function: language_version: str, benchmark: str, is_cached: bool, + trigger: Optional[Trigger.TriggerType], ) -> Tuple[str, int] ``` diff --git a/docs/platforms.md b/docs/platforms.md index 27738b6e..506e2131 100644 --- a/docs/platforms.md +++ b/docs/platforms.md @@ -84,9 +84,9 @@ AZURE_SECRET_PASSWORD = XXXXXXXXXXXXX You can pass the credentials either using the environment variables: ``` -export AZURE_SECRET_APPLICATION_ID = XXXXXXXXXXXXXXXX -export AZURE_SECRET_TENANT = XXXXXXXXXXXX -export AZURE_SECRET_PASSWORD = XXXXXXXXXXXXX +export AZURE_SECRET_APPLICATION_ID=XXXXXXXXXXXXXXXX +export AZURE_SECRET_TENANT=XXXXXXXXXXXX +export AZURE_SECRET_PASSWORD=XXXXXXXXXXXXX ``` or in the JSON input configuration: diff --git a/requirements.azure.txt b/requirements.azure.txt index f7d82499..4fed51ac 100644 --- a/requirements.azure.txt +++ b/requirements.azure.txt @@ -1 +1,3 @@ azure-storage-blob==12.10.0 +azure-storage-queue==12.9.0 +azure-identity==1.16.0 diff --git a/requirements.gcp.txt b/requirements.gcp.txt index 9cb90916..3d1aea35 100644 --- a/requirements.gcp.txt +++ b/requirements.gcp.txt @@ -4,3 +4,4 @@ google-api-python-client==1.12.5 google-cloud-monitoring==2.0.0 google-api-python-client-stubs google-cloud-logging==2.0.0
+google-cloud-pubsub==2.23.0 \ No newline at end of file diff --git a/scripts/run_experiments.py b/scripts/run_experiments.py index c18b96c0..c9167553 100755 --- a/scripts/run_experiments.py +++ b/scripts/run_experiments.py @@ -445,6 +445,7 @@ def __init__(self, cache_client, config, docker_client, language): function - function.py - storage.py + - queue.py - resources handler.py diff --git a/sebs.py b/sebs.py index ff7f7769..26bbeb2e 100755 --- a/sebs.py +++ b/sebs.py @@ -178,7 +178,7 @@ def benchmark(): @click.option("--repetitions", default=5, type=int, help="Number of experimental repetitions.") @click.option( "--trigger", - type=click.Choice(["library", "http"]), + type=click.Choice(["library", "http", "queue", "storage"]), default="http", help="Function trigger to be used.", ) @@ -229,6 +229,10 @@ def invoke( if image_tag_prefix is not None: sebs_client.config.image_tag_prefix = image_tag_prefix + # Insert trigger into (experiment) config. Required by Azure when packaging. + trigger = trigger if trigger is not None else "http" + update_nested_dict(config, ["experiments", "trigger"], trigger) + experiment_config = sebs_client.get_experiment_config(config["experiments"]) update_nested_dict(config, ["experiments", "benchmark"], benchmark) benchmark_obj = sebs_client.get_benchmark( @@ -242,9 +246,18 @@ def invoke( if timeout is not None: benchmark_obj.benchmark_config.timeout = timeout + function_name = function_name if function_name else deployment_client.default_function_name(benchmark_obj) + + # GCP and Azure only allow one trigger per function, so augment the function name with + # the trigger type: -http, -queue, etc. + # + # Additionally, Azure requires the trigger to be defined at deployment time. + if deployment_client.name() == "gcp" or deployment_client.name() == "azure": + function_name = "{}-{}".format(function_name, trigger) + func = deployment_client.get_function( benchmark_obj, - function_name if function_name else deployment_client.default_function_name(benchmark_obj), + function_name, ) storage = deployment_client.get_storage(replace_existing=experiment_config.update_storage) input_config = benchmark_obj.prepare_input(storage=storage, size=benchmark_input_size) diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index 6dc70e52..9bcb52e6 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -119,6 +119,7 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: function - function.py - storage.py + - queue.py - resources handler.py @@ -132,6 +133,7 @@ def package_code( language_version: str, benchmark: str, is_cached: bool, + trigger: Optional[Trigger.TriggerType], ) -> Tuple[str, int]: CONFIG_FILES = { @@ -258,13 +260,19 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LambdaFun def cached_function(self, function: Function): - from sebs.aws.triggers import LibraryTrigger + from sebs.aws.triggers import LibraryTrigger, QueueTrigger, StorageTrigger for trigger in function.triggers(Trigger.TriggerType.LIBRARY): trigger.logging_handlers = self.logging_handlers cast(LibraryTrigger, trigger).deployment_client = self for trigger in function.triggers(Trigger.TriggerType.HTTP): trigger.logging_handlers = self.logging_handlers + for trigger in function.triggers(Trigger.TriggerType.QUEUE): + trigger.logging_handlers = self.logging_handlers + cast(QueueTrigger, trigger).deployment_client = self + for trigger in function.triggers(Trigger.TriggerType.STORAGE): + trigger.logging_handlers = self.logging_handlers + cast(StorageTrigger,
trigger).deployment_client = self """ Update function code and configuration on AWS. @@ -484,10 +492,11 @@ def download_metrics( ) def create_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> Trigger: - from sebs.aws.triggers import HTTPTrigger + from sebs.aws.triggers import HTTPTrigger, QueueTrigger, StorageTrigger function = cast(LambdaFunction, func) + trigger: Trigger if trigger_type == Trigger.TriggerType.HTTP: api_name = "{}-http-api".format(function.name) @@ -511,6 +520,14 @@ def create_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> T elif trigger_type == Trigger.TriggerType.LIBRARY: # should already exist return func.triggers(Trigger.TriggerType.LIBRARY)[0] + elif trigger_type == Trigger.TriggerType.QUEUE: + trigger = QueueTrigger(func.name, self) + trigger.logging_handlers = self.logging_handlers + self.logging.info(f"Created Queue trigger for {func.name} function.") + elif trigger_type == Trigger.TriggerType.STORAGE: + trigger = StorageTrigger(func.name, self) + trigger.logging_handlers = self.logging_handlers + self.logging.info(f"Created Storage trigger for {func.name} function.") else: raise RuntimeError("Not supported!") diff --git a/sebs/aws/function.py b/sebs/aws/function.py index 27aeb240..24ce4a8d 100644 --- a/sebs/aws/function.py +++ b/sebs/aws/function.py @@ -39,7 +39,7 @@ def serialize(self) -> dict: @staticmethod def deserialize(cached_config: dict) -> "LambdaFunction": from sebs.faas.function import Trigger - from sebs.aws.triggers import LibraryTrigger, HTTPTrigger + from sebs.aws.triggers import LibraryTrigger, HTTPTrigger, QueueTrigger, StorageTrigger cfg = FunctionConfig.deserialize(cached_config["config"]) ret = LambdaFunction( @@ -55,7 +55,12 @@ def deserialize(cached_config: dict) -> "LambdaFunction": for trigger in cached_config["triggers"]: trigger_type = cast( Trigger, - {"Library": LibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]), + { + "Library": LibraryTrigger, + "HTTP": HTTPTrigger, + "Queue": QueueTrigger, + "Storage": StorageTrigger, + }.get(trigger["type"]), ) assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) ret.add_trigger(trigger_type.deserialize(trigger)) diff --git a/sebs/aws/triggers.py b/sebs/aws/triggers.py index f1831459..96b9bc20 100644 --- a/sebs/aws/triggers.py +++ b/sebs/aws/triggers.py @@ -2,10 +2,15 @@ import concurrent.futures import datetime import json -from typing import Dict, Optional # noqa +from typing import Optional +import uuid # noqa + +import boto3 from sebs.aws.aws import AWS +from sebs.aws.queue import SQS from sebs.faas.function import ExecutionResult, Trigger +from sebs.faas.queue import QueueType class LibraryTrigger(Trigger): @@ -123,3 +128,271 @@ def serialize(self) -> dict: @staticmethod def deserialize(obj: dict) -> Trigger: return HTTPTrigger(obj["url"], obj["api-id"]) + + +class QueueTrigger(Trigger): + def __init__( + self, + fname: str, + deployment_client: Optional[AWS] = None, + queue: Optional[SQS] = None, + result_queue: Optional[SQS] = None + ): + super().__init__() + self.name = fname + self._queue = queue + self._result_queue = result_queue + self._deployment_client = deployment_client + + if (not self._queue): + self._queue = SQS( + self.name, + QueueType.TRIGGER, + self.deployment_client.config.region + ) + self.queue.create_queue() + + # Add queue trigger + lambda_client = self.deployment_client.get_lambda_client() + if not len( + lambda_client.list_event_source_mappings( + EventSourceArn=self.queue.queue_arn, FunctionName=self.name 
+ )["EventSourceMappings"] + ): + lambda_client.create_event_source_mapping( + EventSourceArn=self.queue.queue_arn, + FunctionName=self.name, + MaximumBatchingWindowInSeconds=1, + ) + + # Create result queue for communicating benchmark results back to the + # client. + if (not self._result_queue): + self._result_queue = SQS( + fname, + QueueType.RESULT, + self.deployment_client.config.region + ) + self._result_queue.create_queue() + + @staticmethod + def typename() -> str: + return "AWS.QueueTrigger" + + @property + def queue(self) -> SQS: + assert self._queue + return self._queue + + @property + def result_queue(self) -> SQS: + assert self._result_queue + return self._result_queue + + @property + def deployment_client(self) -> AWS: + assert self._deployment_client + return self._deployment_client + + @deployment_client.setter + def deployment_client(self, deployment_client: AWS): + self._deployment_client = deployment_client + + @staticmethod + def trigger_type() -> Trigger.TriggerType: + return Trigger.TriggerType.QUEUE + + def sync_invoke(self, payload: dict) -> ExecutionResult: + + self.logging.debug(f"Invoke function {self.name}") + + # Publish payload to queue + serialized_payload = json.dumps(payload) + begin = datetime.datetime.now() + self.queue.send_message(serialized_payload) + + response = "" + while (response == ""): + response = self.result_queue.receive_message() + + end = datetime.datetime.now() + + result = ExecutionResult.from_times(begin, end) + result.parse_benchmark_output(json.loads(response)) + return result + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return { + "type": "Queue", + "name": self.name, + "queue": self.queue.serialize(), + "result_queue": self.result_queue.serialize() + } + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return QueueTrigger( + obj["name"], + None, + SQS.deserialize(obj["queue"]), + SQS.deserialize(obj["result_queue"]) + ) + + +class StorageTrigger(Trigger): + def __init__( + self, + fname: str, + deployment_client: Optional[AWS] = None, + bucket_name: Optional[str] = None, + result_queue: Optional[SQS] = None + ): + super().__init__() + self.name = fname + + self._deployment_client = None + self._bucket_name = None + self._result_queue = None + + if deployment_client: + self._deployment_client = deployment_client + if bucket_name: + self._bucket_name = bucket_name + if result_queue: + self._result_queue = result_queue + + # When creating the trigger for the first time, also create and store + # storage bucket information. 
+ if not self._bucket_name: + # Init clients + s3 = boto3.resource("s3") + lambda_client = self.deployment_client.get_lambda_client() + + # AWS disallows underscores in bucket names + self._bucket_name = self.name.replace("_", "-") + function_arn = lambda_client.get_function(FunctionName=self.name)["Configuration"][ + "FunctionArn" + ] + + # Create bucket + self.logging.info(f"Creating bucket {self.bucket_name}") + + region = self.deployment_client.config.region + if region == "us-east-1": + s3.create_bucket(Bucket=self.bucket_name) + else: + s3.create_bucket( + Bucket=self.bucket_name, + CreateBucketConfiguration={"LocationConstraint": region}, + ) + + self.logging.info("Created bucket") + + lambda_client.add_permission( + FunctionName=self.name, + StatementId=str(uuid.uuid1()), + Action="lambda:InvokeFunction", + Principal="s3.amazonaws.com", + SourceArn=f"arn:aws:s3:::{self.bucket_name}", + ) + + # Add bucket trigger + bucket_notification = s3.BucketNotification(self.bucket_name) + bucket_notification.put( + NotificationConfiguration={ + "LambdaFunctionConfigurations": [ + { + "LambdaFunctionArn": function_arn, + "Events": ["s3:ObjectCreated:*"], + }, + ] + } + ) + + # Create result queue for communicating benchmark results back to the + # client. + if (not self._result_queue): + self._result_queue = SQS( + fname, + QueueType.RESULT, + self.deployment_client.config.region + ) + self._result_queue.create_queue() + + @staticmethod + def typename() -> str: + return "AWS.StorageTrigger" + + @property + def bucket_name(self) -> str: + assert self._bucket_name + return self._bucket_name + + @property + def deployment_client(self) -> AWS: + assert self._deployment_client + return self._deployment_client + + @property + def result_queue(self) -> SQS: + assert self._result_queue + return self._result_queue + + @deployment_client.setter + def deployment_client(self, deployment_client: AWS): + self._deployment_client = deployment_client + + @staticmethod + def trigger_type() -> Trigger.TriggerType: + return Trigger.TriggerType.STORAGE + + def sync_invoke(self, payload: dict) -> ExecutionResult: + + self.logging.debug(f"Invoke function {self.name}") + + serialized_payload = json.dumps(payload) + + # Put object + s3 = boto3.resource("s3") + begin = datetime.datetime.now() + s3.Object(self.bucket_name, "payload.json").put(Body=serialized_payload) + self.logging.info(f"Uploaded payload to bucket {self.bucket_name}") + + response = "" + while (response == ""): + response = self.result_queue.receive_message() + + end = datetime.datetime.now() + + result = ExecutionResult.from_times(begin, end) + result.parse_benchmark_output(json.loads(response)) + return result + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return { + "type": "Storage", + "name": self.name, + "bucket_name": self.bucket_name, + "result_queue": self.result_queue.serialize() + } + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return StorageTrigger( + obj["name"], + None, + obj["bucket_name"], + SQS.deserialize(obj["result_queue"]) + ) diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index 78e45963..ebe0c618 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -37,6 +37,10 @@ class Azure(System): def name(): return "azure" + @staticmethod + def typename(): + return "Azure" + @property def config(self) -> AzureConfig: return self._config @@ 
-146,6 +150,61 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: self.storage.replace_existing = replace_existing return self.storage + """ + Composes the JSON config that describes the trigger and bindings configs + for a given function to be run on Azure. + + :param benchmark: + :param exec_files: the files which define and implement the function to be executed + :return: JSON dictionary containing the function configuration + """ + + def create_function_json(self, benchmark, exec_files) -> Dict: + trigger = benchmark.split("-")[-1] + + if trigger == "queue": + return { + "scriptFile": exec_files, + "entryPoint": "handler_queue", + "bindings": [ + { + "name": "msg", + "type": "queueTrigger", + "direction": "in", + "queueName": benchmark, + "connection": "AzureWebJobsStorage", + } + ], + } + elif trigger == "storage": + return { + "scriptFile": exec_files, + "entryPoint": "handler_storage", + "bindings": [ + { + "name": "blob", + "type": "blobTrigger", + "direction": "in", + "path": benchmark, + "connection": "AzureWebJobsStorage", + } + ], + } + return { # HTTP + "scriptFile": exec_files, + "entryPoint": "handler_http", + "bindings": [ + { + "authLevel": "anonymous", + "type": "httpTrigger", + "direction": "in", + "name": "req", + "methods": ["get", "post"], + }, + {"type": "http", "direction": "out", "name": "$return"}, + ], + } + # Directory structure # handler # - source files @@ -161,6 +220,7 @@ def package_code( language_version: str, benchmark: str, is_cached: bool, + trigger: Optional[Trigger.TriggerType], ) -> Tuple[str, int]: # In previous step we ran a Docker container which installed packages @@ -180,23 +240,25 @@ def package_code( source_file = os.path.join(directory, f) shutil.move(source_file, handler_dir) + func_name = ( + "{}-{}-{}-{}-{}".format( + benchmark, + language_name, + language_version, + self.config.resources.resources_id, + trigger, + ) + .replace(".", "-") + .replace("_", "-") + ) + # generate function.json - # TODO: extension to other triggers than HTTP - default_function_json = { - "scriptFile": EXEC_FILES[language_name], - "bindings": [ - { - "authLevel": "anonymous", - "type": "httpTrigger", - "direction": "in", - "name": "req", - "methods": ["get", "post"], - }, - {"type": "http", "direction": "out", "name": "$return"}, - ], - } json_out = os.path.join(directory, "handler", "function.json") - json.dump(default_function_json, open(json_out, "w"), indent=2) + json.dump( + self.create_function_json(func_name, EXEC_FILES[language_name]), + open(json_out, "w"), + indent=2, + ) # generate host.json default_host_json = { @@ -291,9 +353,12 @@ def update_function(self, function: Function, code_package: Benchmark): container_dest = self._mount_function_code(code_package) url = self.publish_function(function, code_package, container_dest, True) - trigger = HTTPTrigger(url, self.config.resources.data_storage_account(self.cli_instance)) - trigger.logging_handlers = self.logging_handlers - function.add_trigger(trigger) + if function.name.endswith("http"): + trigger = HTTPTrigger( + url, self.config.resources.data_storage_account(self.cli_instance) + ) + trigger.logging_handlers = self.logging_handlers + function.add_trigger(trigger) def update_function_configuration(self, function: Function, code_package: Benchmark): # FIXME: this does nothing currently - we don't specify timeout @@ -404,7 +469,6 @@ def create_function(self, code_package: Benchmark, func_name: str) -> AzureFunct return function def cached_function(self, function: Function): 
- data_storage_account = self.config.resources.data_storage_account(self.cli_instance) for trigger in function.triggers_all(): azure_trigger = cast(AzureTrigger, trigger) @@ -513,12 +577,74 @@ def enforce_cold_start(self, functions: List[Function], code_package: Benchmark) time.sleep(20) """ - The only implemented trigger at the moment is HTTPTrigger. - It is automatically created for each function. + Supports HTTP, queue and storage triggers, as specified by + the user when SeBS is run. """ def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: - raise NotImplementedError() + from sebs.azure.triggers import QueueTrigger, StorageTrigger + + azure_function = cast(AzureFunction, function) + resource_group = self.config.resources.resource_group(self.cli_instance) + storage_account = azure_function.function_storage.account_name + + user_principal_name = self.cli_instance.execute("az ad user list") + + storage_account_scope = self.cli_instance.execute( + ("az storage account show --resource-group {} --name {} --query id").format( + resource_group, storage_account + ) + ) + + self.cli_instance.execute( + ( + 'az role assignment create --assignee "{}" \ + --role "Storage {} Data Contributor" \ + --scope {}' + ).format( + json.loads(user_principal_name.decode("utf-8"))[0]["userPrincipalName"], + "Queue" if trigger_type == Trigger.TriggerType.QUEUE else "Blob", + storage_account_scope.decode("utf-8"), + ) + ) + + trigger: Trigger + if trigger_type == Trigger.TriggerType.QUEUE or trigger_type == Trigger.TriggerType.STORAGE: + resource_group = self.config.resources.resource_group(self.cli_instance) + + # Set the storage account as an env var on the function. + ret = self.cli_instance.execute( + f"az functionapp config appsettings set --name {function.name} " + f" --resource-group {resource_group} " + f" --settings STORAGE_ACCOUNT={storage_account}" + ) + print(ret.decode()) + + # Connect the function app to the result queue via Service + # Connector. 
+ ret = self.cli_instance.execute( + f"az webapp connection create storage-queue " + f" --resource-group {resource_group} " + f" --target-resource-group {resource_group} " + f" --account {storage_account} " + f" --name {function.name} " + f" --system-identity " + ) + print(ret.decode()) + + if trigger_type == Trigger.TriggerType.QUEUE: + trigger = QueueTrigger(function.name, storage_account, self.config.region) + self.logging.info(f"Created Queue trigger for {function.name} function") + elif trigger_type == Trigger.TriggerType.STORAGE: + trigger = StorageTrigger(function.name, storage_account, self.config.region) + self.logging.info(f"Created Storage trigger for {function.name} function") + else: + raise RuntimeError("Not supported!") + + trigger.logging_handlers = self.logging_handlers + function.add_trigger(trigger) + self.cache_client.update_function(function) + return trigger # diff --git a/sebs/azure/function.py b/sebs/azure/function.py index 61ef4c57..375c0b79 100644 --- a/sebs/azure/function.py +++ b/sebs/azure/function.py @@ -1,3 +1,5 @@ +from typing import cast + from sebs.azure.config import AzureResources from sebs.faas.function import Function, FunctionConfig @@ -14,6 +16,10 @@ def __init__( super().__init__(benchmark, name, code_hash, cfg) self.function_storage = function_storage + @staticmethod + def typename() -> str: + return "Azure.AzureFunction" + def serialize(self) -> dict: return { **super().serialize(), @@ -22,6 +28,9 @@ def serialize(self) -> dict: @staticmethod def deserialize(cached_config: dict) -> Function: + from sebs.faas.function import Trigger + from sebs.azure.triggers import HTTPTrigger, QueueTrigger, StorageTrigger + cfg = FunctionConfig.deserialize(cached_config["config"]) ret = AzureFunction( cached_config["name"], @@ -30,10 +39,13 @@ def deserialize(cached_config: dict) -> Function: AzureResources.Storage.deserialize(cached_config["function_storage"]), cfg, ) - from sebs.azure.triggers import HTTPTrigger - for trigger in cached_config["triggers"]: - trigger_type = {"HTTP": HTTPTrigger}.get(trigger["type"]) + trigger_type = cast( + Trigger, + {"HTTP": HTTPTrigger, "Queue": QueueTrigger, "Storage": StorageTrigger}.get( + trigger["type"] + ), + ) assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) ret.add_trigger(trigger_type.deserialize(trigger)) return ret diff --git a/sebs/azure/triggers.py b/sebs/azure/triggers.py index 66be8c6d..2a2e96bc 100644 --- a/sebs/azure/triggers.py +++ b/sebs/azure/triggers.py @@ -1,8 +1,19 @@ +import base64 import concurrent.futures +import datetime +import json +import time from typing import Any, Dict, Optional # noqa +from azure.core.exceptions import ResourceExistsError +from azure.identity import DefaultAzureCredential +from azure.storage.blob import BlobServiceClient +from azure.storage.queue import QueueClient + from sebs.azure.config import AzureResources +from sebs.azure.queue import AzureQueue from sebs.faas.function import ExecutionResult, Trigger +from sebs.faas.queue import QueueType class AzureTrigger(Trigger): @@ -45,3 +56,260 @@ def serialize(self) -> dict: @staticmethod def deserialize(obj: dict) -> Trigger: return HTTPTrigger(obj["url"]) + + +class QueueTrigger(Trigger): + def __init__( + self, + fname: str, + storage_account: str, + region: str, + queue: Optional[AzureQueue] = None, + result_queue: Optional[AzureQueue] = None + ): + super().__init__() + self.name = fname + self._storage_account = storage_account + self._region = region + self._queue = queue + self._result_queue = 
result_queue + + if (not self._queue): + self._queue = AzureQueue( + self.name, + QueueType.TRIGGER, + self.storage_account, + self.region + ) + self.queue.create_queue() + + if (not self._result_queue): + self._result_queue = AzureQueue( + fname, + QueueType.RESULT, + storage_account, + self.region + ) + self._result_queue.create_queue() + + @staticmethod + def typename() -> str: + return "Azure.QueueTrigger" + + @staticmethod + def trigger_type() -> Trigger.TriggerType: + return Trigger.TriggerType.QUEUE + + @property + def storage_account(self) -> str: + assert self._storage_account + return self._storage_account + + @property + def region(self) -> str: + assert self._region + return self._region + + @property + def queue(self) -> AzureQueue: + assert self._queue + return self._queue + + @property + def result_queue(self) -> AzureQueue: + assert self._result_queue + return self._result_queue + + @property + def account_url(self) -> str: + return f"https://{self.storage_account}.queue.core.windows.net" + + @property + def queue_name(self) -> str: + assert self._queue_name + return self._queue_name + + def sync_invoke(self, payload: dict) -> ExecutionResult: + + self.logging.info(f"Invoke function {self.name}") + + # Publish payload to queue + serialized_payload = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8") + begin = datetime.datetime.now() + self.queue.send_message(serialized_payload) + + response = "" + while (response == ""): + response = self.result_queue.receive_message() + if (response == ""): + time.sleep(5) + + end = datetime.datetime.now() + + result = ExecutionResult.from_times(begin, end) + result.parse_benchmark_output(json.loads(response)) + return result + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return { + "type": "Queue", + "name": self.name, + "storage_account": self.storage_account, + "region": self.region, + "queue": self.queue.serialize(), + "result_queue": self.result_queue.serialize() + } + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return QueueTrigger( + obj["name"], + obj["storage_account"], + obj["region"], + AzureQueue.deserialize(obj["queue"]), + AzureQueue.deserialize(obj["result_queue"]) + ) + + +class StorageTrigger(Trigger): + def __init__( + self, + fname: str, + storage_account: str, + region: str, + result_queue: Optional[AzureQueue] = None, + container_name: Optional[str] = None + ): + super().__init__() + self.name = fname + self._storage_account = storage_account + self._region = region + self._result_queue = result_queue + self._container_name = None + + if container_name: + self._container_name = container_name + else: + # Having a container name field is currently a bit contrived - it is mostly + # a device to indicate that a trigger resource exists and is cached. In the + # future, we may adopt a different convention for naming trigger resources, + # at which point this will become truly useful. 
+ self._container_name = self.name + + # Init client + default_credential = DefaultAzureCredential() + blob_service_client = BlobServiceClient(self.account_url, credential=default_credential) + + # Create container + self.logging.info(f"Creating container {self.container_name}") + try: + blob_service_client.create_container(self.container_name) + self.logging.info("Created container") + except ResourceExistsError: + self.logging.info("Container already exists, reusing...") + + if (not self._result_queue): + self._result_queue = AzureQueue( + fname, + QueueType.RESULT, + storage_account, + self.region + ) + self._result_queue.create_queue() + + @staticmethod + def typename() -> str: + return "Azure.StorageTrigger" + + @staticmethod + def trigger_type() -> Trigger.TriggerType: + return Trigger.TriggerType.STORAGE + + @property + def storage_account(self) -> str: + assert self._storage_account + return self._storage_account + + @property + def region(self) -> str: + assert self._region + return self._region + + @property + def result_queue(self) -> AzureQueue: + assert self._result_queue + return self._result_queue + + @property + def account_url(self) -> str: + return f"https://{self.storage_account}.blob.core.windows.net" + + @property + def container_name(self) -> str: + assert self._container_name + return self._container_name + + def sync_invoke(self, payload: dict) -> ExecutionResult: + + self.logging.info(f"Invoke function {self.name}") + + # Prepare blob + file_name = "payload.json" + with open(file_name, "w") as fp: + json.dump(payload, fp) + + # Init client + default_credential = DefaultAzureCredential() + blob_service_client = BlobServiceClient(self.account_url, credential=default_credential) + + # Upload blob + blob_client = blob_service_client.get_blob_client( + container=self.container_name, blob=file_name + ) + begin = datetime.datetime.now() + with open(file=file_name, mode="rb") as payload_data: + blob_client.upload_blob(payload_data, overwrite=True) + self.logging.info(f"Uploaded payload to container {self.container_name}") + + response = "" + while (response == ""): + time.sleep(5) + response = self.result_queue.receive_message() + + end = datetime.datetime.now() + + result = ExecutionResult.from_times(begin, end) + result.parse_benchmark_output(json.loads(response)) + return result + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return { + "type": "Storage", + "name": self.name, + "storage_account": self.storage_account, + "region": self.region, + "result_queue": self.result_queue.serialize(), + "container_name": self.container_name, + } + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return StorageTrigger( + obj["name"], + obj["storage_account"], + obj["region"], + AzureQueue.deserialize(obj["result_queue"]), + obj["container_name"] + ) diff --git a/sebs/benchmark.py b/sebs/benchmark.py index 90eed6ae..f0911708 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -5,7 +5,7 @@ import shutil import subprocess from abc import abstractmethod -from typing import Any, Callable, Dict, List, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple import docker @@ -13,6 +13,7 @@ from sebs.cache import Cache from sebs.faas.config import Resources from sebs.utils import find_benchmark, project_absolute_path, LoggingBase +# from sebs.faas.function import Trigger from sebs.faas.storage 
import PersistentStorage from typing import TYPE_CHECKING @@ -471,6 +472,11 @@ def recalculate_code_size(self): def build( self, deployment_build_step: Callable[[str, str, str, str, bool], Tuple[str, int]] + # TODO(oana) fix? + # self, + # deployment_build_step: Callable[ + # [str, str, str, str, bool, Optional[Trigger.TriggerType]], Tuple[str, int] + # ], ) -> Tuple[bool, str]: # Skip build if files are up to date and user didn't enforce rebuild @@ -505,6 +511,7 @@ def build( self.language_version, self.benchmark, self.is_cached_valid, + self._experiment_config.trigger, ) self.logging.info( ( diff --git a/sebs/cache.py b/sebs/cache.py index 4403e59b..0751ab90 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -162,7 +162,9 @@ def update_storage(self, deployment: str, benchmark: str, config: dict): with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: json.dump(cached_config, fp, indent=2) - def add_code_package(self, deployment_name: str, language_name: str, code_package: "Benchmark"): + def add_code_package( + self, deployment_name: str, language_name: str, code_package: "Benchmark" + ): with self._lock: language = code_package.language_name language_version = code_package.language_version diff --git a/sebs/experiments/config.py b/sebs/experiments/config.py index a5ca3f0b..51cedd52 100644 --- a/sebs/experiments/config.py +++ b/sebs/experiments/config.py @@ -1,6 +1,6 @@ from typing import Dict -from sebs.faas.function import Runtime +from sebs.faas.function import Runtime, Trigger class Config: @@ -11,6 +11,7 @@ def __init__(self): self._flags: Dict[str, bool] = {} self._experiment_configs: Dict[str, dict] = {} self._runtime = Runtime(None, None) + self._trigger: Trigger.TriggerType @property def update_code(self) -> bool: @@ -31,6 +32,10 @@ def check_flag(self, key: str) -> bool: def runtime(self) -> Runtime: return self._runtime + @property + def trigger(self) -> Trigger.TriggerType: + return self._trigger + def experiment_settings(self, name: str) -> dict: return self._experiment_configs[name] @@ -42,6 +47,7 @@ def serialize(self) -> dict: "runtime": self._runtime.serialize(), "flags": self._flags, "experiments": self._experiment_configs, + "trigger": self._trigger, } return out @@ -55,6 +61,7 @@ def deserialize(config: dict) -> "Config": cfg._download_results = config["download_results"] cfg._runtime = Runtime.deserialize(config["runtime"]) cfg._flags = config["flags"] if "flags" in config else {} + cfg._trigger = config["trigger"] if "trigger" in config else {} from sebs.experiments import ( NetworkPingPong, diff --git a/sebs/faas/function.py b/sebs/faas/function.py index c2226cee..df732360 100644 --- a/sebs/faas/function.py +++ b/sebs/faas/function.py @@ -179,6 +179,7 @@ class TriggerType(Enum): HTTP = "http" LIBRARY = "library" STORAGE = "storage" + QUEUE = "queue" @staticmethod def get(name: str) -> "Trigger.TriggerType": diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 2576a0ef..31b7e25a 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -167,6 +167,7 @@ def package_code( language_version: str, benchmark: str, is_cached: bool, + trigger: Optional[Trigger.TriggerType], ) -> Tuple[str, int]: pass diff --git a/sebs/gcp/function.py b/sebs/gcp/function.py index 6736c1ca..09cab242 100644 --- a/sebs/gcp/function.py +++ b/sebs/gcp/function.py @@ -30,7 +30,7 @@ def serialize(self) -> dict: @staticmethod def deserialize(cached_config: dict) -> "GCPFunction": from sebs.faas.function import Trigger - from sebs.gcp.triggers import LibraryTrigger, HTTPTrigger 
+ from sebs.gcp.triggers import LibraryTrigger, HTTPTrigger, QueueTrigger, StorageTrigger cfg = FunctionConfig.deserialize(cached_config["config"]) ret = GCPFunction( @@ -43,7 +43,12 @@ def deserialize(cached_config: dict) -> "GCPFunction": for trigger in cached_config["triggers"]: trigger_type = cast( Trigger, - {"Library": LibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]), + { + "Library": LibraryTrigger, + "HTTP": HTTPTrigger, + "Queue": QueueTrigger, + "Storage": StorageTrigger, + }.get(trigger["type"]), ) assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) ret.add_trigger(trigger_type.deserialize(trigger)) diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index c99850b2..39874f9b 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -103,6 +103,94 @@ def get_storage( self.storage.replace_existing = replace_existing return self.storage + """ + Provide the fully qualified name of a trigger resource (queue or storage). + """ + + def get_trigger_resource_name(self, func_name: str) -> str: + trigger = func_name.split("-")[-1] + + assert trigger == "queue" or trigger == "storage" + + if trigger == "queue": + return "projects/{project_name}/topics/{topic}".format( + project_name=self.config.project_name, topic=func_name + ) + else: + return "projects/{project_name}/buckets/{bucket}".format( + project_name=self.config.project_name, bucket=func_name + ) + + """ + Trigger resources (queue, bucket) must exist on GCP before the + corresponding function is first deployed. + + This function creates the required resources and returns a dict + containing trigger information required by create_req inside of + create_function. + + :param func_name: the name of the function to be deployed, + including its trigger + + :param cached: when True, skip the creation of the actual resource + - merely create the configuration required to deploy the function. + This option is used in update_function() only. 
+ + :return: JSON/dict with the trigger configuration required by GCP + on function creation/update + """ + + def create_trigger_resource(self, func_name: str, cached=False) -> Dict: + trigger = func_name.split("-")[-1] + + if trigger == "queue": + topic_name = self.get_trigger_resource_name(func_name) + + if not cached: + pub_sub = build("pubsub", "v1", cache_discovery=False) + + self.logging.info(f"Creating queue '{topic_name}'") + try: + pub_sub.projects().topics().create(name=topic_name).execute() + self.logging.info("Created queue") + except HttpError as http_error: + if http_error.resp.status == 409: + self.logging.info("Queue already exists, reusing...") + + return { + "eventTrigger": { + "eventType": "providers/cloud.pubsub/eventTypes/topic.publish", + "resource": topic_name, + }, + "entryPoint": "handler_queue", + } + elif trigger == "storage": + bucket_name = self.get_trigger_resource_name(func_name) + + if not cached: + storage = build("storage", "v1", cache_discovery=False) + + self.logging.info(f"Creating storage bucket '{bucket_name}'") + try: + storage.buckets().insert( + project=self.config.project_name, + body={"name": func_name}, + ).execute() + self.logging.info("Created storage bucket") + except HttpError as http_error: + if http_error.resp.status == 409: + self.logging.info("Storage bucket already exists, reusing...") + + return { + "eventTrigger": { + "eventType": "google.storage.object.finalize", + "resource": bucket_name, + }, + "entryPoint": "handler_storage", + } + # HTTP triggers do not require resource creation + return {"httpsTrigger": {}, "entryPoint": "handler_http"} + @staticmethod def default_function_name(code_package: Benchmark) -> str: # Create function name @@ -140,6 +228,7 @@ def package_code( language_version: str, benchmark: str, is_cached: bool, + trigger: Optional[Trigger.TriggerType], ) -> Tuple[str, int]: CONFIG_FILES = { @@ -159,7 +248,8 @@ def package_code( shutil.move(file, function_dir) requirements = open(os.path.join(directory, "requirements.txt"), "w") - requirements.write("google-cloud-storage") + requirements.write("google-cloud-storage\n") + requirements.write("google-cloud-pubsub") requirements.close() # rename handler function.py since in gcp it has to be caled main.py @@ -218,6 +308,10 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "GCPFuncti try: get_req.execute() except HttpError: + # Before creating the function, ensure all trigger resources (queue, + # bucket) exist on GCP. 
+ trigger_info = self.create_trigger_resource(func_name) + create_req = ( self.function_client.projects() .locations() @@ -228,14 +322,13 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "GCPFuncti ), body={ "name": full_func_name, - "entryPoint": "handler", "runtime": code_package.language_name + language_runtime.replace(".", ""), "availableMemoryMb": memory, "timeout": str(timeout) + "s", - "httpsTrigger": {}, "ingressSettings": "ALLOW_ALL", "sourceArchiveUrl": "gs://" + code_bucket + "/" + code_prefix, - }, + } + | trigger_info, ) ) create_req.execute() @@ -284,28 +377,43 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "GCPFuncti return function def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: - from sebs.gcp.triggers import HTTPTrigger + from sebs.gcp.triggers import HTTPTrigger, QueueTrigger, StorageTrigger - if trigger_type == Trigger.TriggerType.HTTP: + location = self.config.region + project_name = self.config.project_name + full_func_name = GCP.get_full_function_name(project_name, location, function.name) + self.logging.info(f"Function {function.name} - waiting for deployment...") + our_function_req = ( + self.function_client.projects().locations().functions().get(name=full_func_name) + ) + deployed = False + while not deployed: + status_res = our_function_req.execute() + if status_res["status"] == "ACTIVE": + deployed = True + else: + time.sleep(3) + self.logging.info(f"Function {function.name} - deployed!") - location = self.config.region - project_name = self.config.project_name - full_func_name = GCP.get_full_function_name(project_name, location, function.name) - self.logging.info(f"Function {function.name} - waiting for deployment...") - our_function_req = ( - self.function_client.projects().locations().functions().get(name=full_func_name) - ) - deployed = False - while not deployed: - status_res = our_function_req.execute() - if status_res["status"] == "ACTIVE": - deployed = True - else: - time.sleep(3) - self.logging.info(f"Function {function.name} - deployed!") + trigger: Trigger + if trigger_type == Trigger.TriggerType.HTTP: invoke_url = status_res["httpsTrigger"]["url"] - trigger = HTTPTrigger(invoke_url) + self.logging.info(f"Created HTTP trigger for {function.name} function") + elif trigger_type == Trigger.TriggerType.QUEUE: + trigger = QueueTrigger( + function.name, + self.get_trigger_resource_name(function.name), + self.config.region + ) + self.logging.info(f"Created Queue trigger for {function.name} function") + elif trigger_type == Trigger.TriggerType.STORAGE: + trigger = StorageTrigger( + function.name, + self.get_trigger_resource_name(function.name), + self.config.region + ) + self.logging.info(f"Created Storage trigger for {function.name} function") else: raise RuntimeError("Not supported!") @@ -317,12 +425,20 @@ def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) def cached_function(self, function: Function): from sebs.faas.function import Trigger - from sebs.gcp.triggers import LibraryTrigger + from sebs.gcp.triggers import LibraryTrigger, QueueTrigger, StorageTrigger + gcp_trigger: Trigger for trigger in function.triggers(Trigger.TriggerType.LIBRARY): gcp_trigger = cast(LibraryTrigger, trigger) gcp_trigger.logging_handlers = self.logging_handlers gcp_trigger.deployment_client = self + for trigger in function.triggers(Trigger.TriggerType.QUEUE): + gcp_trigger = cast(QueueTrigger, trigger) + gcp_trigger.logging_handlers = self.logging_handlers 
+ gcp_trigger.deployment_client = self + for trigger in function.triggers(Trigger.TriggerType.STORAGE): + gcp_trigger = cast(StorageTrigger, trigger) + gcp_trigger.logging_handlers = self.logging_handlers def update_function(self, function: Function, code_package: Benchmark): @@ -337,6 +453,11 @@ def update_function(self, function: Function, code_package: Benchmark): full_func_name = GCP.get_full_function_name( self.config.project_name, self.config.region, function.name ) + + # Before creating the function, ensure all trigger resources (queue, + # bucket) exist on GCP. + trigger_info = self.create_trigger_resource(function.name, cached=True) + req = ( self.function_client.projects() .locations() @@ -345,13 +466,12 @@ def update_function(self, function: Function, code_package: Benchmark): name=full_func_name, body={ "name": full_func_name, - "entryPoint": "handler", "runtime": code_package.language_name + language_runtime.replace(".", ""), "availableMemoryMb": function.config.memory, "timeout": str(function.config.timeout) + "s", - "httpsTrigger": {}, "sourceArchiveUrl": "gs://" + bucket + "/" + code_package_name, - }, + } + | trigger_info, ) ) res = req.execute() diff --git a/sebs/gcp/triggers.py b/sebs/gcp/triggers.py index 13cc3d6c..41fbe18c 100644 --- a/sebs/gcp/triggers.py +++ b/sebs/gcp/triggers.py @@ -1,11 +1,19 @@ +import base64 import concurrent.futures import datetime import json +import os import time +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError from typing import Dict, Optional # noqa +from google.cloud import storage as gcp_storage + from sebs.gcp.gcp import GCP +from sebs.gcp.queue import GCPQueue from sebs.faas.function import ExecutionResult, Trigger +from sebs.faas.queue import QueueType class LibraryTrigger(Trigger): @@ -111,3 +119,208 @@ def serialize(self) -> dict: @staticmethod def deserialize(obj: dict) -> Trigger: return HTTPTrigger(obj["url"]) + + +class QueueTrigger(Trigger): + def __init__( + self, + fname: str, + queue_name: str, + region: str, + result_queue: Optional[GCPQueue] = None + ): + super().__init__() + self.name = fname + self._queue_name = queue_name + self._region = region + self._result_queue = result_queue + + # Create result queue for communicating benchmark results back to the + # client. + if (not self._result_queue): + self._result_queue = GCPQueue( + fname, + QueueType.RESULT, + self.region + ) + self._result_queue.create_queue() + + @staticmethod + def typename() -> str: + return "GCP.QueueTrigger" + + @property + def queue_name(self) -> str: + assert self._queue_name + return self._queue_name + + @property + def region(self) -> str: + assert self._region + return self._region + + @property + def result_queue(self) -> GCPQueue: + assert self._result_queue + return self._result_queue + + @staticmethod + def trigger_type() -> Trigger.TriggerType: + return Trigger.TriggerType.QUEUE + + def sync_invoke(self, payload: dict) -> ExecutionResult: + + self.logging.info(f"Invoke function {self.name}") + + # Init client + pub_sub = build("pubsub", "v1", cache_discovery=False) + + # Prepare payload + # GCP is very particular with data encoding... 
+ serialized_payload = base64.b64encode(json.dumps(payload).encode("utf-8")) + + # Publish payload to queue + begin = datetime.datetime.now() + pub_sub.projects().topics().publish( + topic=self.queue_name, + body={ + "messages": [{"data": serialized_payload.decode("utf-8")}], + }, + ).execute() + + response = "" + while (response == ""): + response = self.result_queue.receive_message() + + end = datetime.datetime.now() + + result = ExecutionResult.from_times(begin, end) + result.parse_benchmark_output(json.loads(response)) + return result + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return { + "type": "Queue", + "name": self.name, + "queue_name": self.queue_name, + "region": self.region, + "result_queue": self.result_queue.serialize() + } + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return QueueTrigger( + obj["name"], + obj["queue_name"], + obj["region"], + GCPQueue.deserialize(obj["result_queue"]) + ) + + +class StorageTrigger(Trigger): + def __init__( + self, + fname: str, + bucket_name: str, + region: str, + result_queue: Optional[GCPQueue] = None + ): + super().__init__() + self.name = fname + self._bucket_name = bucket_name + self._region = region + self._result_queue = result_queue + + # Create result queue for communicating benchmark results back to the + # client. + if (not self._result_queue): + self._result_queue = GCPQueue( + fname, + QueueType.RESULT, + self.region + ) + self._result_queue.create_queue() + + @staticmethod + def typename() -> str: + return "GCP.StorageTrigger" + + @staticmethod + def trigger_type() -> Trigger.TriggerType: + return Trigger.TriggerType.STORAGE + + @property + def bucket_name(self) -> str: + assert self._bucket_name + return self._bucket_name + + @property + def region(self) -> str: + assert self._region + return self._region + + @property + def result_queue(self) -> GCPQueue: + assert self._result_queue + return self._result_queue + + def sync_invoke(self, payload: dict) -> ExecutionResult: + + self.logging.info(f"Invoke function {self.name}") + + # Init clients + client = gcp_storage.Client() + bucket_instance = client.bucket(self.name) + + # Prepare payload + file_name = "payload.json" + with open(file_name, "w") as fp: + json.dump(payload, fp) + + # Upload object + gcp_storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024 + blob = bucket_instance.blob(blob_name=file_name, chunk_size=4 * 1024 * 1024) + begin = datetime.datetime.now() + blob.upload_from_filename(file_name) + + self.logging.info(f"Uploaded payload to bucket {self.bucket_name}") + + response = "" + while (response == ""): + response = self.result_queue.receive_message() + + end = datetime.datetime.now() + + result = ExecutionResult.from_times(begin, end) + result.parse_benchmark_output(json.loads(response)) + return result + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return { + "type": "Storage", + "name": self.name, + "bucket_name": self.bucket_name, + "region": self.region, + "result_queue": self.result_queue.serialize() + } + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return StorageTrigger( + obj["name"], + obj["bucket_name"], + obj["region"], + GCPQueue.deserialize(obj["result_queue"]) + ) diff --git 
a/sebs/local/local.py b/sebs/local/local.py index 5a4eb18f..2477f8ba 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -134,6 +134,7 @@ def package_code( language_version: str, benchmark: str, is_cached: bool, + trigger: Optional[Trigger.TriggerType], ) -> Tuple[str, int]: CONFIG_FILES = { diff --git a/sebs/openwhisk/openwhisk.py b/sebs/openwhisk/openwhisk.py index 00660de9..43c9cd54 100644 --- a/sebs/openwhisk/openwhisk.py +++ b/sebs/openwhisk/openwhisk.py @@ -208,6 +208,7 @@ def package_code( language_version: str, benchmark: str, is_cached: bool, + trigger: Optional[Trigger.TriggerType], ) -> Tuple[str, int]: # Regardless of Docker image status, we need to create .zip file diff --git a/tests/aws/create_function.py b/tests/aws/create_function.py index e672cc89..bb22cfb0 100644 --- a/tests/aws/create_function.py +++ b/tests/aws/create_function.py @@ -35,8 +35,8 @@ class AWSCreateFunction(unittest.TestCase): } } package_files = { - "python": ["handler.py", "function/storage.py", "requirements.txt", '.python_packages/'], - "nodejs": ["handler.js", "function/storage.js", "package.json", "node_modules/"] + "python": ["handler.py", "function/storage.py", "function/queue.py", "requirements.txt", '.python_packages/'], + "nodejs": ["handler.js", "function/storage.js", "function/queue.js", "package.json", "node_modules/"] } benchmark = "110.dynamic-html" function_name_suffixes = []
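With these changes, the new trigger types are selected through the extended `--trigger` option of `sebs.py benchmark invoke`. A minimal usage sketch, assuming the invoke syntax already used by the repository (the benchmark name, input size, and config path below are illustrative, not mandated by this patch):

```sh
# Queue trigger on AWS: the payload is published to the trigger's SQS queue and
# the wrapper sends measurements back through the per-function "-result" queue.
./sebs.py benchmark invoke 110.dynamic-html test \
    --config config/example.json --deployment aws --trigger queue

# Storage trigger on GCP: the payload is uploaded to the trigger bucket and
# results are returned via the Pub/Sub result topic created by the trigger.
./sebs.py benchmark invoke 110.dynamic-html test \
    --config config/example.json --deployment gcp --trigger storage
```

Note that on GCP and Azure the function name is suffixed with the trigger type (e.g. `-queue`, `-storage`), since those platforms allow only one trigger per function.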