diff --git a/.gitignore b/.gitignore index b852a25a..f118df36 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,9 @@ perf-cost* python-venv *cache* +minio-volume +scylladb-volume + # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/.mypy.ini b/.mypy.ini index a1adeaed..e202650e 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -27,6 +27,15 @@ ignore_missing_imports = True [mypy-google.cloud] ignore_missing_imports = True +[mypy-google.cloud.logging] +ignore_missing_imports = True + +[mypy-google.cloud.monitoring_v3] +ignore_missing_imports = True + +[mypy-google.cloud.storage] +ignore_missing_imports = True + [mypy-google.api_core] ignore_missing_imports = True diff --git a/benchmarks/100.webapps/110.dynamic-html/config.json b/benchmarks/100.webapps/110.dynamic-html/config.json index 69bd3a57..25254c24 100644 --- a/benchmarks/100.webapps/110.dynamic-html/config.json +++ b/benchmarks/100.webapps/110.dynamic-html/config.json @@ -1,5 +1,6 @@ { "timeout": 10, "memory": 128, - "languages": ["python", "nodejs"] + "languages": ["python", "nodejs"], + "modules": [] } diff --git a/benchmarks/100.webapps/110.dynamic-html/input.py b/benchmarks/100.webapps/110.dynamic-html/input.py index ec57a224..98dac88b 100644 --- a/benchmarks/100.webapps/110.dynamic-html/input.py +++ b/benchmarks/100.webapps/110.dynamic-html/input.py @@ -5,10 +5,7 @@ 'large': 100000 } -def buckets_count(): - return (0, 0) - -def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func, nosql_func): input_config = {'username': 'testname'} input_config['random_len'] = size_generators[size] return input_config diff --git a/benchmarks/100.webapps/120.uploader/config.json b/benchmarks/100.webapps/120.uploader/config.json index cd8566fc..cbc63567 100644 --- a/benchmarks/100.webapps/120.uploader/config.json +++ b/benchmarks/100.webapps/120.uploader/config.json @@ -1,5 +1,6 @@ { "timeout": 30, "memory": 128, - "languages": ["python", "nodejs"] + "languages": ["python", "nodejs"], + "modules": ["storage"] } diff --git a/benchmarks/100.webapps/120.uploader/input.py b/benchmarks/100.webapps/120.uploader/input.py index 19eb7d2e..ce6169cc 100644 --- a/benchmarks/100.webapps/120.uploader/input.py +++ b/benchmarks/100.webapps/120.uploader/input.py @@ -11,7 +11,7 @@ def buckets_count(): return (0, 1) -def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func, nosql_func): input_config = {'object': {}, 'bucket': {}} input_config['object']['url'] = url_generators[size] input_config['bucket']['bucket'] = benchmarks_bucket diff --git a/benchmarks/100.webapps/130.crud-api/config.json b/benchmarks/100.webapps/130.crud-api/config.json new file mode 100644 index 00000000..25c6cb05 --- /dev/null +++ b/benchmarks/100.webapps/130.crud-api/config.json @@ -0,0 +1,11 @@ +{ + "timeout": 30, + "memory": 128, + "languages": [ + "python", + "nodejs" + ], + "modules": [ + "nosql" + ] +} diff --git a/benchmarks/100.webapps/130.crud-api/input.py b/benchmarks/100.webapps/130.crud-api/input.py new file mode 100644 index 00000000..bc7c0e6f --- /dev/null +++ b/benchmarks/100.webapps/130.crud-api/input.py @@ -0,0 +1,96 @@ +import uuid + + +def allocate_nosql() -> dict: + return {"shopping_cart": {"primary_key": "cart_id", "secondary_key": "product_id"}} + + +def generate_input( + data_dir, size, 
benchmarks_bucket, input_buckets, output_buckets, upload_func, nosql_func +): + + input_config = {} + + cart_id = str(uuid.uuid4().hex) + write_cart_id = str(uuid.uuid4().hex) + + # Set initial data + + nosql_func( + "130.crud-api", + "shopping_cart", + {"name": "Gothic Game", "price": 42, "quantity": 2}, + ("cart_id", cart_id), + ("product_id", "game-gothic"), + ) + nosql_func( + "130.crud-api", + "shopping_cart", + {"name": "Gothic 2", "price": 142, "quantity": 3}, + ("cart_id", cart_id), + ("product_id", "game-gothic-2"), + ) + nosql_func( + "130.crud-api", + "shopping_cart", + {"name": "SeBS Benchmark", "price": 1000, "quantity": 1}, + ("cart_id", cart_id), + ("product_id", "sebs-benchmark"), + ) + nosql_func( + "130.crud-api", + "shopping_cart", + {"name": "Mint Linux", "price": 0, "quantity": 5}, + ("cart_id", cart_id), + ("product_id", "mint-linux"), + ) + + requests = [] + + if size == "test": + # retrieve a single entry + requests.append( + { + "route": "GET /cart/{id}", + "path": {"id": "game-gothic"}, + "body": { + "cart": cart_id, + }, + } + ) + elif size == "small": + requests.append( + { + "route": "GET /cart", + "body": { + "cart": cart_id, + }, + } + ) + elif size == "large": + # add many new entries + for i in range(5): + requests.append( + { + "route": "PUT /cart", + "body": { + "cart": write_cart_id, + "product_id": f"new-id-{i}", + "name": f"Test Item {i}", + "price": 100 * i, + "quantity": i, + }, + } + ) + requests.append( + { + "route": "GET /cart", + "body": { + "cart": write_cart_id, + }, + } + ) + + input_config["requests"] = requests + + return input_config diff --git a/benchmarks/100.webapps/130.crud-api/python/function.py b/benchmarks/100.webapps/130.crud-api/python/function.py new file mode 100644 index 00000000..0b5e0e8c --- /dev/null +++ b/benchmarks/100.webapps/130.crud-api/python/function.py @@ -0,0 +1,67 @@ +from . 
import nosql + +nosql_client = nosql.nosql.get_instance() + +nosql_table_name = "shopping_cart" + + +def add_product(cart_id: str, product_id: str, product_name: str, price: float, quantity: int): + + nosql_client.insert( + nosql_table_name, + ("cart_id", cart_id), + ("product_id", product_id), + {"price": price, "quantity": quantity, "name": product_name}, + ) + + +def get_products(cart_id: str, product_id: str): + return nosql_client.get(nosql_table_name, ("cart_id", cart_id), ("product_id", product_id)) + + +def query_products(cart_id: str): + + res = nosql_client.query( + nosql_table_name, + ("cart_id", cart_id), + "product_id", + ) + + products = [] + price_sum = 0 + quantity_sum = 0 + for product in res: + + products.append(product["name"]) + price_sum += product["price"] + quantity_sum += product["quantity"] + + avg_price = price_sum / quantity_sum if quantity_sum > 0 else 0.0 + + return {"products": products, "total_cost": price_sum, "avg_price": avg_price} + + +def handler(event): + + results = [] + + for request in event["requests"]: + + route = request["route"] + body = request["body"] + + if route == "PUT /cart": + add_product( + body["cart"], body["product_id"], body["name"], body["price"], body["quantity"] + ) + res = {} + elif route == "GET /cart/{id}": + res = get_products(body["cart"], request["path"]["id"]) + elif route == "GET /cart": + res = query_products(body["cart"]) + else: + raise RuntimeError(f"Unknown request route: {route}") + + results.append(res) + + return {"result": results} diff --git a/benchmarks/200.multimedia/210.thumbnailer/config.json b/benchmarks/200.multimedia/210.thumbnailer/config.json index e9fe5a45..8edb99e5 100644 --- a/benchmarks/200.multimedia/210.thumbnailer/config.json +++ b/benchmarks/200.multimedia/210.thumbnailer/config.json @@ -1,5 +1,6 @@ { "timeout": 60, "memory": 256, - "languages": ["python", "nodejs"] + "languages": ["python", "nodejs"], + "modules": ["storage"] } diff --git a/benchmarks/200.multimedia/210.thumbnailer/input.py b/benchmarks/200.multimedia/210.thumbnailer/input.py index d063e258..8943effe 100644 --- a/benchmarks/200.multimedia/210.thumbnailer/input.py +++ b/benchmarks/200.multimedia/210.thumbnailer/input.py @@ -12,7 +12,7 @@ def buckets_count(): :param output_buckets: :param upload_func: upload function taking three params(bucket_idx, key, filepath) ''' -def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func, nosql_func): for file in glob.glob(os.path.join(data_dir, '*.jpg')): img = os.path.relpath(file, data_dir) diff --git a/benchmarks/200.multimedia/220.video-processing/config.json b/benchmarks/200.multimedia/220.video-processing/config.json index 75de3d4f..94ede792 100644 --- a/benchmarks/200.multimedia/220.video-processing/config.json +++ b/benchmarks/200.multimedia/220.video-processing/config.json @@ -1,5 +1,6 @@ { "timeout": 60, "memory": 512, - "languages": ["python"] + "languages": ["python"], + "modules": ["storage"] } diff --git a/benchmarks/200.multimedia/220.video-processing/input.py b/benchmarks/200.multimedia/220.video-processing/input.py index abd02290..6da31647 100644 --- a/benchmarks/200.multimedia/220.video-processing/input.py +++ b/benchmarks/200.multimedia/220.video-processing/input.py @@ -12,7 +12,7 @@ def buckets_count(): :param output_buckets: :param upload_func: upload function taking three params(bucket_idx, key, filepath) ''' -def generate_input(data_dir, 
size, benchmarks_bucket, input_paths, output_paths, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func, nosql_func): for file in glob.glob(os.path.join(data_dir, '*.mp4')): img = os.path.relpath(file, data_dir) upload_func(0, img, file) diff --git a/benchmarks/300.utilities/311.compression/config.json b/benchmarks/300.utilities/311.compression/config.json index e9fe5a45..8edb99e5 100644 --- a/benchmarks/300.utilities/311.compression/config.json +++ b/benchmarks/300.utilities/311.compression/config.json @@ -1,5 +1,6 @@ { "timeout": 60, "memory": 256, - "languages": ["python", "nodejs"] + "languages": ["python", "nodejs"], + "modules": ["storage"] } diff --git a/benchmarks/300.utilities/311.compression/input.py b/benchmarks/300.utilities/311.compression/input.py index c69d68e5..5f88bc91 100644 --- a/benchmarks/300.utilities/311.compression/input.py +++ b/benchmarks/300.utilities/311.compression/input.py @@ -22,7 +22,7 @@ def upload_files(data_root, data_dir, upload_func): :param output_buckets: :param upload_func: upload function taking three params(bucket_idx, key, filepath) ''' -def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func, nosql_func): # upload different datasets datasets = [] diff --git a/benchmarks/400.inference/411.image-recognition/config.json b/benchmarks/400.inference/411.image-recognition/config.json index 75de3d4f..94ede792 100644 --- a/benchmarks/400.inference/411.image-recognition/config.json +++ b/benchmarks/400.inference/411.image-recognition/config.json @@ -1,5 +1,6 @@ { "timeout": 60, "memory": 512, - "languages": ["python"] + "languages": ["python"], + "modules": ["storage"] } diff --git a/benchmarks/400.inference/411.image-recognition/input.py b/benchmarks/400.inference/411.image-recognition/input.py index 45ea3cee..45d7215a 100644 --- a/benchmarks/400.inference/411.image-recognition/input.py +++ b/benchmarks/400.inference/411.image-recognition/input.py @@ -21,7 +21,7 @@ def upload_files(data_root, data_dir, upload_func): :param output_buckets: :param upload_func: upload function taking three params(bucket_idx, key, filepath) ''' -def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func, nosql_func): # upload model model_name = 'resnet50-19c8e357.pth' diff --git a/benchmarks/500.scientific/501.graph-pagerank/config.json b/benchmarks/500.scientific/501.graph-pagerank/config.json index 40336fd0..e80fb435 100644 --- a/benchmarks/500.scientific/501.graph-pagerank/config.json +++ b/benchmarks/500.scientific/501.graph-pagerank/config.json @@ -1,5 +1,6 @@ { "timeout": 120, "memory": 512, - "languages": ["python"] + "languages": ["python"], + "modules": [] } diff --git a/benchmarks/500.scientific/501.graph-pagerank/input.py b/benchmarks/500.scientific/501.graph-pagerank/input.py index a8f3f6c9..025c28ca 100644 --- a/benchmarks/500.scientific/501.graph-pagerank/input.py +++ b/benchmarks/500.scientific/501.graph-pagerank/input.py @@ -4,8 +4,5 @@ 'large': 100000 } -def buckets_count(): - return (0, 0) - -def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func, nosql_func): return { 'size': size_generators[size] } diff --git 
a/benchmarks/500.scientific/502.graph-mst/config.json b/benchmarks/500.scientific/502.graph-mst/config.json index 40336fd0..e80fb435 100644 --- a/benchmarks/500.scientific/502.graph-mst/config.json +++ b/benchmarks/500.scientific/502.graph-mst/config.json @@ -1,5 +1,6 @@ { "timeout": 120, "memory": 512, - "languages": ["python"] + "languages": ["python"], + "modules": [] } diff --git a/benchmarks/500.scientific/502.graph-mst/input.py b/benchmarks/500.scientific/502.graph-mst/input.py index a8f3f6c9..025c28ca 100644 --- a/benchmarks/500.scientific/502.graph-mst/input.py +++ b/benchmarks/500.scientific/502.graph-mst/input.py @@ -4,8 +4,5 @@ 'large': 100000 } -def buckets_count(): - return (0, 0) - -def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func, nosql_func): return { 'size': size_generators[size] } diff --git a/benchmarks/500.scientific/503.graph-bfs/config.json b/benchmarks/500.scientific/503.graph-bfs/config.json index 40336fd0..e80fb435 100644 --- a/benchmarks/500.scientific/503.graph-bfs/config.json +++ b/benchmarks/500.scientific/503.graph-bfs/config.json @@ -1,5 +1,6 @@ { "timeout": 120, "memory": 512, - "languages": ["python"] + "languages": ["python"], + "modules": [] } diff --git a/benchmarks/500.scientific/503.graph-bfs/input.py b/benchmarks/500.scientific/503.graph-bfs/input.py index a8f3f6c9..025c28ca 100644 --- a/benchmarks/500.scientific/503.graph-bfs/input.py +++ b/benchmarks/500.scientific/503.graph-bfs/input.py @@ -4,8 +4,5 @@ 'large': 100000 } -def buckets_count(): - return (0, 0) - -def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func, nosql_func): return { 'size': size_generators[size] } diff --git a/benchmarks/500.scientific/504.dna-visualisation/config.json b/benchmarks/500.scientific/504.dna-visualisation/config.json index 712e3a5e..ff297ac5 100644 --- a/benchmarks/500.scientific/504.dna-visualisation/config.json +++ b/benchmarks/500.scientific/504.dna-visualisation/config.json @@ -1,5 +1,6 @@ { "timeout": 60, "memory": 2048, - "languages": ["python"] + "languages": ["python"], + "modules": ["storage"] } diff --git a/benchmarks/500.scientific/504.dna-visualisation/input.py b/benchmarks/500.scientific/504.dna-visualisation/input.py index 3f13010f..a9f376ea 100644 --- a/benchmarks/500.scientific/504.dna-visualisation/input.py +++ b/benchmarks/500.scientific/504.dna-visualisation/input.py @@ -3,7 +3,7 @@ def buckets_count(): return (1, 1) -def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_func, nosql_func): for file in glob.glob(os.path.join(data_dir, '*.fasta')): data = os.path.relpath(file, data_dir) diff --git a/benchmarks/wrappers/aws/python/nosql.py b/benchmarks/wrappers/aws/python/nosql.py new file mode 100644 index 00000000..72bc2d9d --- /dev/null +++ b/benchmarks/wrappers/aws/python/nosql.py @@ -0,0 +1,121 @@ +from decimal import Decimal +from os import environ +from typing import List, Optional, Union, Tuple + +import boto3 + + +class nosql: + + instance: Optional["nosql"] = None + + def __init__(self): + self.client = boto3.resource("dynamodb") + self._tables = {} + + # Based on: https://github.com/boto/boto3/issues/369#issuecomment-157205696 + def 
_remove_decimals(self, data: dict) -> Union[dict, list, int, float]: + + if isinstance(data, list): + return [self._remove_decimals(x) for x in data] + elif isinstance(data, dict): + return {k: self._remove_decimals(v) for k, v in data.items()} + elif isinstance(data, Decimal): + if data.as_integer_ratio()[1] == 1: + return int(data) + else: + return float(data) + else: + return data + + def _get_table(self, table_name: str): + + if table_name not in self._tables: + + env_name = f"NOSQL_STORAGE_TABLE_{table_name}" + + if env_name in environ: + aws_name = environ[env_name] + self._tables[table_name] = self.client.Table(aws_name) + else: + raise RuntimeError( + f"Couldn't find an environment variable {env_name} for table {table_name}" + ) + + return self._tables[table_name] + + def insert( + self, + table_name: str, + primary_key: Tuple[str, str], + secondary_key: Tuple[str, str], + data: dict, + ): + for key in (primary_key, secondary_key): + data[key[0]] = key[1] + + self._get_table(table_name).put_item(Item=data) + + def get( + self, table_name: str, primary_key: Tuple[str, str], secondary_key: Tuple[str, str] + ) -> dict: + + data = {} + for key in (primary_key, secondary_key): + data[key[0]] = key[1] + + res = self._get_table(table_name).get_item(Key=data) + return self._remove_decimals(res["Item"]) + + def update( + self, + table_name: str, + primary_key: Tuple[str, str], + secondary_key: Tuple[str, str], + updates: dict, + ): + + key_data = {} + for key in (primary_key, secondary_key): + key_data[key[0]] = key[1] + + update_expression = "SET " + update_values = {} + update_names = {} + + # We use attribute names because DynamoDB reserves some keywords, like 'status' + for key, value in updates.items(): + + update_expression += f" #{key}_name = :{key}_value, " + update_values[f":{key}_value"] = value + update_names[f"#{key}_name"] = key + + update_expression = update_expression[:-2] + + self._get_table(table_name).update_item( + Key=key_data, + UpdateExpression=update_expression, + ExpressionAttributeValues=update_values, + ExpressionAttributeNames=update_names, + ) + + def query(self, table_name: str, primary_key: Tuple[str, str], _: str) -> List[dict]: + + res = self._get_table(table_name).query( + KeyConditionExpression=f"{primary_key[0]} = :keyvalue", + ExpressionAttributeValues={":keyvalue": primary_key[1]}, + )["Items"] + return self._remove_decimals(res) + + def delete(self, table_name: str, primary_key: Tuple[str, str], secondary_key: Tuple[str, str]): + data = {} + for key in (primary_key, secondary_key): + data[key[0]] = key[1] + + self._get_table(table_name).delete_item(Key=data) + + @staticmethod + def get_instance(): + if nosql.instance is None: + nosql.instance = nosql() + return nosql.instance diff --git a/benchmarks/wrappers/azure/python/handler.py b/benchmarks/wrappers/azure/python/handler.py index 5f7f14f2..88e44baf 100644 --- a/benchmarks/wrappers/azure/python/handler.py +++ b/benchmarks/wrappers/azure/python/handler.py @@ -4,13 +4,27 @@ import azure.functions as func +if 'NOSQL_STORAGE_DATABASE' in os.environ: + + from . import nosql + + nosql.nosql.get_instance( + os.environ['NOSQL_STORAGE_DATABASE'], + os.environ['NOSQL_STORAGE_URL'], + os.environ['NOSQL_STORAGE_CREDS'] + ) + +if 'STORAGE_CONNECTION_STRING' in os.environ: + + from . 
import storage + client = storage.storage.get_instance(os.environ['STORAGE_CONNECTION_STRING']) + # TODO: usual trigger # implement support for blob and others def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse: income_timestamp = datetime.datetime.now().timestamp() req_json = req.get_json() - if 'connection_string' in req_json: - os.environ['STORAGE_CONNECTION_STRING'] = req_json['connection_string'] + req_json['request-id'] = context.invocation_id req_json['income-timestamp'] = income_timestamp begin = datetime.datetime.now() diff --git a/benchmarks/wrappers/azure/python/nosql.py b/benchmarks/wrappers/azure/python/nosql.py new file mode 100644 index 00000000..f7dd9485 --- /dev/null +++ b/benchmarks/wrappers/azure/python/nosql.py @@ -0,0 +1,94 @@ +from typing import Dict, List, Optional, Tuple + +from azure.cosmos import CosmosClient, ContainerProxy + + +class nosql: + instance = None + client = None + + def __init__(self, url: str, credential: str, database: str): + self._client = CosmosClient(url=url, credential=credential) + self._db_client = self._client.get_database_client(database) + self._containers: Dict[str, ContainerProxy] = {} + + def _get_table(self, table_name: str): + + if table_name not in self._containers: + self._containers[table_name] = self._db_client.get_container_client(table_name) + + return self._containers[table_name] + + def insert( + self, + table_name: str, + primary_key: Tuple[str, str], + secondary_key: Tuple[str, str], + data: dict, + ): + + data[primary_key[0]] = primary_key[1] + # secondary key must have that name in CosmosDB + data["id"] = secondary_key[1] + + self._get_table(table_name).upsert_item(data) + + def get( + self, table_name: str, primary_key: Tuple[str, str], secondary_key: Tuple[str, str] + ) -> dict: + res = self._get_table(table_name).read_item( + item=secondary_key[1], partition_key=primary_key[1] + ) + res[secondary_key[0]] = secondary_key[1] + + return res + + def update( + self, + table_name: str, + primary_key: Tuple[str, str], + secondary_key: Tuple[str, str], + updates: dict, + ): + + ops = [] + for key, value in updates.items(): + ops.append({"op": "add", "path": f"/{key}", "value": value}) + + self._get_table(table_name).patch_item( + item=secondary_key[1], partition_key=primary_key[1], patch_operations=ops + ) + + """ + This query must involve partition key - it does not scan across partitions. 
+ """ + + def query( + self, table_name: str, primary_key: Tuple[str, str], secondary_key_name: str + ) -> List[dict]: + + res = list( + self._get_table(table_name).query_items( + f"SELECT * FROM c WHERE c.{primary_key[0]} = '{primary_key[1]}'", + enable_cross_partition_query=False, + ) + ) + + # Emulate the kind key + for item in res: + item[secondary_key_name] = item["id"] + + return res + + def delete(self, table_name: str, primary_key: Tuple[str, str], secondary_key: Tuple[str, str]): + + self._get_table(table_name).delete_item(item=secondary_key[1], partition_key=primary_key[1]) + + @staticmethod + def get_instance( + database: Optional[str] = None, url: Optional[str] = None, credential: Optional[str] = None + ): + if nosql.instance is None: + assert database is not None and url is not None and credential is not None + nosql.instance = nosql(url, credential, database) + return nosql.instance diff --git a/benchmarks/wrappers/azure/python/storage.py b/benchmarks/wrappers/azure/python/storage.py index 74c08307..42b129c8 100644 --- a/benchmarks/wrappers/azure/python/storage.py +++ b/benchmarks/wrappers/azure/python/storage.py @@ -1,6 +1,7 @@ import os import uuid +from typing import Optional from azure.storage.blob import BlobServiceClient @@ -8,10 +9,8 @@ class storage: instance = None client = None - def __init__(self): - self.client = BlobServiceClient.from_connection_string( - os.getenv('STORAGE_CONNECTION_STRING') - ) + def __init__(self, connection_string: str): + self.client = BlobServiceClient.from_connection_string(connection_string) @staticmethod def unique_name(name): @@ -52,7 +51,9 @@ def download_stream(self, container, file): client = self.client.get_blob_client(container=container, blob=file) return client.download_blob().readall() - def get_instance(): + @staticmethod + def get_instance(connection_string: Optional[str] = None): if storage.instance is None: - storage.instance = storage() + assert connection_string is not None + storage.instance = storage(connection_string) return storage.instance diff --git a/benchmarks/wrappers/gcp/python/handler.py b/benchmarks/wrappers/gcp/python/handler.py index b9017b52..87fa4200 100644 --- a/benchmarks/wrappers/gcp/python/handler.py +++ b/benchmarks/wrappers/gcp/python/handler.py @@ -3,6 +3,14 @@ sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages')) +if 'NOSQL_STORAGE_DATABASE' in os.environ: + from function import nosql + + nosql.nosql.get_instance( + os.environ['NOSQL_STORAGE_DATABASE'] + ) + + def handler(req): income_timestamp = datetime.datetime.now().timestamp() req_id = req.headers.get('Function-Execution-Id') diff --git a/benchmarks/wrappers/gcp/python/nosql.py b/benchmarks/wrappers/gcp/python/nosql.py new file mode 100644 index 00000000..40871285 --- /dev/null +++ b/benchmarks/wrappers/gcp/python/nosql.py @@ -0,0 +1,131 @@ +from typing import List, Optional, Tuple + +from google.cloud import datastore + + +class nosql: + + instance: Optional["nosql"] = None + + """ + Each benchmark supports up to two keys - one for grouping items, + and for unique identification of each item. + + In Google Cloud Datastore, we determine different tables by using + its value for `kind` name. + + The primary key is assigned to the `kind` value. + + To implement sorting semantics, we use the ancestor relation: + the sorting key is used as the parent. + It is the assumption that all related items will have the same parent. 
+ """ + + def __init__(self, database: str): + self._client = datastore.Client(database=database) + + def insert( + self, + table_name: str, + primary_key: Tuple[str, str], + secondary_key: Tuple[str, str], + data: dict, + ): + + parent_key = self._client.key(primary_key[0], primary_key[1]) + key = self._client.key( + # kind determines the table + table_name, + # main ID key + secondary_key[1], + # organization key + parent=parent_key, + ) + + val = datastore.Entity(key=key) + val.update(data) + self._client.put(val) + + def update( + self, + table_name: str, + primary_key: Tuple[str, str], + secondary_key: Tuple[str, str], + data: dict, + ): + # There is no direct update - we have to fetch the entire entity and manually change fields. + parent_key = self._client.key(primary_key[0], primary_key[1]) + key = self._client.key( + # kind determines the table + table_name, + # main ID key + secondary_key[1], + # organization key + parent=parent_key, + ) + res = self._client.get(key) + if res is None: + res = datastore.Entity(key=key) + res.update(data) + self._client.put(res) + + def get( + self, table_name: str, primary_key: Tuple[str, str], secondary_key: Tuple[str, str] + ) -> Optional[dict]: + + parent_key = self._client.key(primary_key[0], primary_key[1]) + key = self._client.key( + # kind determines the table + table_name, + # main ID key + secondary_key[1], + # organization key + parent=parent_key, + ) + + res = self._client.get(key) + if res is None: + return None + + # Emulate the kind key + res[secondary_key[0]] = secondary_key[1] + + return res + + """ + This query must involve partition key - it does not scan across partitions. + """ + + def query( + self, table_name: str, primary_key: Tuple[str, str], secondary_key_name: str + ) -> List[dict]: + + ancestor = self._client.key(primary_key[0], primary_key[1]) + query = self._client.query(kind=table_name, ancestor=ancestor) + res = list(query.fetch()) + + # Emulate the kind key + for item in res: + item[secondary_key_name] = item.key.name + + return res + + def delete(self, table_name: str, primary_key: Tuple[str, str], secondary_key: Tuple[str, str]): + parent_key = self._client.key(primary_key[0], primary_key[1]) + key = self._client.key( + # kind determines the table + table_name, + # main ID key + secondary_key[1], + # organization key + parent=parent_key, + ) + + return self._client.delete(key) + + @staticmethod + def get_instance(database: Optional[str] = None): + if nosql.instance is None: + assert database is not None + nosql.instance = nosql(database) + return nosql.instance diff --git a/benchmarks/wrappers/local/python/nosql.py b/benchmarks/wrappers/local/python/nosql.py new file mode 100644 index 00000000..0e816954 --- /dev/null +++ b/benchmarks/wrappers/local/python/nosql.py @@ -0,0 +1,131 @@ +from decimal import Decimal +from os import environ +from typing import List, Optional, Union, Tuple + +import boto3 + + +class nosql: + + instance: Optional["nosql"] = None + + def __init__(self): + + if environ["NOSQL_STORAGE_TYPE"] != "scylladb": + raise RuntimeError(f"Unsupported NoSQL storage type: {environ['NOSQL_STORAGE_TYPE']}!") + + self.client = boto3.resource( + "dynamodb", + region_name="None", + aws_access_key_id="None", + aws_secret_access_key="None", + endpoint_url=f"http://{environ['NOSQL_STORAGE_ENDPOINT']}", + ) + self._tables = {} + + # Based on: https://github.com/boto/boto3/issues/369#issuecomment-157205696 + def _remove_decimals(self, data: dict) -> Union[dict, list, int, float]: + + if isinstance(data, list): + 
return [self._remove_decimals(x) for x in data] + elif isinstance(data, dict): + return {k: self._remove_decimals(v) for k, v in data.items()} + elif isinstance(data, Decimal): + if data.as_integer_ratio()[1] == 1: + return int(data) + else: + return float(data) + else: + return data + + def _get_table(self, table_name: str): + + if table_name not in self._tables: + + env_name = f"NOSQL_STORAGE_TABLE_{table_name}" + + if env_name in environ: + aws_name = environ[env_name] + self._tables[table_name] = self.client.Table(aws_name) + else: + raise RuntimeError( + f"Couldn't find an environment variable {env_name} for table {table_name}" + ) + + return self._tables[table_name] + + def insert( + self, + table_name: str, + primary_key: Tuple[str, str], + secondary_key: Tuple[str, str], + data: dict, + ): + for key in (primary_key, secondary_key): + data[key[0]] = key[1] + + self._get_table(table_name).put_item(Item=data) + + def get( + self, table_name: str, primary_key: Tuple[str, str], secondary_key: Tuple[str, str] + ) -> dict: + + data = {} + for key in (primary_key, secondary_key): + data[key[0]] = key[1] + + res = self._get_table(table_name).get_item(Key=data) + return self._remove_decimals(res["Item"]) + + def update( + self, + table_name: str, + primary_key: Tuple[str, str], + secondary_key: Tuple[str, str], + updates: dict, + ): + + key_data = {} + for key in (primary_key, secondary_key): + key_data[key[0]] = key[1] + + update_expression = "SET " + update_values = {} + update_names = {} + + # We use attribute names because DynamoDB reserves some keywords, like 'status' + for key, value in updates.items(): + + update_expression += f" #{key}_name = :{key}_value, " + update_values[f":{key}_value"] = value + update_names[f"#{key}_name"] = key + + update_expression = update_expression[:-2] + + self._get_table(table_name).update_item( + Key=key_data, + UpdateExpression=update_expression, + ExpressionAttributeValues=update_values, + ExpressionAttributeNames=update_names, + ) + + def query(self, table_name: str, primary_key: Tuple[str, str], _: str) -> List[dict]: + + res = self._get_table(table_name).query( + KeyConditionExpression=f"{primary_key[0]} = :keyvalue", + ExpressionAttributeValues={":keyvalue": primary_key[1]}, + )["Items"] + return self._remove_decimals(res) + + def delete(self, table_name: str, primary_key: Tuple[str, str], secondary_key: Tuple[str, str]): + data = {} + for key in (primary_key, secondary_key): + data[key[0]] = key[1] + + self._get_table(table_name).delete_item(Key=data) + + @staticmethod + def get_instance(): + if nosql.instance is None: + nosql.instance = nosql() + return nosql.instance diff --git a/config/storage.json b/config/storage.json new file mode 100644 index 00000000..9ea14d31 --- /dev/null +++ b/config/storage.json @@ -0,0 +1,20 @@ +{ + "object": { + "type": "minio", + "minio": { + "mapped_port": 9011, + "version": "RELEASE.2024-07-16T23-46-41Z", + "data_volume": "minio-volume" + } + }, + "nosql": { + "type": "scylladb", + "scylladb": { + "mapped_port": 9012, + "version": "6.0", + "cpus": 1, + "memory": "750", + "data_volume": "scylladb-volume" + } + } +} diff --git a/config/systems.json b/config/systems.json index bf095d3f..caf6a4e0 100644 --- a/config/systems.json +++ b/config/systems.json @@ -32,7 +32,8 @@ "files": [ "storage.py" ], - "packages": [] + "packages": [], + "module_packages": {} } }, "nodejs": { @@ -71,9 +72,11 @@ "deployment": { "files": [ "handler.py", - "storage.py" + "storage.py", + "nosql.py" ], - "packages": [] + "packages": [], + 
"module_packages": {} } }, "nodejs": { @@ -112,11 +115,18 @@ "deployment": { "files": [ "handler.py", - "storage.py" + "storage.py", + "nosql.py" ], - "packages": [ - "azure-storage-blob" - ] + "packages": [], + "module_packages": { + "storage": [ + "azure-storage-blob" + ], + "nosql": [ + "azure-cosmos" + ] + } } }, "nodejs": { @@ -165,11 +175,18 @@ "deployment": { "files": [ "handler.py", - "storage.py" + "storage.py", + "nosql.py" ], - "packages": [ - "google-cloud-storage" - ] + "packages": [], + "module_packages": { + "storage": [ + "google-cloud-storage" + ], + "nosql": [ + "google-cloud-datastore" + ] + } } }, "nodejs": { @@ -196,6 +213,11 @@ } } } + }, + "images": { + "manage": { + "username": "docker_user" + } } }, "openwhisk": { @@ -217,8 +239,12 @@ "storage.py", "setup.py" ], - "packages": { - "minio": "^5.0.10" + "packages": [], + "module_packages": { + "storage": { + "minio": "^5.0.10" + }, + "nosql": [] } } }, diff --git a/dockerfiles/gcp/Dockerfile.manage b/dockerfiles/gcp/Dockerfile.manage new file mode 100644 index 00000000..8af120b2 --- /dev/null +++ b/dockerfiles/gcp/Dockerfile.manage @@ -0,0 +1,13 @@ +FROM ubuntu:24.04 + +RUN apt-get clean && apt-get update\ + && apt-get install -y ca-certificates curl gnupg apt-transport-https\ + && curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg\ + && echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list\ + && apt-get update\ + && apt-get install -y google-cloud-cli\ + && apt-get purge -y --auto-remove curl lsb-release gnupg + +ENV GOOGLE_APPLICATION_CREDENTIALS=/credentials.json + +ENTRYPOINT ["/bin/bash"] diff --git a/docs/platforms.md b/docs/platforms.md index 27738b6e..1123fbfb 100644 --- a/docs/platforms.md +++ b/docs/platforms.md @@ -12,6 +12,12 @@ points for each platform. > [!WARNING] > On many platforms, credentials can be provided as environment variables or through the SeBS configuration. SeBS will not store your credentials in the cache. When saving results, SeBS stores user benchmark and experiment configuration for documentation and reproducibility, except for credentials that are erased. If you provide the credentials through JSON input configuration, do not commit nor publish these files anywhere. +Supported platforms +* [Amazon Web Services (AWS) Lambda](#aws-lambda) +* [Microsoft Azure Functions](#azure-functions) +* [Google Cloud (GCP) Functions](#google-cloud-functions) +* [OpenWhisk](#openwhisk) + ### Cloud Account Identifiers SeBS ensures that all locally cached cloud resources are valid by storing a unique identifier associated with each cloud account. Furthermore, we store this identifier in experiment results to easily match results with the cloud account or subscription that was used to obtain them. We use non-sensitive identifiers such as account IDs on AWS, subscription IDs on Azure, and Google Cloud project IDs. @@ -125,6 +131,8 @@ The Google Cloud Free Tier gives free resources. It has two parts: - Always Free, which provides limited access to many common Google Cloud resources, free of charge. You need to create an account and add [service account](https://cloud.google.com/iam/docs/service-accounts) to permit operating on storage and functions. From the cloud problem, download the cloud credentials saved as a JSON file. 
+You should have at least write access to **Cloud Functions** (`Cloud Functions Admin`) and **Logging** Furthermore, SeBS needs the permissions to create Firestore databases through +Google Cloud CLI tool; the `Firestore Service Agent` role allows for that. You can pass the credentials either using the default GCP-specific environment variable: diff --git a/requirements.azure.txt b/requirements.azure.txt index f7d82499..f5f8a5dc 100644 --- a/requirements.azure.txt +++ b/requirements.azure.txt @@ -1 +1,2 @@ azure-storage-blob==12.10.0 +azure-cosmos diff --git a/requirements.gcp.txt b/requirements.gcp.txt index 9cb90916..60f59150 100644 --- a/requirements.gcp.txt +++ b/requirements.gcp.txt @@ -4,3 +4,4 @@ google-api-python-client==1.12.5 google-cloud-monitoring==2.0.0 google-api-python-client-stubs google-cloud-logging==2.0.0 +google-cloud-datastore diff --git a/sebs.py b/sebs.py index ff7f7769..0395795a 100755 --- a/sebs.py +++ b/sebs.py @@ -6,15 +6,16 @@ import functools import os import traceback -from typing import cast, Optional +from typing import cast, List, Optional import click import sebs from sebs import SeBS from sebs.types import Storage as StorageTypes +from sebs.types import NoSQLStorage as NoSQLStorageTypes from sebs.regression import regression_suite -from sebs.utils import update_nested_dict, catch_interrupt +from sebs.utils import update_nested_dict, append_nested_dict, catch_interrupt from sebs.faas import System as FaaSSystem from sebs.faas.function import Trigger @@ -120,7 +121,7 @@ def parse_common_params( resource_prefix: Optional[str] = None, initialize_deployment: bool = True, ignore_cache: bool = False, - storage_configuration: Optional[str] = None + storage_configuration: Optional[List[str]] = None ): global sebs_client, deployment_client @@ -140,9 +141,13 @@ def parse_common_params( update_nested_dict(config_obj, ["experiments", "update_code"], update_code) update_nested_dict(config_obj, ["experiments", "update_storage"], update_storage) - if storage_configuration: - cfg = json.load(open(storage_configuration, 'r')) - update_nested_dict(config_obj, ["deployment", deployment, "storage"], cfg) + if storage_configuration is not None: + + for cfg_f in storage_configuration: + sebs_client.logging.info(f"Loading storage configuration from {cfg_f}") + + cfg = json.load(open(cfg_f, 'r')) + append_nested_dict(config_obj, ["deployment", deployment, "storage"], cfg) if initialize_deployment: deployment_client = sebs_client.get_deployment( @@ -206,6 +211,7 @@ def benchmark(): type=str, help="Attach prefix to generated Docker image tag.", ) +@click.option("--storage-configuration", type=str, multiple=True, help="JSON configuration of deployed storage.") @common_params def invoke( benchmark, @@ -242,12 +248,12 @@ def invoke( if timeout is not None: benchmark_obj.benchmark_config.timeout = timeout + input_config = benchmark_obj.prepare_input(deployment_client.system_resources, size=benchmark_input_size, replace_existing=experiment_config.update_storage) + func = deployment_client.get_function( benchmark_obj, function_name if function_name else deployment_client.default_function_name(benchmark_obj), ) - storage = deployment_client.get_storage(replace_existing=experiment_config.update_storage) - input_config = benchmark_obj.prepare_input(storage=storage, size=benchmark_input_size) result = sebs.experiments.ExperimentResult(experiment_config, deployment_client.config) result.begin() @@ -344,59 +350,109 @@ def regression(benchmark_input_size, benchmark_name, **kwargs): ) +""" + 
Storage operations have the following characteristics: + - Two operations, start and stop. + - Three options, object storage, NoSQL storage, and all. + - Port and additional settings. + + Configuration is read from a JSON. +""" + + @cli.group() def storage(): pass @storage.command("start") -@click.argument("storage", type=click.Choice([StorageTypes.MINIO])) +@click.argument("storage", type=click.Choice(["object", "nosql", "all"])) +@click.argument("config", type=click.Path(dir_okay=False, readable=True)) @click.option("--output-json", type=click.Path(dir_okay=False, writable=True), default=None) -@click.option("--port", type=int, default=9000) -def storage_start(storage, output_json, port): +def storage_start(storage, config, output_json): import docker sebs.utils.global_logging() - storage_type = sebs.SeBS.get_storage_implementation(StorageTypes(storage)) - storage_config, storage_resources = sebs.SeBS.get_storage_config_implementation(StorageTypes(storage)) - config = storage_config() - resources = storage_resources() - - storage_instance = storage_type(docker.from_env(), None, resources, True) - logging.info(f"Starting storage {str(storage)} on port {port}.") - storage_instance.start(port) + user_storage_config = json.load(open(config, 'r')) + + if storage in ["object", "all"]: + + storage_type_name = user_storage_config["object"]["type"] + storage_type_enum = StorageTypes(storage_type_name) + + storage_type = sebs.SeBS.get_storage_implementation(storage_type_enum) + storage_config = sebs.SeBS.get_storage_config_implementation(storage_type_enum) + config = storage_config.deserialize(user_storage_config["object"][storage_type_name]) + + storage_instance = storage_type(docker.from_env(), None, None, True) + storage_instance.config = config + + storage_instance.start() + + user_storage_config["object"][storage_type_name] = storage_instance.serialize() + else: + user_storage_config.pop("object") + + if storage in ["nosql", "all"]: + + storage_type_name = user_storage_config["nosql"]["type"] + storage_type_enum = NoSQLStorageTypes(storage_type_name) + + storage_type = sebs.SeBS.get_nosql_implementation(storage_type_enum) + storage_config = sebs.SeBS.get_nosql_config_implementation(storage_type_enum) + config = storage_config.deserialize(user_storage_config["nosql"][storage_type_name]) + + storage_instance = storage_type(docker.from_env(), None, config) + + storage_instance.start() + + key, value = storage_instance.serialize() + user_storage_config["nosql"][key] = value + else: + user_storage_config.pop("nosql") + if output_json: logging.info(f"Writing storage configuration to {output_json}.") with open(output_json, "w") as f: - json.dump(storage_instance.serialize(), fp=f, indent=2) + json.dump(user_storage_config, fp=f, indent=2) else: logging.info("Writing storage configuration to stdout.") - logging.info(json.dumps(storage_instance.serialize(), indent=2)) + logging.info(json.dumps(user_storage_config, indent=2)) @storage.command("stop") +@click.argument("storage", type=click.Choice(["object", "nosql", "all"])) @click.argument("input-json", type=click.Path(exists=True, dir_okay=False, readable=True)) -def storage_stop(input_json): +def storage_stop(storage, input_json): sebs.utils.global_logging() with open(input_json, "r") as f: cfg = json.load(f) - storage_type = cfg["type"] - storage_cfg, storage_resources = sebs.SeBS.get_storage_config_implementation(storage_type) - config = storage_cfg.deserialize(cfg) + if storage in ["object", "all"]: - if "resources" in cfg: - resources = 
storage_resources.deserialize(cfg["resources"]) - else: - resources = storage_resources() + storage_type = cfg["object"]["type"] + + storage_cfg = sebs.SeBS.get_storage_config_implementation(storage_type) + config = storage_cfg.deserialize(cfg["object"][storage_type]) logging.info(f"Stopping storage deployment of {storage_type}.") - storage = sebs.SeBS.get_storage_implementation(storage_type).deserialize(config, None, resources) + storage = sebs.SeBS.get_storage_implementation(storage_type).deserialize(config, None, None) storage.stop() logging.info(f"Stopped storage deployment of {storage_type}.") + if storage in ["nosql", "all"]: + + storage_type = cfg["nosql"]["type"] + + storage_cfg = sebs.SeBS.get_nosql_config_implementation(storage_type) + config = storage_cfg.deserialize(cfg["nosql"][storage_type]) + + logging.info(f"Stopping nosql deployment of {storage_type}.") + storage = sebs.SeBS.get_nosql_implementation(storage_type).deserialize(config, None, None) + storage.stop() + logging.info(f"Stopped nosql deployment of {storage_type}.") @cli.group() def local(): @@ -408,7 +464,7 @@ def local(): @click.argument("benchmark-input-size", type=click.Choice(["test", "small", "large"])) @click.argument("output", type=str) @click.option("--deployments", default=1, type=int, help="Number of deployed containers.") -@click.option("--storage-configuration", type=str, help="JSON configuration of deployed storage.") +@click.option("--storage-configuration", type=str, multiple=True, help="JSON configuration of deployed storage.") @click.option("--measure-interval", type=int, default=-1, help="Interval duration between memory measurements in ms.") @click.option( @@ -439,9 +495,12 @@ def start(benchmark, benchmark_input_size, output, deployments, storage_configur experiment_config, logging_filename=logging_filename, ) - storage = deployment_client.get_storage(replace_existing=experiment_config.update_storage) - result.set_storage(storage) - input_config = benchmark_obj.prepare_input(storage=storage, size=benchmark_input_size) + input_config = benchmark_obj.prepare_input( + deployment_client.system_resources, + size=benchmark_input_size, + replace_existing=experiment_config.update_storage + ) + result.set_storage(deployment_client.system_resources.get_storage()) result.add_input(input_config) for i in range(deployments): diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index 6dc70e52..232084d3 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -8,6 +8,8 @@ import boto3 import docker +from sebs.aws.dynamodb import DynamoDB +from sebs.aws.resources import AWSSystemResources from sebs.aws.s3 import S3 from sebs.aws.function import LambdaFunction from sebs.aws.config import AWSConfig @@ -18,7 +20,6 @@ from sebs.config import SeBSConfig from sebs.utils import LoggingHandlers from sebs.faas.function import Function, ExecutionResult, Trigger, FunctionConfig -from sebs.faas.storage import PersistentStorage from sebs.faas.system import System @@ -43,6 +44,10 @@ def function_type() -> "Type[Function]": def config(self) -> AWSConfig: return self._config + @property + def system_resources(self) -> AWSSystemResources: + return cast(AWSSystemResources, self._system_resources) + """ :param cache_client: Function cache instance :param config: Experiments config @@ -57,10 +62,16 @@ def __init__( docker_client: docker.client, logger_handlers: LoggingHandlers, ): - super().__init__(sebs_config, cache_client, docker_client) + super().__init__( + sebs_config, + cache_client, + docker_client, + AWSSystemResources(config, 
cache_client, docker_client, logger_handlers), + ) self.logging_handlers = logger_handlers self._config = config self.storage: Optional[S3] = None + self.nosql_storage: Optional[DynamoDB] = None def initialize(self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None): # thread-safe @@ -69,9 +80,10 @@ def initialize(self, config: Dict[str, str] = {}, resource_prefix: Optional[str] aws_secret_access_key=self.config.credentials.secret_key, ) self.get_lambda_client() - self.get_storage() self.initialize_resources(select_prefix=resource_prefix) + self.system_resources.initialize_session(self.session) + def get_lambda_client(self): if not hasattr(self, "client"): self.client = self.session.client( @@ -80,33 +92,6 @@ def get_lambda_client(self): ) return self.client - """ - Create a client instance for cloud storage. When benchmark and buckets - parameters are passed, then storage is initialized with required number - of buckets. Buckets may be created or retrieved from cache. - - :param benchmark: benchmark name - :param buckets: tuple of required input/output buckets - :param replace_existing: replace existing files in cached buckets? - :return: storage client - """ - - def get_storage(self, replace_existing: bool = False) -> PersistentStorage: - if not self.storage: - self.storage = S3( - self.session, - self.cache_client, - self.config.resources, - self.config.region, - access_key=self.config.credentials.access_key, - secret_key=self.config.credentials.secret_key, - replace_existing=replace_existing, - ) - self.storage.logging_handlers = self.logging_handlers - else: - self.storage.replace_existing = replace_existing - return self.storage - """ It would be sufficient to just pack the code and ship it as zip to AWS. However, to have a compatible function implementation across providers, @@ -178,7 +163,6 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LambdaFun code_size = code_package.code_size code_bucket: Optional[str] = None func_name = AWS.format_function_name(func_name) - storage_client = self.get_storage() function_cfg = FunctionConfig.from_benchmark(code_package) # we can either check for exception or use list_functions @@ -216,6 +200,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LambdaFun else: code_package_name = cast(str, os.path.basename(package)) + storage_client = self.system_resources.get_storage() code_bucket = storage_client.get_bucket(Resources.StorageBucketType.DEPLOYMENT) code_prefix = os.path.join(benchmark, code_package_name) storage_client.upload(code_bucket, package, code_prefix) @@ -247,6 +232,9 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LambdaFun self.wait_function_active(lambda_function) + # Update environment variables + self.update_function_configuration(lambda_function, code_package) + # Add LibraryTrigger to a new function from sebs.aws.triggers import LibraryTrigger @@ -291,8 +279,8 @@ def update_function(self, function: Function, code_package: Benchmark): # Upload code package to S3, then update else: code_package_name = os.path.basename(package) - storage = cast(S3, self.get_storage()) - bucket = function.code_bucket(code_package.benchmark, storage) + storage = self.system_resources.get_storage() + bucket = function.code_bucket(code_package.benchmark, cast(S3, storage)) storage.upload(bucket, package, code_package_name) self.client.update_function_code( FunctionName=name, S3Bucket=bucket, S3Key=code_package_name @@ -300,21 +288,49 @@ def update_function(self, 
function: Function, code_package: Benchmark): self.wait_function_updated(function) self.logging.info(f"Updated code of {name} function. ") # and update config - self.client.update_function_configuration( - FunctionName=name, Timeout=function.config.timeout, MemorySize=function.config.memory - ) - self.wait_function_updated(function) - self.logging.info(f"Updated configuration of {name} function. ") - self.wait_function_updated(function) + self.update_function_configuration(function, code_package) + self.logging.info("Published new function code") - def update_function_configuration(self, function: Function, benchmark: Benchmark): + def update_function_configuration(self, function: Function, code_package: Benchmark): + + # We can only update storage configuration once it has been processed for this benchmark + assert code_package.has_input_processed + + envs = {} + if code_package.uses_nosql: + + nosql_storage = self.system_resources.get_nosql_storage() + for original_name, actual_name in nosql_storage.get_tables( + code_package.benchmark + ).items(): + envs[f"NOSQL_STORAGE_TABLE_{original_name}"] = actual_name + + # AWS Lambda will overwrite existing variables + # If we modify them, we need to first read existing ones and append. + if len(envs) > 0: + + response = self.client.get_function_configuration(FunctionName=function.name) + # preserve old variables while adding new ones. + # but for conflict, we select the new one + if "Environment" in response: + envs = {**response["Environment"]["Variables"], **envs} + function = cast(LambdaFunction, function) - self.client.update_function_configuration( - FunctionName=function.name, - Timeout=function.config.timeout, - MemorySize=function.config.memory, - ) + # We only update envs if anything new was added + if len(envs) > 0: + self.client.update_function_configuration( + FunctionName=function.name, + Timeout=function.config.timeout, + MemorySize=function.config.memory, + Environment={"Variables": envs}, + ) + else: + self.client.update_function_configuration( + FunctionName=function.name, + Timeout=function.config.timeout, + MemorySize=function.config.memory, + ) self.wait_function_updated(function) self.logging.info(f"Updated configuration of {function.name} function. 
") diff --git a/sebs/aws/dynamodb.py b/sebs/aws/dynamodb.py new file mode 100644 index 00000000..a282a49a --- /dev/null +++ b/sebs/aws/dynamodb.py @@ -0,0 +1,152 @@ +from collections import defaultdict +from typing import Dict, Optional, Tuple + +from sebs.cache import Cache +from sebs.faas.config import Resources +from sebs.faas.nosql import NoSQLStorage + +import boto3 +from boto3.dynamodb.types import TypeSerializer + + +class DynamoDB(NoSQLStorage): + @staticmethod + def typename() -> str: + return "AWS.DynamoDB" + + @staticmethod + def deployment_name(): + return "aws" + + def __init__( + self, + session: boto3.session.Session, + cache_client: Cache, + resources: Resources, + region: str, + access_key: str, + secret_key: str, + ): + super().__init__(region, cache_client, resources) + self.client = session.client( + "dynamodb", + region_name=region, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + + # Map benchmark -> orig_name -> table_name + self._tables: Dict[str, Dict[str, str]] = defaultdict(dict) + + self._serializer = TypeSerializer() + + def retrieve_cache(self, benchmark: str) -> bool: + + if benchmark in self._tables: + return True + + cached_storage = self.cache_client.get_nosql_config(self.deployment_name(), benchmark) + if cached_storage is not None: + self._tables[benchmark] = cached_storage["tables"] + return True + + return False + + def update_cache(self, benchmark: str): + + self._cache_client.update_nosql( + self.deployment_name(), + benchmark, + { + "tables": self._tables[benchmark], + }, + ) + + def get_tables(self, benchmark: str) -> Dict[str, str]: + return self._tables[benchmark] + + def _get_table_name(self, benchmark: str, table: str) -> Optional[str]: + + if benchmark not in self._tables: + return None + + if table not in self._tables[benchmark]: + return None + + return self._tables[benchmark][table] + + def writer_func( + self, + benchmark: str, + table: str, + data: dict, + primary_key: Tuple[str, str], + secondary_key: Optional[Tuple[str, str]] = None, + ): + + table_name = self._get_table_name(benchmark, table) + assert table_name is not None + + for key in (primary_key, secondary_key): + if key is not None: + data[key[0]] = key[1] + + serialized_data = {k: self._serializer.serialize(v) for k, v in data.items()} + self.client.put_item(TableName=table_name, Item=serialized_data) + + """ + AWS: create a DynamoDB Table + + In contrast to the hierarchy of database objects in Azure (account -> database -> container) + and GCP (database per benchmark), we need to create unique table names here. 
+ """ + + def create_table( + self, benchmark: str, name: str, primary_key: str, secondary_key: Optional[str] = None + ) -> str: + + table_name = f"sebs-benchmarks-{self._cloud_resources.resources_id}-{benchmark}-{name}" + + try: + + definitions = [{"AttributeName": primary_key, "AttributeType": "S"}] + key_schema = [{"AttributeName": primary_key, "KeyType": "HASH"}] + + if secondary_key is not None: + definitions.append({"AttributeName": secondary_key, "AttributeType": "S"}) + key_schema.append({"AttributeName": secondary_key, "KeyType": "RANGE"}) + + ret = self.client.create_table( + TableName=table_name, + BillingMode="PAY_PER_REQUEST", + AttributeDefinitions=definitions, # type: ignore + KeySchema=key_schema, # type: ignore + ) + + if ret["TableDescription"]["TableStatus"] == "CREATING": + + self.logging.info(f"Waiting for creation of DynamoDB table {name}") + waiter = self.client.get_waiter("table_exists") + waiter.wait(TableName=table_name, WaiterConfig={"Delay": 1}) + + self.logging.info(f"Created DynamoDB table {name} for benchmark {benchmark}") + self._tables[benchmark][name] = table_name + + return ret["TableDescription"]["TableName"] + + except self.client.exceptions.ResourceInUseException as e: + + if "already exists" in e.response["Error"]["Message"]: + self.logging.info( + f"Using existing DynamoDB table {table_name} for benchmark {benchmark}" + ) + self._tables[benchmark][name] = table_name + return name + + raise RuntimeError(f"Creating DynamoDB failed, unknown reason! Error: {e}") + + def clear_table(self, name: str) -> str: + raise NotImplementedError() + + def remove_table(self, name: str) -> str: + raise NotImplementedError() diff --git a/sebs/aws/resources.py b/sebs/aws/resources.py new file mode 100644 index 00000000..bea9650f --- /dev/null +++ b/sebs/aws/resources.py @@ -0,0 +1,84 @@ +from typing import cast, Optional + +from sebs.aws.s3 import S3 +from sebs.aws.dynamodb import DynamoDB +from sebs.aws.config import AWSConfig +from sebs.cache import Cache +from sebs.faas.resources import SystemResources +from sebs.faas.storage import PersistentStorage +from sebs.faas.nosql import NoSQLStorage +from sebs.utils import LoggingHandlers + +import boto3 +import docker + + +class AWSSystemResources(SystemResources): + @staticmethod + def typename() -> str: + return "AWS.SystemResources" + + @property + def config(self) -> AWSConfig: + return cast(AWSConfig, self._config) + + def __init__( + self, + config: AWSConfig, + cache_client: Cache, + docker_client: docker.client, + logger_handlers: LoggingHandlers, + ): + super().__init__(config, cache_client, docker_client) + + self._session: Optional[boto3.session.Session] = None + self._logging_handlers = logger_handlers + self._storage: Optional[S3] = None + self._nosql_storage: Optional[DynamoDB] = None + + def initialize_session(self, session: boto3.session.Session): + self._session = session + + """ + Create a client instance for cloud storage. When benchmark and buckets + parameters are passed, then storage is initialized with required number + of buckets. Buckets may be created or retrieved from cache. + + :param benchmark: benchmark name + :param buckets: tuple of required input/output buckets + :param replace_existing: replace existing files in cached buckets? 
+ :return: storage client + """ + + def get_storage(self, replace_existing: Optional[bool] = None) -> PersistentStorage: + + if not self._storage: + assert self._session is not None + self.logging.info("Initialize S3 storage instance.") + self._storage = S3( + self._session, + self._cache_client, + self.config.resources, + self.config.region, + access_key=self.config.credentials.access_key, + secret_key=self.config.credentials.secret_key, + replace_existing=replace_existing if replace_existing is not None else False, + ) + self._storage.logging_handlers = self._logging_handlers + elif replace_existing is not None: + self._storage.replace_existing = replace_existing + return self._storage + + def get_nosql_storage(self) -> NoSQLStorage: + if not self._nosql_storage: + assert self._session is not None + self.logging.info("Initialize DynamoDB NoSQL instance.") + self._nosql_storage = DynamoDB( + self._session, + self._cache_client, + self.config.resources, + self.config.region, + access_key=self.config.credentials.access_key, + secret_key=self.config.credentials.secret_key, + ) + return self._nosql_storage diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index 78e45963..a29347b6 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -11,17 +11,18 @@ from sebs.azure.blob_storage import BlobStorage from sebs.azure.cli import AzureCLI +from sebs.azure.cosmosdb import CosmosDB from sebs.azure.function import AzureFunction from sebs.azure.config import AzureConfig, AzureResources +from sebs.azure.system_resources import AzureSystemResources from sebs.azure.triggers import AzureTrigger, HTTPTrigger from sebs.faas.function import Trigger from sebs.benchmark import Benchmark from sebs.cache import Cache from sebs.config import SeBSConfig from sebs.utils import LoggingHandlers, execute -from ..faas.function import Function, FunctionConfig, ExecutionResult -from ..faas.storage import PersistentStorage -from ..faas.system import System +from sebs.faas.function import Function, FunctionConfig, ExecutionResult +from sebs.faas.system import System class Azure(System): @@ -45,6 +46,10 @@ def config(self) -> AzureConfig: def function_type() -> Type[Function]: return AzureFunction + @property + def cli_instance(self) -> AzureCLI: + return cast(AzureSystemResources, self._system_resources).cli_instance + def __init__( self, sebs_config: SeBSConfig, @@ -53,14 +58,15 @@ def __init__( docker_client: docker.client, logger_handlers: LoggingHandlers, ): - super().__init__(sebs_config, cache_client, docker_client) + super().__init__( + sebs_config, + cache_client, + docker_client, + AzureSystemResources(sebs_config, config, cache_client, docker_client, logger_handlers), + ) self.logging_handlers = logger_handlers self._config = config - def initialize_cli(self, cli: AzureCLI): - self.cli_instance = cli - self.cli_instance_stop = False - """ Start the Docker container running Azure CLI tools. 
""" @@ -70,30 +76,11 @@ def initialize( config: Dict[str, str] = {}, resource_prefix: Optional[str] = None, ): - if not hasattr(self, "cli_instance"): - self.cli_instance = AzureCLI(self.system_config, self.docker_client) - self.cli_instance_stop = True - - output = self.cli_instance.login( - appId=self.config.credentials.appId, - tenant=self.config.credentials.tenant, - password=self.config.credentials.password, - ) - - subscriptions = json.loads(output) - if len(subscriptions) == 0: - raise RuntimeError("Didn't find any valid subscription on Azure!") - if len(subscriptions) > 1: - raise RuntimeError("Found more than one valid subscription on Azure - not supported!") - - self.config.credentials.subscription_id = subscriptions[0]["id"] - self.initialize_resources(select_prefix=resource_prefix) self.allocate_shared_resource() def shutdown(self): - if self.cli_instance and self.cli_instance_stop: - self.cli_instance.shutdown() + cast(AzureSystemResources, self._system_resources).shutdown() super().shutdown() def find_deployments(self) -> List[str]: @@ -119,33 +106,6 @@ def find_deployments(self) -> List[str]: def allocate_shared_resource(self): self.config.resources.data_storage_account(self.cli_instance) - """ - Create wrapper object for Azure blob storage. - First ensure that storage account is created and connection string - is known. Then, create wrapper and create request number of buckets. - - Requires Azure CLI instance in Docker to obtain storage account details. - - :param benchmark: - :param buckets: number of input and output buckets - :param replace_existing: when true, replace existing files in input buckets - :return: Azure storage instance - """ - - def get_storage(self, replace_existing: bool = False) -> PersistentStorage: - if not hasattr(self, "storage"): - self.storage = BlobStorage( - self.config.region, - self.cache_client, - self.config.resources, - self.config.resources.data_storage_account(self.cli_instance).connection_string, - replace_existing=replace_existing, - ) - self.storage.logging_handlers = self.logging_handlers - else: - self.storage.replace_existing = replace_existing - return self.storage - # Directory structure # handler # - source files @@ -287,13 +247,109 @@ def publish_function( def update_function(self, function: Function, code_package: Benchmark): + assert code_package.has_input_processed + # Mount code package in Docker instance container_dest = self._mount_function_code(code_package) - url = self.publish_function(function, code_package, container_dest, True) + function_url = self.publish_function(function, code_package, container_dest, True) + + envs = {} + if code_package.uses_nosql: + + nosql_storage = cast(CosmosDB, self._system_resources.get_nosql_storage()) - trigger = HTTPTrigger(url, self.config.resources.data_storage_account(self.cli_instance)) - trigger.logging_handlers = self.logging_handlers - function.add_trigger(trigger) + # If we use NoSQL, then the handle must be allocated + _, url, creds = nosql_storage.credentials() + db = nosql_storage.benchmark_database(code_package.benchmark) + envs["NOSQL_STORAGE_DATABASE"] = db + envs["NOSQL_STORAGE_URL"] = url + envs["NOSQL_STORAGE_CREDS"] = creds + + for original_name, actual_name in nosql_storage.get_tables( + code_package.benchmark + ).items(): + envs[f"NOSQL_STORAGE_TABLE_{original_name}"] = actual_name + + if code_package.uses_storage: + + envs["STORAGE_CONNECTION_STRING"] = self.config.resources.data_storage_account( + self.cli_instance + ).connection_string + + resource_group = 
self.config.resources.resource_group(self.cli_instance) + # Retrieve existing environment variables to prevent accidental overwrite + if len(envs) > 0: + + try: + self.logging.info( + f"Retrieving existing environment variables for function {function.name}" + ) + + # First read existing properties + response = self.cli_instance.execute( + f"az functionapp config appsettings list --name {function.name} " + f" --resource-group {resource_group} " + ) + old_envs = json.loads(response.decode()) + + # Find custom envs and copy them - unless they are overwritten now + for env in old_envs: + + # Ignore vars set automatically by Azure + found = False + for prefix in ["FUNCTIONS_", "WEBSITE_", "APPINSIGHTS_", "Azure"]: + if env["name"].startswith(prefix): + found = True + break + + # do not overwrite new value + if not found and env["name"] not in envs: + envs[env["name"]] = env["value"] + + except RuntimeError as e: + self.logging.error("Failed to retrieve environment variables!") + self.logging.error(e) + raise e + + if len(envs) > 0: + try: + env_string = "" + for k, v in envs.items(): + env_string += f" {k}={v}" + + self.logging.info(f"Exporting environment variables for function {function.name}") + self.cli_instance.execute( + f"az functionapp config appsettings set --name {function.name} " + f" --resource-group {resource_group} " + f" --settings {env_string} " + ) + + # if we don't do that, next invocation might still see old values + self.logging.info( + "Sleeping for 5 seconds - Azure needs more time to propagate changes" + ) + time.sleep(5) + + except RuntimeError as e: + self.logging.error("Failed to set environment variable!") + self.logging.error(e) + raise e + + # Avoid duplication of HTTP trigger + found_trigger = False + for trigger in function.triggers_all(): + + if isinstance(trigger, HTTPTrigger): + found_trigger = True + trigger.url = function_url + break + + if not found_trigger: + trigger = HTTPTrigger( + function_url, self.config.resources.data_storage_account(self.cli_instance) + ) + trigger.logging_handlers = self.logging_handlers + function.add_trigger(trigger) def update_function_configuration(self, function: Function, code_package: Benchmark): # FIXME: this does nothing currently - we don't specify timeout diff --git a/sebs/azure/cloud_resources.py b/sebs/azure/cloud_resources.py new file mode 100644 index 00000000..962e41d3 --- /dev/null +++ b/sebs/azure/cloud_resources.py @@ -0,0 +1,81 @@ +import json +from typing import Optional + +from sebs.azure.cli import AzureCLI + + +class CosmosDBAccount: + @property + def account_name(self) -> str: + return self._account_name + + @property + def url(self) -> str: + return self._url + + @property + def credential(self) -> str: + return self._credential + + def __init__(self, account_name: str, url: str, credential: str): + super().__init__() + self._account_name = account_name + self._url = url + self._credential = credential + + @staticmethod + def from_cache(account_name: str, url: str, credential: str) -> "CosmosDBAccount": + return CosmosDBAccount(account_name, url, credential) + + @staticmethod + def from_allocation( + account_name: str, resource_group: str, cli_instance: AzureCLI, url: Optional[str] + ) -> "CosmosDBAccount": + + if url is None: + url = CosmosDBAccount.query_url( + account_name, + resource_group, + cli_instance, + ) + + credential = CosmosDBAccount.query_credentials( + account_name, + resource_group, + cli_instance, + ) + + return CosmosDBAccount(account_name, url, credential) + + @staticmethod + def 
query_url(account_name: str, resource_group: str, cli_instance: AzureCLI) -> str: + + # Find the endpoint URL + ret = cli_instance.execute( + f" az cosmosdb show --name {account_name} " f" --resource-group {resource_group} " + ) + ret = json.loads(ret.decode("utf-8")) + return ret["documentEndpoint"] + + @staticmethod + def query_credentials(account_name: str, resource_group: str, cli_instance: AzureCLI) -> str: + + # Read the master key to access CosmosDB account + ret = cli_instance.execute( + f" az cosmosdb keys list --name {account_name} " f" --resource-group {resource_group} " + ) + ret = json.loads(ret.decode("utf-8")) + credential = ret["primaryMasterKey"] + + return credential + + def serialize(self) -> dict: + return { + "account_name": self._account_name, + "url": self._url, + "credential": self._credential, + } + + @staticmethod + def deserialize(obj: dict) -> "CosmosDBAccount": + return CosmosDBAccount.from_cache(obj["account_name"], obj["url"], obj["credential"]) diff --git a/sebs/azure/config.py b/sebs/azure/config.py index 73944864..9aef0d8c 100644 --- a/sebs/azure/config.py +++ b/sebs/azure/config.py @@ -3,10 +3,11 @@ import os import re import uuid -from typing import cast, Any, Dict, List, Optional # noqa +from typing import cast, Dict, List, Optional from sebs.azure.cli import AzureCLI +from sebs.azure.cloud_resources import CosmosDBAccount from sebs.cache import Cache from sebs.faas.config import Config, Credentials, Resources from sebs.utils import LoggingHandlers @@ -154,11 +155,13 @@ def __init__( resource_group: Optional[str] = None, storage_accounts: List["AzureResources.Storage"] = [], data_storage_account: Optional["AzureResources.Storage"] = None, + cosmosdb_account: Optional[CosmosDBAccount] = None, ): super().__init__(name="azure") self._resource_group = resource_group self._storage_accounts = storage_accounts self._data_storage_account = data_storage_account + self._cosmosdb_account = cosmosdb_account def set_region(self, region: str): self._region = region @@ -222,6 +225,68 @@ def delete_resource_group(self, cli_instance: AzureCLI, name: str, wait: bool = self.logging.error(ret.decode()) raise RuntimeError("Failed to delete the resource group!") + """ + Find or create a serverless CosmosDB account. + If not found, then create a new one based on the current resource ID. + Restriction: account names must be globally unique. + + Requires Azure CLI instance in Docker. 
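+
+    The account name is derived from the resource ID, with underscores and dots replaced
+    by hyphens. As an illustrative example, resource ID "test_1.0" would yield the account
+    name: sebs-cosmosdb-account-test-1-0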
+ """ + + def cosmosdb_account(self, cli_instance: AzureCLI) -> CosmosDBAccount: + # Create resource group if not known + if not self._cosmosdb_account: + + # Only hyphen and alphanumeric characters are allowed + account_name = f"sebs-cosmosdb-account-{self.resources_id}" + account_name = account_name.replace("_", "-") + account_name = account_name.replace(".", "-") + + accounts = self.list_cosmosdb_accounts(cli_instance) + if account_name in accounts: + + self.logging.info("Using existing CosmosDB account {}.".format(account_name)) + url = accounts[account_name] + + else: + + try: + self.logging.info(f"Starting allocation of CosmosDB account {account_name}") + self.logging.info("This can take few minutes :-)!") + ret = cli_instance.execute( + f" az cosmosdb create --name {account_name} " + f" --resource-group {self._resource_group} " + f' --locations regionName="{self._region}" ' + " --capabilities EnableServerless " + ) + ret_values = json.loads(ret.decode()) + url = ret_values["documentEndpoint"] + self.logging.info(f"Allocated CosmosDB account {account_name}") + except Exception: + self.logging.error("Failed to parse the response!") + self.logging.error(ret.decode()) + raise RuntimeError("Failed to parse response from Azure CLI!") + + self._cosmosdb_account = CosmosDBAccount.from_allocation( + account_name, self.resource_group(cli_instance), cli_instance, url + ) + + return self._cosmosdb_account + + def list_cosmosdb_accounts(self, cli_instance: AzureCLI) -> Dict[str, str]: + + ret = cli_instance.execute( + f" az cosmosdb list --resource-group {self._resource_group} " + " --query \"[?starts_with(name,'sebs-cosmosdb-account')]\" " + ) + try: + accounts = json.loads(ret.decode()) + return {x["name"]: x["documentEndpoint"] for x in accounts} + except Exception: + self.logging.error("Failed to parse the response!") + self.logging.error(ret.decode()) + raise RuntimeError("Failed to parse response from Azure CLI!") + """ Retrieve or create storage account associated with benchmark data. 
Last argument allows to override the resource - useful when handling @@ -317,17 +382,23 @@ def initialize(res: Resources, dct: dict): ] else: ret._storage_accounts = [] + if "data_storage_account" in dct: ret._data_storage_account = AzureResources.Storage.deserialize( dct["data_storage_account"] ) + if "cosmosdb_account" in dct: + ret._cosmosdb_account = CosmosDBAccount.deserialize(dct["cosmosdb_account"]) + def serialize(self) -> dict: out = super().serialize() if len(self._storage_accounts) > 0: out["storage_accounts"] = [x.serialize() for x in self._storage_accounts] if self._resource_group: out["resource_group"] = self._resource_group + if self._cosmosdb_account: + out["cosmosdb_account"] = self._cosmosdb_account.serialize() if self._data_storage_account: out["data_storage_account"] = self._data_storage_account.serialize() return out diff --git a/sebs/azure/cosmosdb.py b/sebs/azure/cosmosdb.py new file mode 100644 index 00000000..d92e3d34 --- /dev/null +++ b/sebs/azure/cosmosdb.py @@ -0,0 +1,200 @@ +from dataclasses import dataclass +from typing import cast, Dict, List, Optional, Tuple + +from sebs.azure.cli import AzureCLI +from sebs.azure.cloud_resources import CosmosDBAccount +from sebs.cache import Cache +from sebs.azure.config import AzureResources +from sebs.faas.nosql import NoSQLStorage + +from azure.cosmos import CosmosClient, DatabaseProxy, PartitionKey +from azure.cosmos.exceptions import CosmosResourceNotFoundError + + +@dataclass +class BenchmarkResources: + + database: str + containers: List[str] + # We allocate this dynamically - ignore when caching + database_client: Optional[DatabaseProxy] = None + + def serialize(self) -> dict: + return {"database": self.database, "containers": self.containers} + + @staticmethod + def deserialize(config: dict) -> "BenchmarkResources": + return BenchmarkResources(database=config["database"], containers=config["containers"]) + + +class CosmosDB(NoSQLStorage): + @staticmethod + def typename() -> str: + return "Azure.CosmosDB" + + @staticmethod + def deployment_name(): + return "azure" + + def __init__(self, cli: AzureCLI, cache_client: Cache, resources: AzureResources, region: str): + super().__init__(region, cache_client, resources) + self._cli_instance = cli + self._resource_group = resources.resource_group(self._cli_instance) + + self._benchmark_resources: Dict[str, BenchmarkResources] = {} + self._cosmos_client: Optional[CosmosClient] = None + self._cosmosdb_account: Optional[CosmosDBAccount] = None + + """ + Azure requires no table mappings: the name of container is the same as benchmark name. 
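+    That is, each table requested by a benchmark becomes a container of the same name
+    inside a database named after the benchmark, and get_tables() returns an empty mapping.
+    As an illustrative example, table "cart" of benchmark "100.example" becomes container
+    "cart" in database "100.example".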
+ """ + + def get_tables(self, benchmark: str) -> Dict[str, str]: + return {} + + def _get_table_name(self, benchmark: str, table: str) -> Optional[str]: + + if benchmark not in self._benchmark_resources: + return None + + if table not in self._benchmark_resources[benchmark].containers: + return None + + return table + + def retrieve_cache(self, benchmark: str) -> bool: + + if benchmark in self._benchmark_resources: + return True + + cached_storage = self.cache_client.get_nosql_config(self.deployment_name(), benchmark) + if cached_storage is not None: + self._benchmark_resources[benchmark] = BenchmarkResources.deserialize(cached_storage) + return True + + return False + + def update_cache(self, benchmark: str): + + self._cache_client.update_nosql( + self.deployment_name(), benchmark, self._benchmark_resources[benchmark].serialize() + ) + + def cosmos_client(self) -> CosmosClient: + + if self._cosmos_client is None: + + self._cosmosdb_account = cast(AzureResources, self._cloud_resources).cosmosdb_account( + self._cli_instance + ) + + self._cosmos_client = CosmosClient( + url=self._cosmosdb_account.url, credential=self._cosmosdb_account.credential + ) + + return self._cosmos_client + + def has_tables(self, benchmark: str) -> bool: + return benchmark in self._benchmark_resources + + def benchmark_database(self, benchmark: str) -> str: + return self._benchmark_resources[benchmark].database + + def credentials(self) -> Tuple[str, str, str]: + + # An update of function that uses fully cached data will have + # to initialize it separately + # There were no prior actions that initialized this variable + if self._cosmosdb_account is None: + self._cosmosdb_account = cast(AzureResources, self._cloud_resources).cosmosdb_account( + self._cli_instance + ) + + return ( + self._cosmosdb_account.account_name, + self._cosmosdb_account.url, + self._cosmosdb_account.credential, + ) + + def writer_func( + self, + benchmark: str, + table: str, + data: dict, + primary_key: Tuple[str, str], + secondary_key: Optional[Tuple[str, str]] = None, + ): + res = self._benchmark_resources[benchmark] + table_name = self._get_table_name(benchmark, table) + assert table_name is not None + + data[primary_key[0]] = primary_key[1] + # secondary key must have that name in CosmosDB + # FIXME: support both options + assert secondary_key is not None + data["id"] = secondary_key[1] + + if res.database_client is None: + res.database_client = self.cosmos_client().get_database_client(benchmark) + + container_client = res.database_client.get_container_client(table_name) + container_client.create_item(data) + + def create_table( + self, benchmark: str, name: str, primary_key: str, _: Optional[str] = None + ) -> str: + + benchmark_resources = self._benchmark_resources.get(benchmark, None) + + if benchmark_resources is not None and name in benchmark_resources.containers: + self.logging.info(f"Using cached CosmosDB container {name}") + + """ + For some reason, creating the client is enough to verify existence of db/container. + We need to force the client to make some actions; that's why we call read. 
+ """ + # Each benchmark receives its own CosmosDB database + if benchmark_resources is None: + + # Get or allocate database + try: + db_client = self.cosmos_client().get_database_client(benchmark) + db_client.read() + + except CosmosResourceNotFoundError: + self.logging.info(f"Creating CosmosDB database {benchmark}") + db_client = self.cosmos_client().create_database(benchmark) + + benchmark_resources = BenchmarkResources( + database=benchmark, database_client=db_client, containers=[] + ) + self._benchmark_resources[benchmark] = benchmark_resources + + if benchmark_resources.database_client is None: + # Data loaded from cache will miss database client + benchmark_resources.database_client = self.cosmos_client().get_database_client( + benchmark + ) + + try: + + # verify it exists + benchmark_resources.database_client.get_container_client(name).read() + self.logging.info(f"Using existing CosmosDB container {name}") + + except CosmosResourceNotFoundError: + self.logging.info(f"Creating CosmosDB container {name}") + # no container with such name -> allocate + benchmark_resources.database_client.create_container( + id=name, partition_key=PartitionKey(path=f"/{primary_key}") + ) + + benchmark_resources.containers.append(name) + + return name + + def clear_table(self, name: str) -> str: + raise NotImplementedError() + + def remove_table(self, name: str) -> str: + raise NotImplementedError() diff --git a/sebs/azure/system_resources.py b/sebs/azure/system_resources.py new file mode 100644 index 00000000..cd69ffe9 --- /dev/null +++ b/sebs/azure/system_resources.py @@ -0,0 +1,106 @@ +import json +from typing import cast, Optional + +from sebs.config import SeBSConfig +from sebs.azure.config import AzureConfig +from sebs.azure.blob_storage import BlobStorage +from sebs.azure.cosmosdb import CosmosDB +from sebs.azure.cli import AzureCLI +from sebs.cache import Cache +from sebs.faas.resources import SystemResources +from sebs.utils import LoggingHandlers + +import docker + + +class AzureSystemResources(SystemResources): + @staticmethod + def typename() -> str: + return "Azure.SystemResources" + + @property + def config(self) -> AzureConfig: + return cast(AzureConfig, self._config) + + def __init__( + self, + system_config: SeBSConfig, + config: AzureConfig, + cache_client: Cache, + docker_client: docker.client, + logger_handlers: LoggingHandlers, + ): + super().__init__(config, cache_client, docker_client) + + self._logging_handlers = logger_handlers + self._storage: Optional[BlobStorage] = None + self._nosql_storage: Optional[CosmosDB] = None + self._cli_instance: Optional[AzureCLI] = None + self._system_config = system_config + + """ + Create wrapper object for Azure blob storage. + First ensure that storage account is created and connection string + is known. Then, create wrapper and create request number of buckets. + + Requires Azure CLI instance in Docker to obtain storage account details. 
+ + :param benchmark: + :param buckets: number of input and output buckets + :param replace_existing: when true, replace existing files in input buckets + :return: Azure storage instance + """ + + def get_storage(self, replace_existing: Optional[bool] = None) -> BlobStorage: + if self._storage is None: + self._storage = BlobStorage( + self.config.region, + self._cache_client, + self.config.resources, + self.config.resources.data_storage_account(self.cli_instance).connection_string, + replace_existing=replace_existing if replace_existing is not None else False, + ) + self._storage.logging_handlers = self.logging_handlers + elif replace_existing is not None: + self._storage.replace_existing = replace_existing + return self._storage + + def get_nosql_storage(self) -> CosmosDB: + if self._nosql_storage is None: + self._nosql_storage = CosmosDB( + self.cli_instance, self._cache_client, self.config.resources, self.config.region + ) + return self._nosql_storage + + @property + def cli_instance(self) -> AzureCLI: + + if self._cli_instance is None: + self._cli_instance = AzureCLI(self._system_config, self._docker_client) + self._cli_instance_stop = True + + output = self._cli_instance.login( + appId=self.config.credentials.appId, + tenant=self.config.credentials.tenant, + password=self.config.credentials.password, + ) + + subscriptions = json.loads(output) + if len(subscriptions) == 0: + raise RuntimeError("Didn't find any valid subscription on Azure!") + if len(subscriptions) > 1: + raise RuntimeError( + "Found more than one valid subscription on Azure - not supported!" + ) + + self.config.credentials.subscription_id = subscriptions[0]["id"] + + return self._cli_instance + + def initialize_cli(self, cli: AzureCLI): + self._cli_instance = cli + self._cli_instance_stop = False + + def shutdown(self) -> None: + if self._cli_instance and self._cli_instance_stop: + self._cli_instance.shutdown() diff --git a/sebs/azure/triggers.py b/sebs/azure/triggers.py index 66be8c6d..4296a588 100644 --- a/sebs/azure/triggers.py +++ b/sebs/azure/triggers.py @@ -31,7 +31,6 @@ def trigger_type() -> Trigger.TriggerType: def sync_invoke(self, payload: dict) -> ExecutionResult: - payload["connection_string"] = self.data_storage_account.connection_string return self._http_invoke(payload, self.url) def async_invoke(self, payload: dict) -> concurrent.futures.Future: diff --git a/sebs/benchmark.py b/sebs/benchmark.py index 90eed6ae..5781476a 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -5,15 +5,16 @@ import shutil import subprocess from abc import abstractmethod -from typing import Any, Callable, Dict, List, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple import docker from sebs.config import SeBSConfig from sebs.cache import Cache from sebs.faas.config import Resources +from sebs.faas.resources import SystemResources from sebs.utils import find_benchmark, project_absolute_path, LoggingBase -from sebs.faas.storage import PersistentStorage +from sebs.types import BenchmarkModule from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -22,10 +23,13 @@ class BenchmarkConfig: - def __init__(self, timeout: int, memory: int, languages: List["Language"]): + def __init__( + self, timeout: int, memory: int, languages: List["Language"], modules: List[BenchmarkModule] + ): self._timeout = timeout self._memory = memory self._languages = languages + self._modules = modules @property def timeout(self) -> int: @@ -47,6 +51,10 @@ def memory(self, val: int): def languages(self) -> List["Language"]: return 
self._languages + @property + def modules(self) -> List[BenchmarkModule]: + return self._modules + # FIXME: 3.7+ python with future annotations @staticmethod def deserialize(json_object: dict) -> "BenchmarkConfig": @@ -56,6 +64,7 @@ def deserialize(json_object: dict) -> "BenchmarkConfig": json_object["timeout"], json_object["memory"], [Language.deserialize(x) for x in json_object["languages"]], + [BenchmarkModule(x) for x in json_object["modules"]], ) @@ -136,6 +145,18 @@ def language_name(self) -> str: def language_version(self): return self._language_version + @property + def has_input_processed(self) -> bool: + return self._input_processed + + @property + def uses_storage(self) -> bool: + return self._uses_storage + + @property + def uses_nosql(self) -> bool: + return self._uses_nosql + @property # noqa: A003 def hash(self): path = os.path.join(self.benchmark_path, self.language_name) @@ -189,6 +210,16 @@ def __init__( if config.update_code: self._is_cached_valid = False + # Load input module + + self._benchmark_data_path = find_benchmark(self._benchmark, "benchmarks-data") + self._benchmark_input_module = load_benchmark_input(self._benchmark_path) + + # Check if input has been processed + self._input_processed: bool = False + self._uses_storage: bool = False + self._uses_nosql: bool = False + """ Compute MD5 hash of an entire directory. """ @@ -289,14 +320,23 @@ def add_deployment_files(self, output_dir): shutil.copy2(file, os.path.join(output_dir)) def add_deployment_package_python(self, output_dir): + # append to the end of requirements file - packages = self._system_config.deployment_packages( - self._deployment_name, self.language_name - ) - if len(packages): - with open(os.path.join(output_dir, "requirements.txt"), "a") as out: - for package in packages: - out.write(package) + with open(os.path.join(output_dir, "requirements.txt"), "a") as out: + + packages = self._system_config.deployment_packages( + self._deployment_name, self.language_name + ) + for package in packages: + out.write(package) + + module_packages = self._system_config.deployment_module_packages( + self._deployment_name, self.language_name + ) + for bench_module in self._benchmark_config.modules: + if bench_module.value in module_packages: + for package in module_packages[bench_module.value]: + out.write(package) def add_deployment_package_nodejs(self, output_dir): # modify package.json @@ -537,36 +577,69 @@ def build( :param size: Benchmark workload size """ - def prepare_input(self, storage: PersistentStorage, size: str): - benchmark_data_path = find_benchmark(self._benchmark, "benchmarks-data") - mod = load_benchmark_input(self._benchmark_path) + def prepare_input( + self, system_resources: SystemResources, size: str, replace_existing: bool = False + ): - buckets = mod.buckets_count() - input, output = storage.benchmark_data(self.benchmark, buckets) + """ + Handle object storage buckets. 
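+        Only benchmarks whose input module defines buckets_count() use object storage.
+        The function returns a tuple of (input, output) bucket counts; as an illustrative
+        example, a benchmark that needs a single output bucket returns (0, 1).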
+ """ + if hasattr(self._benchmark_input_module, "buckets_count"): + + buckets = self._benchmark_input_module.buckets_count() + storage = system_resources.get_storage(replace_existing) + input, output = storage.benchmark_data(self.benchmark, buckets) + + self._uses_storage = len(input) > 0 or len(output) > 0 + + self._cache_client.update_storage( + storage.deployment_name(), + self._benchmark, + { + "buckets": { + "input": storage.input_prefixes, + "output": storage.output_prefixes, + "input_uploaded": True, + } + }, + ) + + storage_func = storage.uploader_func + bucket = storage.get_bucket(Resources.StorageBucketType.BENCHMARKS) + else: + input = [] + output = [] + storage_func = None + bucket = None + + """ + Handle key-value storage. + This part is optional - only selected benchmarks implement this. + """ + if hasattr(self._benchmark_input_module, "allocate_nosql"): + + nosql_storage = system_resources.get_nosql_storage() + for name, table_properties in self._benchmark_input_module.allocate_nosql().items(): + nosql_storage.create_benchmark_tables( + self._benchmark, + name, + table_properties["primary_key"], + table_properties.get("secondary_key"), + ) + + self._uses_nosql = True + nosql_func = nosql_storage.writer_func + else: + nosql_func = None # buckets = mod.buckets_count() # storage.allocate_buckets(self.benchmark, buckets) # Get JSON and upload data as required by benchmark - input_config = mod.generate_input( - benchmark_data_path, - size, - storage.get_bucket(Resources.StorageBucketType.BENCHMARKS), - input, - output, - storage.uploader_func, + input_config = self._benchmark_input_module.generate_input( + self._benchmark_data_path, size, bucket, input, output, storage_func, nosql_func ) - self._cache_client.update_storage( - storage.deployment_name(), - self._benchmark, - { - "buckets": { - "input": storage.input_prefixes, - "output": storage.output_prefixes, - "input_uploaded": True, - } - }, - ) + self._input_processed = True return input_config @@ -639,15 +712,23 @@ class BenchmarkModuleInterface: def buckets_count() -> Tuple[int, int]: pass + @staticmethod + @abstractmethod + def allocate_nosql() -> dict: + pass + @staticmethod @abstractmethod def generate_input( data_dir: str, size: str, - benchmarks_bucket: str, + benchmarks_bucket: Optional[str], input_paths: List[str], output_paths: List[str], - upload_func: Callable[[int, str, str], None], + upload_func: Optional[Callable[[int, str, str], None]], + nosql_func: Optional[ + Callable[[str, str, dict, Tuple[str, str], Optional[Tuple[str, str]]], None] + ], ) -> Dict[str, str]: pass diff --git a/sebs/cache.py b/sebs/cache.py index 4403e59b..95bf4288 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -148,17 +148,50 @@ def get_functions( """ def get_storage_config(self, deployment: str, benchmark: str): + return self._get_resource_config(deployment, benchmark, "storage") + + def get_nosql_config(self, deployment: str, benchmark: str): + return self._get_resource_config(deployment, benchmark, "nosql") + + def _get_resource_config(self, deployment: str, benchmark: str, resource: str): cfg = self.get_benchmark_config(deployment, benchmark) - return cfg["storage"] if cfg and "storage" in cfg and not self.ignore_storage else None + return cfg[resource] if cfg and resource in cfg and not self.ignore_storage else None def update_storage(self, deployment: str, benchmark: str, config: dict): if self.ignore_storage: return + self._update_resources(deployment, benchmark, "storage", config) + + def update_nosql(self, deployment: str, 
benchmark: str, config: dict): + if self.ignore_storage: + return + self._update_resources(deployment, benchmark, "nosql", config) + + def _update_resources(self, deployment: str, benchmark: str, resource: str, config: dict): + if self.ignore_storage: + return + + """ + We are now preparing benchmark data before caching function. + Thus, we have to take over a situation where the cache directory does not exist. + """ + benchmark_dir = os.path.join(self.cache_dir, benchmark) + os.makedirs(benchmark_dir, exist_ok=True) + with self._lock: - with open(os.path.join(benchmark_dir, "config.json"), "r") as fp: - cached_config = json.load(fp) - cached_config[deployment]["storage"] = config + + if os.path.exists(os.path.join(benchmark_dir, "config.json")): + with open(os.path.join(benchmark_dir, "config.json"), "r") as fp: + cached_config = json.load(fp) + else: + cached_config = {} + + if deployment in cached_config: + cached_config[deployment][resource] = config + else: + cached_config[deployment] = {resource: config} + with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: json.dump(cached_config, fp, indent=2) diff --git a/sebs/config.py b/sebs/config.py index cfafbf00..5b6d5e5d 100644 --- a/sebs/config.py +++ b/sebs/config.py @@ -26,6 +26,13 @@ def deployment_packages(self, deployment_name: str, language_name: str) -> Dict[ "packages" ] + def deployment_module_packages( + self, deployment_name: str, language_name: str + ) -> Dict[str, str]: + return self._system_config[deployment_name]["languages"][language_name]["deployment"][ + "module_packages" + ] + def deployment_files(self, deployment_name: str, language_name: str) -> List[str]: return self._system_config[deployment_name]["languages"][language_name]["deployment"][ "files" diff --git a/sebs/experiments/invocation_overhead.py b/sebs/experiments/invocation_overhead.py index 76f9a41a..d2559175 100644 --- a/sebs/experiments/invocation_overhead.py +++ b/sebs/experiments/invocation_overhead.py @@ -78,6 +78,12 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): self._benchmark = sebs_client.get_benchmark( "030.clock-synchronization", deployment_client, self.config ) + + self.benchmark_input = self._benchmark.prepare_input( + deployment_client.system_resources, size="test", replace_existing=True + ) + self._storage = deployment_client.system_resources.get_storage(replace_existing=True) + self._function = deployment_client.get_function(self._benchmark) triggers = self._function.triggers(Trigger.TriggerType.HTTP) @@ -88,8 +94,6 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): else: self._trigger = triggers[0] - self._storage = deployment_client.get_storage(replace_existing=True) - self.benchmark_input = self._benchmark.prepare_input(storage=self._storage, size="test") self._out_dir = os.path.join( sebs_client.output_dir, "invocation-overhead", self.settings["type"] ) diff --git a/sebs/experiments/network_ping_pong.py b/sebs/experiments/network_ping_pong.py index 303f6f53..6c44f848 100644 --- a/sebs/experiments/network_ping_pong.py +++ b/sebs/experiments/network_ping_pong.py @@ -28,9 +28,14 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): benchmark = sebs_client.get_benchmark( "020.network-benchmark", deployment_client, self.config ) + + self.benchmark_input = benchmark.prepare_input( + deployment_client.system_resources, size="test", replace_existing=True + ) + self._storage = deployment_client.system_resources.get_storage(replace_existing=True) + self._function = 
deployment_client.get_function(benchmark) - self._storage = deployment_client.get_storage(replace_existing=True) - self.benchmark_input = benchmark.prepare_input(storage=self._storage, size="test") + self._out_dir = os.path.join(sebs_client.output_dir, "network-ping-pong") if not os.path.exists(self._out_dir): # shutil.rmtree(self._out_dir) diff --git a/sebs/experiments/perf_cost.py b/sebs/experiments/perf_cost.py index 38e4d418..80ef2d34 100644 --- a/sebs/experiments/perf_cost.py +++ b/sebs/experiments/perf_cost.py @@ -46,13 +46,16 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): self._benchmark = sebs_client.get_benchmark( settings["benchmark"], deployment_client, self.config ) - self._function = deployment_client.get_function(self._benchmark) + # prepare benchmark input - self._storage = deployment_client.get_storage(replace_existing=self.config.update_storage) self._benchmark_input = self._benchmark.prepare_input( - storage=self._storage, size=settings["input-size"] + deployment_client.system_resources, + size=settings["input-size"], + replace_existing=self.config.update_storage, ) + self._function = deployment_client.get_function(self._benchmark) + # add HTTP trigger triggers = self._function.triggers(Trigger.TriggerType.HTTP) if len(triggers) == 0: diff --git a/sebs/faas/nosql.py b/sebs/faas/nosql.py new file mode 100644 index 00000000..c53013d0 --- /dev/null +++ b/sebs/faas/nosql.py @@ -0,0 +1,122 @@ +from abc import ABC +from abc import abstractmethod +from typing import Dict, Optional, Tuple + +from sebs.faas.config import Resources +from sebs.cache import Cache +from sebs.utils import LoggingBase + + +class NoSQLStorage(ABC, LoggingBase): + @staticmethod + @abstractmethod + def deployment_name() -> str: + pass + + @property + def cache_client(self) -> Cache: + return self._cache_client + + @property + def region(self): + return self._region + + def __init__(self, region: str, cache_client: Cache, resources: Resources): + super().__init__() + self._cache_client = cache_client + self._cached = False + self._region = region + self._cloud_resources = resources + + # Map benchmark -> orig_name -> table_name + # self._tables: Dict[str, Dict[str, str]] = defaultdict(dict) + + @abstractmethod + def get_tables(self, benchmark: str) -> Dict[str, str]: + pass + + @abstractmethod + def _get_table_name(self, benchmark: str, table: str) -> Optional[str]: + pass + + @abstractmethod + def retrieve_cache(self, benchmark: str) -> bool: + pass + + @abstractmethod + def update_cache(self, benchmark: str): + pass + + def envs(self) -> dict: + return {} + + """ + Each table name follow this pattern: + sebs-benchmarks-{resource_id}-{benchmark-name}-{table-name} + + Each implementation should do the following + (1) Retrieve cached data + (2) Create missing table that do not exist + (3) Update cached data if anything new was created + """ + + def create_benchmark_tables( + self, benchmark: str, name: str, primary_key: str, secondary_key: Optional[str] = None + ): + + if self.retrieve_cache(benchmark): + + table_name = self._get_table_name(benchmark, name) + if table_name is not None: + self.logging.info( + f"Using cached NoSQL table {table_name} for benchmark {benchmark}" + ) + return + + self.logging.info(f"Preparing to create a NoSQL table {name} for benchmark {benchmark}") + + self.create_table(benchmark, name, primary_key, secondary_key) + + self.update_cache(benchmark) + + """ + + AWS: DynamoDB Table + Azure: CosmosDB Container + Google Cloud: Firestore in Datastore Mode, 
Database + """ + + @abstractmethod + def create_table( + self, benchmark: str, name: str, primary_key: str, secondary_key: Optional[str] = None + ) -> str: + pass + + @abstractmethod + def writer_func( + self, + benchmark: str, + table: str, + data: dict, + primary_key: Tuple[str, str], + secondary_key: Optional[Tuple[str, str]] = None, + ): + pass + + """ + + AWS DynamoDB: Removing & recreating table is the cheapest & fastest option + + Azure CosmosDB: recreate container + + Google Cloud: also likely recreate + + """ + + @abstractmethod + def clear_table(self, name: str) -> str: + pass + + @abstractmethod + def remove_table(self, name: str) -> str: + pass diff --git a/sebs/faas/resources.py b/sebs/faas/resources.py new file mode 100644 index 00000000..ba57689c --- /dev/null +++ b/sebs/faas/resources.py @@ -0,0 +1,44 @@ +from abc import abstractmethod, ABC +from typing import Optional + +import docker + +from sebs.cache import Cache +from sebs.faas.config import Config +from sebs.faas.storage import PersistentStorage +from sebs.faas.nosql import NoSQLStorage +from sebs.utils import LoggingBase + + +class SystemResources(ABC, LoggingBase): + def __init__(self, config: Config, cache_client: Cache, docker_client: docker.client): + + super().__init__() + + self._config = config + self._cache_client = cache_client + self._docker_client = docker_client + + """ + Access persistent storage instance. + It might be a remote and truly persistent service (AWS S3, Azure Blob..), + or a dynamically allocated local instance. + + :param replace_existing: replace benchmark input data if exists already + """ + + @abstractmethod + def get_storage(self, replace_existing: Optional[bool] = None) -> PersistentStorage: + pass + + """ + Access persistent storage instance. + It might be a remote and truly persistent service (AWS S3, Azure Blob..), + or a dynamically allocated local instance. 
+ + :param replace_existing: replace benchmark input data if exists already + """ + + @abstractmethod + def get_nosql_storage(self) -> NoSQLStorage: + pass diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 2576a0ef..40861c79 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -9,9 +9,9 @@ from sebs.benchmark import Benchmark from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.faas.resources import SystemResources from sebs.faas.config import Resources from sebs.faas.function import Function, Trigger, ExecutionResult -from sebs.faas.storage import PersistentStorage from sebs.utils import LoggingBase from .config import Config @@ -30,6 +30,7 @@ def __init__( system_config: SeBSConfig, cache_client: Cache, docker_client: docker.client, + system_resources: SystemResources, ): super().__init__() self._system_config = system_config @@ -37,6 +38,8 @@ def __init__( self._cache_client = cache_client self._cold_start_counter = randrange(100) + self._system_resources = system_resources + @property def system_config(self) -> SeBSConfig: return self._system_config @@ -62,6 +65,10 @@ def cold_start_counter(self, val: int): def config(self) -> Config: pass + @property + def system_resources(self) -> SystemResources: + return self._system_resources + @staticmethod @abstractmethod def function_type() -> "Type[Function]": @@ -75,7 +82,7 @@ def find_deployments(self) -> List[str]: This can be overriden, e.g., in Azure that looks for unique """ - return self.get_storage().find_deployments() + return self.system_resources.get_storage().find_deployments() def initialize_resources(self, select_prefix: Optional[str]): @@ -119,7 +126,7 @@ def initialize_resources(self, select_prefix: Optional[str]): self.config.resources.resources_id = res_id self.logging.info(f"Generating unique resource name {res_id}") # ensure that the bucket is created - this allocates the new resource - self.get_storage().get_bucket(Resources.StorageBucketType.BENCHMARKS) + self.system_resources.get_storage().get_bucket(Resources.StorageBucketType.BENCHMARKS) """ Initialize the system. After the call the local or remote @@ -132,18 +139,6 @@ def initialize_resources(self, select_prefix: Optional[str]): def initialize(self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None): pass - """ - Access persistent storage instance. - It might be a remote and truly persistent service (AWS S3, Azure Blob..), - or a dynamically allocated local instance. - - :param replace_existing: replace benchmark input data if exists already - """ - - @abstractmethod - def get_storage(self, replace_existing: bool = False) -> PersistentStorage: - pass - """ Apply the system-specific code packaging routine to build benchmark. The benchmark creates a code directory with the following structure: @@ -220,7 +215,24 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) be updated if the local version is different. """ functions = code_package.functions - if not functions or func_name not in functions: + + is_function_cached = not (not functions or func_name not in functions) + if is_function_cached: + # retrieve function + cached_function = functions[func_name] + code_location = code_package.code_location + + try: + function = self.function_type().deserialize(cached_function) + except RuntimeError as e: + + self.logging.error( + f"Cached function {cached_function['name']} is no longer available." 
+ ) + self.logging.error(e) + is_function_cached = False + + if not is_function_cached: msg = ( "function name not provided." if not func_name @@ -237,10 +249,8 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) code_package.query_cache() return function else: - # retrieve function - cached_function = functions[func_name] - code_location = code_package.code_location - function = self.function_type().deserialize(cached_function) + + assert function is not None self.cached_function(function) self.logging.info( "Using cached function {fname} in {loc}".format(fname=func_name, loc=code_location) diff --git a/sebs/gcp/cli.py b/sebs/gcp/cli.py new file mode 100644 index 00000000..65ca33bc --- /dev/null +++ b/sebs/gcp/cli.py @@ -0,0 +1,97 @@ +import logging +import os + +import docker + +from sebs.config import SeBSConfig +from sebs.gcp.config import GCPCredentials +from sebs.utils import LoggingBase + + +class GCloudCLI(LoggingBase): + @staticmethod + def typename() -> str: + return "GCP.CLI" + + def __init__( + self, credentials: GCPCredentials, system_config: SeBSConfig, docker_client: docker.client + ): + + super().__init__() + + repo_name = system_config.docker_repository() + image_name = "manage.gcp" + try: + docker_client.images.get(repo_name + ":" + image_name) + except docker.errors.ImageNotFound: + try: + logging.info( + "Docker pull of image {repo}:{image}".format(repo=repo_name, image=image_name) + ) + docker_client.images.pull(repo_name, image_name) + except docker.errors.APIError: + raise RuntimeError("Docker pull of image {} failed!".format(image_name)) + + volumes = { + os.path.abspath(credentials.gcp_credentials): { + "bind": "/credentials.json", + "mode": "ro", + } + } + self.docker_instance = docker_client.containers.run( + image=repo_name + ":" + image_name, + volumes=volumes, + remove=True, + stdout=True, + stderr=True, + detach=True, + tty=True, + ) + self.logging.info(f"Started gcloud CLI container: {self.docker_instance.id}.") + # while True: + # try: + # dkg = self.docker_instance.logs(stream=True, follow=True) + # next(dkg).decode("utf-8") + # break + # except StopIteration: + # pass + + """ + Execute the given command in the gcloud CLI container. + Throws an exception on failure (commands are expected to execute successfully). + """ + + def execute(self, cmd: str): + exit_code, out = self.docker_instance.exec_run(cmd) + if exit_code != 0: + raise RuntimeError( + "Command {} failed at gcloud CLI docker!\n Output {}".format( + cmd, out.decode("utf-8") + ) + ) + return out + + """ + Run gcloud auth command on Docker instance. + + Important: we cannot run "init" as this always requires authenticating through a browser. + Instead, we authenticate as a service account. + + Setting cloud project will show a warning about missing permissions + for Cloud Resource Manager API: I don't know why, we don't seem to need it. + + Because of that, it will ask for verification to continue - which we do by passing "Y". + """ + + def login(self, project_name: str): + self.execute("gcloud auth login --cred-file=/credentials.json") + self.execute(f"/bin/bash -c 'gcloud config set project {project_name} <<< Y'") + self.logging.info("gcloud CLI login successful") + + """ + Shuts down the Docker instance. 
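+
+    A typical lifecycle, shown here only as an illustrative sketch:
+        cli = GCloudCLI(credentials, system_config, docker_client)
+        cli.login(project_name)
+        cli.execute("gcloud config list")
+        cli.shutdown()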
+ """ + + def shutdown(self): + self.logging.info("Stopping gcloud CLI manage Docker instance") + self.docker_instance.stop() diff --git a/sebs/gcp/datastore.py b/sebs/gcp/datastore.py new file mode 100644 index 00000000..add994fc --- /dev/null +++ b/sebs/gcp/datastore.py @@ -0,0 +1,180 @@ +from dataclasses import dataclass +from typing import Dict, List, Tuple, Optional + +from sebs.cache import Cache +from sebs.faas.config import Resources +from sebs.faas.nosql import NoSQLStorage +from sebs.gcp.cli import GCloudCLI + +from google.cloud import datastore + + +@dataclass +class BenchmarkResources: + + database: str + kinds: List[str] + # We allocate this dynamically - ignore when caching + database_client: Optional[datastore.Client] = None + + def serialize(self) -> dict: + return {"database": self.database, "kinds": self.kinds} + + @staticmethod + def deserialize(config: dict) -> "BenchmarkResources": + return BenchmarkResources(database=config["database"], kinds=config["kinds"]) + + +class Datastore(NoSQLStorage): + @staticmethod + def typename() -> str: + return "GCP.Datastore" + + @staticmethod + def deployment_name(): + return "gcp" + + def __init__( + self, cli_instance: GCloudCLI, cache_client: Cache, resources: Resources, region: str + ): + super().__init__(region, cache_client, resources) + self._cli_instance = cli_instance + self._region = region + + # Mapping: benchmark -> Datastore database + self._benchmark_resources: Dict[str, BenchmarkResources] = {} + + """ + GCP requires no table mappings: the name of "kind" is the same as benchmark name. + """ + + def get_tables(self, benchmark: str) -> Dict[str, str]: + return {} + + def _get_table_name(self, benchmark: str, table: str) -> Optional[str]: + + if benchmark not in self._benchmark_resources: + return None + + if table not in self._benchmark_resources[benchmark].kinds: + return None + + return table + + def retrieve_cache(self, benchmark: str) -> bool: + + if benchmark in self._benchmark_resources: + return True + + cached_storage = self.cache_client.get_nosql_config(self.deployment_name(), benchmark) + if cached_storage is not None: + self._benchmark_resources[benchmark] = BenchmarkResources.deserialize(cached_storage) + return True + + return False + + def update_cache(self, benchmark: str): + + self._cache_client.update_nosql( + self.deployment_name(), benchmark, self._benchmark_resources[benchmark].serialize() + ) + + def benchmark_database(self, benchmark: str) -> str: + return self._benchmark_resources[benchmark].database + + def writer_func( + self, + benchmark: str, + table: str, + data: dict, + primary_key: Tuple[str, str], + secondary_key: Optional[Tuple[str, str]] = None, + ): + + res = self._benchmark_resources[benchmark] + table_name = self._get_table_name(benchmark, table) + + # FIXME: support both options + assert secondary_key is not None + + if res.database_client is None: + res.database_client = datastore.Client(database=res.database) + + parent_key = res.database_client.key(secondary_key[0], secondary_key[1]) + key = res.database_client.key( + # kind determines the table + table_name, + # main ID key + secondary_key[1], + # organization key + parent=parent_key, + ) + + val = datastore.Entity(key=key) + val.update(data) + res.database_client.put(val) + + def create_table( + self, benchmark: str, name: str, primary_key: str, _: Optional[str] = None + ) -> str: + + benchmark_resources = self._benchmark_resources.get(benchmark, None) + + if benchmark_resources is not None and name in benchmark_resources.kinds: 
+ self.logging.info(f"Using cached Datastore kind {name}") + return name + + """ + No data for this benchmark -> we need to allocate a new Datastore database. + """ + + if benchmark_resources is None: + + database_name = f"sebs-benchmarks-{self._cloud_resources.resources_id}-{benchmark}" + + try: + + self._cli_instance.execute( + "gcloud firestore databases describe " + f" --database='{database_name}' " + " --format='json'" + ) + + except RuntimeError as e: + + if "NOT_FOUND" in str(e): + + """ + Allocate a new Firestore database, in datastore mode + """ + + self.logging.info(f"Allocating a new Firestore database {database_name}") + self._cli_instance.execute( + "gcloud firestore databases create " + f" --database='{database_name}' " + f" --location={self.region} " + f" --type='datastore-mode' " + ) + self.logging.info(f"Allocated a new Firestore database {database_name}") + + else: + + self.logging.error("Couldn't query Datastore instances!") + self.logging.error(e) + raise RuntimeError("Couldn't query Datastore instances!") + + db_client = datastore.Client(database=database_name) + benchmark_resources = BenchmarkResources( + database=database_name, kinds=[], database_client=db_client + ) + self._benchmark_resources[benchmark] = benchmark_resources + + benchmark_resources.kinds.append(name) + + return name + + def clear_table(self, name: str) -> str: + raise NotImplementedError() + + def remove_table(self, name: str) -> str: + raise NotImplementedError() diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index c99850b2..a4669af3 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -11,16 +11,16 @@ from googleapiclient.discovery import build from googleapiclient.errors import HttpError -from google.cloud import monitoring_v3 +import google.cloud.monitoring_v3 as monitoring_v3 from sebs.cache import Cache from sebs.config import SeBSConfig from sebs.benchmark import Benchmark -from ..faas.function import Function, FunctionConfig, Trigger -from .storage import PersistentStorage +from sebs.faas.function import Function, FunctionConfig, Trigger from sebs.faas.config import Resources -from ..faas.system import System +from sebs.faas.system import System from sebs.gcp.config import GCPConfig +from sebs.gcp.resources import GCPSystemResources from sebs.gcp.storage import GCPStorage from sebs.gcp.function import GCPFunction from sebs.utils import LoggingHandlers @@ -43,9 +43,15 @@ def __init__( docker_client: docker.client, logging_handlers: LoggingHandlers, ): - super().__init__(system_config, cache_client, docker_client) + super().__init__( + system_config, + cache_client, + docker_client, + GCPSystemResources( + system_config, config, cache_client, docker_client, logging_handlers + ), + ) self._config = config - self.storage: Optional[GCPStorage] = None self.logging_handlers = logging_handlers @property @@ -74,35 +80,11 @@ def function_type() -> "Type[Function]": def initialize(self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None): self.function_client = build("cloudfunctions", "v1", cache_discovery=False) - self.get_storage() self.initialize_resources(select_prefix=resource_prefix) def get_function_client(self): return self.function_client - """ - Access persistent storage instance. - It might be a remote and truly persistent service (AWS S3, Azure Blob..), - or a dynamically allocated local instance. 
- - :param replace_existing: replace benchmark input data if exists already - """ - - def get_storage( - self, - replace_existing: bool = False, - benchmark=None, - buckets=None, - ) -> PersistentStorage: - if not self.storage: - self.storage = GCPStorage( - self.config.region, self.cache_client, self.config.resources, replace_existing - ) - self.storage.logging_handlers = self.logging_handlers - else: - self.storage.replace_existing = replace_existing - return self.storage - @staticmethod def default_function_name(code_package: Benchmark) -> str: # Create function name @@ -200,7 +182,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "GCPFuncti timeout = code_package.benchmark_config.timeout memory = code_package.benchmark_config.memory code_bucket: Optional[str] = None - storage_client = self.get_storage() + storage_client = self._system_resources.get_storage() location = self.config.region project_name = self.config.project_name function_cfg = FunctionConfig.from_benchmark(code_package) @@ -218,6 +200,9 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "GCPFuncti try: get_req.execute() except HttpError: + + envs = self._generate_function_envs(code_package) + create_req = ( self.function_client.projects() .locations() @@ -235,6 +220,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "GCPFuncti "httpsTrigger": {}, "ingressSettings": "ALLOW_ALL", "sourceArchiveUrl": "gs://" + code_bucket + "/" + code_prefix, + "environmentVariables": envs, }, ) ) @@ -329,10 +315,13 @@ def update_function(self, function: Function, code_package: Benchmark): function = cast(GCPFunction, function) language_runtime = code_package.language_version code_package_name = os.path.basename(code_package.code_location) - storage = cast(GCPStorage, self.get_storage()) + storage = cast(GCPStorage, self._system_resources.get_storage()) bucket = function.code_bucket(code_package.benchmark, storage) storage.upload(bucket, code_package.code_location, code_package_name) + + envs = self._generate_function_envs(code_package) + self.logging.info(f"Uploaded new code package to {bucket}/{code_package_name}") full_func_name = GCP.get_full_function_name( self.config.project_name, self.config.region, function.name @@ -351,6 +340,7 @@ def update_function(self, function: Function, code_package: Benchmark): "timeout": str(function.config.timeout) + "s", "httpsTrigger": {}, "sourceArchiveUrl": "gs://" + bucket + "/" + code_package_name, + "environmentVariables": envs, }, ) ) @@ -374,24 +364,82 @@ def update_function(self, function: Function, code_package: Benchmark): ) self.logging.info("Published new function code and configuration.") - def update_function_configuration(self, function: Function, benchmark: Benchmark): + def _update_envs(self, full_function_name: str, envs: dict) -> dict: + + get_req = ( + self.function_client.projects().locations().functions().get(name=full_function_name) + ) + response = get_req.execute() + + # preserve old variables while adding new ones. 
+ # but for conflict, we select the new one + if "environmentVariables" in response: + envs = {**response["environmentVariables"], **envs} + + return envs + + def _generate_function_envs(self, code_package: Benchmark) -> dict: + + envs = {} + if code_package.uses_nosql: + + db = ( + cast(GCPSystemResources, self._system_resources) + .get_nosql_storage() + .benchmark_database(code_package.benchmark) + ) + envs["NOSQL_STORAGE_DATABASE"] = db + + return envs + + def update_function_configuration(self, function: Function, code_package: Benchmark): + + assert code_package.has_input_processed + function = cast(GCPFunction, function) full_func_name = GCP.get_full_function_name( self.config.project_name, self.config.region, function.name ) - req = ( - self.function_client.projects() - .locations() - .functions() - .patch( - name=full_func_name, - updateMask="availableMemoryMb,timeout", - body={ - "availableMemoryMb": function.config.memory, - "timeout": str(function.config.timeout) + "s", - }, + + envs = self._generate_function_envs(code_package) + # GCP might overwrite existing variables + # If we modify them, we need to first read existing ones and append. + if len(envs) > 0: + envs = self._update_envs(full_func_name, envs) + + if len(envs) > 0: + + req = ( + self.function_client.projects() + .locations() + .functions() + .patch( + name=full_func_name, + updateMask="availableMemoryMb,timeout,environmentVariables", + body={ + "availableMemoryMb": function.config.memory, + "timeout": str(function.config.timeout) + "s", + "environmentVariables": envs, + }, + ) ) - ) + + else: + + req = ( + self.function_client.projects() + .locations() + .functions() + .patch( + name=full_func_name, + updateMask="availableMemoryMb,timeout", + body={ + "availableMemoryMb": function.config.memory, + "timeout": str(function.config.timeout) + "s", + }, + ) + ) + res = req.execute() versionId = res["metadata"]["versionId"] retries = 0 @@ -417,10 +465,13 @@ def get_full_function_name(project_name: str, location: str, func_name: str): return f"projects/{project_name}/locations/{location}/functions/{func_name}" def prepare_experiment(self, benchmark): - logs_bucket = self.storage.add_output_bucket(benchmark, suffix="logs") + logs_bucket = self._system_resources.get_storage().add_output_bucket( + benchmark, suffix="logs" + ) return logs_bucket def shutdown(self) -> None: + cast(GCPSystemResources, self._system_resources).shutdown() super().shutdown() def download_metrics( @@ -446,7 +497,7 @@ def wrapper(gen): There shouldn't be problem of waiting for complete results, since logs appear very quickly here. 
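A small, self-contained illustration of the merge semantics implemented in _update_envs above: variables already present on the deployed function are preserved, and on a key conflict the newly generated value wins. All values below are made up.

existing = {"cold_start": "3", "NOSQL_STORAGE_DATABASE": "old-database-name"}
generated = {"NOSQL_STORAGE_DATABASE": "new-database-name"}

# Same pattern as in _update_envs: the right-hand dict overrides on conflicts.
merged = {**existing, **generated}

assert merged["cold_start"] == "3"
assert merged["NOSQL_STORAGE_DATABASE"] == "new-database-name"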
""" - from google.cloud import logging as gcp_logging + import google.cloud.logging as gcp_logging logging_client = gcp_logging.Client() logger = logging_client.logger("cloudfunctions.googleapis.com%2Fcloud-functions") @@ -546,7 +597,11 @@ def _enforce_cold_start(self, function: Function): name = GCP.get_full_function_name( self.config.project_name, self.config.region, function.name ) + self.cold_start_counter += 1 + envs = self._update_envs(name, {}) + envs["cold_start"] = str(self.cold_start_counter) + req = ( self.function_client.projects() .locations() @@ -554,7 +609,7 @@ def _enforce_cold_start(self, function: Function): .patch( name=name, updateMask="environmentVariables", - body={"environmentVariables": {"cold_start": str(self.cold_start_counter)}}, + body={"environmentVariables": envs}, ) ) res = req.execute() diff --git a/sebs/gcp/resources.py b/sebs/gcp/resources.py new file mode 100644 index 00000000..0a7d5c14 --- /dev/null +++ b/sebs/gcp/resources.py @@ -0,0 +1,85 @@ +from typing import cast, Optional + +from sebs.config import SeBSConfig +from sebs.gcp.config import GCPConfig +from sebs.gcp.storage import GCPStorage +from sebs.gcp.datastore import Datastore +from sebs.gcp.cli import GCloudCLI +from sebs.cache import Cache +from sebs.faas.resources import SystemResources +from sebs.utils import LoggingHandlers + +import docker + + +class GCPSystemResources(SystemResources): + @staticmethod + def typename() -> str: + return "GCP.SystemResources" + + @property + def config(self) -> GCPConfig: + return cast(GCPConfig, self._config) + + def __init__( + self, + system_config: SeBSConfig, + config: GCPConfig, + cache_client: Cache, + docker_client: docker.client, + logger_handlers: LoggingHandlers, + ): + super().__init__(config, cache_client, docker_client) + + self._logging_handlers = logger_handlers + self._storage: Optional[GCPStorage] = None + self._nosql_storage: Optional[Datastore] = None + self._cli_instance: Optional[GCloudCLI] = None + self._system_config = system_config + + """ + Access persistent storage instance. + It might be a remote and truly persistent service (AWS S3, Azure Blob..), + or a dynamically allocated local instance. 
+ + :param replace_existing: replace benchmark input data if exists already + """ + + def get_storage(self, replace_existing: Optional[bool] = None) -> GCPStorage: + if not self._storage: + self._storage = GCPStorage( + self.config.region, + self._cache_client, + self.config.resources, + replace_existing if replace_existing is not None else False, + ) + self._storage.logging_handlers = self._logging_handlers + elif replace_existing is not None: + self._storage.replace_existing = replace_existing + return self._storage + + def get_nosql_storage(self) -> Datastore: + if not self._nosql_storage: + self._nosql_storage = Datastore( + self.cli_instance, self._cache_client, self.config.resources, self.config.region + ) + return self._nosql_storage + + @property + def cli_instance(self) -> GCloudCLI: + if self._cli_instance is None: + self._cli_instance = GCloudCLI( + self.config.credentials, self._system_config, self._docker_client + ) + self._cli_instance_stop = True + + self._cli_instance.login(self.config.credentials.project_name) + return self._cli_instance + + def initialize_cli(self, cli: GCloudCLI): + self._cli_instance = cli + self._cli_instance_stop = False + + def shutdown(self) -> None: + if self._cli_instance and self._cli_instance_stop: + self._cli_instance.shutdown() diff --git a/sebs/gcp/storage.py b/sebs/gcp/storage.py index a5b0e006..c578966f 100644 --- a/sebs/gcp/storage.py +++ b/sebs/gcp/storage.py @@ -3,7 +3,7 @@ import uuid from typing import List, Optional -from google.cloud import storage as gcp_storage +import google.cloud.storage as gcp_storage from google.api_core import exceptions from sebs.cache import Cache diff --git a/sebs/local/config.py b/sebs/local/config.py index 9cfca75d..75fa1ec7 100644 --- a/sebs/local/config.py +++ b/sebs/local/config.py @@ -1,7 +1,8 @@ -from typing import cast, Optional +from typing import cast, Optional, Tuple from sebs.cache import Cache from sebs.faas.config import Config, Credentials, Resources +from sebs.storage.config import NoSQLStorageConfig, PersistentStorageConfig, ScyllaDBConfig from sebs.storage.minio import MinioConfig from sebs.utils import LoggingHandlers @@ -22,28 +23,100 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Creden class LocalResources(Resources): - def __init__(self, storage_cfg: Optional[MinioConfig] = None): + def __init__( + self, + storage_cfg: Optional[PersistentStorageConfig] = None, + nosql_storage_cfg: Optional[NoSQLStorageConfig] = None, + ): super().__init__(name="local") - self._storage = storage_cfg + self._object_storage = storage_cfg + self._nosql_storage = nosql_storage_cfg @property - def storage_config(self) -> Optional[MinioConfig]: - return self._storage + def storage_config(self) -> Optional[PersistentStorageConfig]: + return self._object_storage + + @property + def nosql_storage_config(self) -> Optional[NoSQLStorageConfig]: + return self._nosql_storage def serialize(self) -> dict: - return {} + out: dict = {} + + if self._object_storage is not None: + out = {**out, "storage": self._object_storage.serialize()} + + if self._nosql_storage is not None: + out = {**out, "nosql": self._nosql_storage.serialize()} + + return out @staticmethod def initialize(res: Resources, cfg: dict): pass + def _deserialize_storage( + self, config: dict, cached_config: Optional[dict], storage_type: str + ) -> Tuple[str, dict]: + + storage_impl = "" + storage_config = {} + + # Check for new config + if "storage" in config and storage_type in config["storage"]: + + storage_impl = 
config["storage"][storage_type]["type"] + storage_config = config["storage"][storage_type][storage_impl] + self.logging.info( + "Using user-provided configuration of storage " + f"type: {storage_type} for local containers." + ) + + # Load cached values + elif ( + cached_config is not None + and "resources" in cached_config + and "storage" in cached_config["resources"] + and "object" in cached_config["resources"]["storage"] + ): + storage_impl = cached_config["storage"]["object"]["type"] + storage_config = cached_config["storage"]["object"][storage_impl] + self.logging.info( + f"Using cached configuration of storage type: {storage_type} for local container." + ) + + return storage_impl, storage_config + @staticmethod def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: ret = LocalResources() - # Check for new config - if "storage" in config: - ret._storage = MinioConfig.deserialize(config["storage"]) - ret.logging.info("Using user-provided configuration of storage for local containers.") + + cached_config = cache.get_config("local") + + obj_storage_impl, obj_storage_cfg = ret._deserialize_storage( + config, cached_config, "object" + ) + + if obj_storage_impl == "minio": + ret._object_storage = MinioConfig.deserialize(obj_storage_cfg) + ret.logging.info("Deserializing access data to Minio storage") + elif obj_storage_impl != "": + ret.logging.warning(f"Unknown object storage type: {obj_storage_impl}") + else: + ret.logging.info("No object storage available") + + nosql_storage_impl, nosql_storage_cfg = ret._deserialize_storage( + config, cached_config, "nosql" + ) + + if nosql_storage_impl == "scylladb": + ret._nosql_storage = ScyllaDBConfig.deserialize(nosql_storage_cfg) + ret.logging.info("Deserializing access data to ScylladB NoSQL storage") + elif nosql_storage_impl != "": + ret.logging.warning(f"Unknown NoSQL storage type: {nosql_storage_impl}") + else: + ret.logging.info("No NoSQL storage available") + return ret diff --git a/sebs/local/function.py b/sebs/local/function.py index 1db9d4cb..778d9564 100644 --- a/sebs/local/function.py +++ b/sebs/local/function.py @@ -67,6 +67,14 @@ def __init__( self._measurement_pid = measurement_pid + @property + def container(self) -> docker.models.containers.Container: + return self._instance + + @container.setter + def container(self, instance: docker.models.containers.Container): + self._instance = instance + @property def url(self) -> str: return self._url diff --git a/sebs/local/local.py b/sebs/local/local.py index 5a4eb18f..fa24735f 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -9,12 +9,11 @@ from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.local.resources import LocalSystemResources from sebs.utils import LoggingHandlers from sebs.local.config import LocalConfig -from sebs.local.storage import Minio from sebs.local.function import LocalFunction from sebs.faas.function import Function, FunctionConfig, ExecutionResult, Trigger -from sebs.faas.storage import PersistentStorage from sebs.faas.system import System from sebs.benchmark import Benchmark @@ -67,7 +66,12 @@ def __init__( docker_client: docker.client, logger_handlers: LoggingHandlers, ): - super().__init__(sebs_config, cache_client, docker_client) + super().__init__( + sebs_config, + cache_client, + docker_client, + LocalSystemResources(config, cache_client, docker_client, logger_handlers), + ) self.logging_handlers = logger_handlers self._config = config self._remove_containers = True @@ -77,31 +81,6 @@ def __init__( 
self.initialize_resources(select_prefix="local") - """ - Create wrapper object for minio storage and fill buckets. - Starts minio as a Docker instance, using always fresh buckets. - - :param benchmark: - :param buckets: number of input and output buckets - :param replace_existing: not used. - :return: Azure storage instance - """ - - def get_storage(self, replace_existing: bool = False) -> PersistentStorage: - if not hasattr(self, "storage"): - - if not self.config.resources.storage_config: - raise RuntimeError( - "The local deployment is missing the configuration of pre-allocated storage!" - ) - self.storage = Minio.deserialize( - self.config.resources.storage_config, self.cache_client, self.config.resources - ) - self.storage.logging_handlers = self.logging_handlers - else: - self.storage.replace_existing = replace_existing - return self.storage - """ Shut down minio storage instance. """ @@ -155,25 +134,38 @@ def package_code( return directory, bytes_size - def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunction": + def _start_container( + self, code_package: Benchmark, func_name: str, func: Optional[LocalFunction] + ) -> LocalFunction: container_name = "{}:run.local.{}.{}".format( self._system_config.docker_repository(), code_package.language_name, code_package.language_version, ) + environment: Dict[str, str] = {} if self.config.resources.storage_config: + environment = { - "MINIO_ADDRESS": self.config.resources.storage_config.address, - "MINIO_ACCESS_KEY": self.config.resources.storage_config.access_key, - "MINIO_SECRET_KEY": self.config.resources.storage_config.secret_key, + **self.config.resources.storage_config.envs(), "CONTAINER_UID": str(os.getuid()), "CONTAINER_GID": str(os.getgid()), "CONTAINER_USER": self._system_config.username( self.name(), code_package.language_name ), } + + if code_package.uses_nosql: + + nosql_storage = self.system_resources.get_nosql_storage() + environment = {**environment, **nosql_storage.envs()} + + for original_name, actual_name in nosql_storage.get_tables( + code_package.benchmark + ).items(): + environment[f"NOSQL_STORAGE_TABLE_{original_name}"] = actual_name + container = self._docker_client.containers.run( image=container_name, command=f"/bin/bash /sebs/run_server.sh {self.DEFAULT_PORT}", @@ -213,16 +205,20 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc ) pid = proc.pid - function_cfg = FunctionConfig.from_benchmark(code_package) - func = LocalFunction( - container, - self.DEFAULT_PORT, - func_name, - code_package.benchmark, - code_package.hash, - function_cfg, - pid, - ) + if func is None: + function_cfg = FunctionConfig.from_benchmark(code_package) + func = LocalFunction( + container, + self.DEFAULT_PORT, + func_name, + code_package.benchmark, + code_package.hash, + function_cfg, + pid, + ) + else: + func.container = container + func._measurement_pid = pid # Wait until server starts max_attempts = 10 @@ -238,20 +234,27 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc if attempts == max_attempts: raise RuntimeError( f"Couldn't start {func_name} function at container " - f"{container.id} , running on {func._url}" + f"{container.id} , running on {func.url}" ) self.logging.info( f"Started {func_name} function at container {container.id} , running on {func._url}" ) + return func + def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunction": + return self._start_container(code_package, func_name, None) + """ - FIXME: restart 
Docker? + Restart Docker container """ def update_function(self, function: Function, code_package: Benchmark): - pass + func = cast(LocalFunction, function) + func.stop() + self.logging.info("Allocating a new function container with updated code") + self._start_container(code_package, function.name, func) """ For local functions, we don't need to do anything for a cached function. diff --git a/sebs/local/resources.py b/sebs/local/resources.py new file mode 100644 index 00000000..06211ef6 --- /dev/null +++ b/sebs/local/resources.py @@ -0,0 +1,91 @@ +from typing import cast, Optional + +from sebs.cache import Cache +from sebs.local.storage import Minio, ScyllaDB +from sebs.faas.resources import SystemResources +from sebs.faas.storage import PersistentStorage +from sebs.faas.nosql import NoSQLStorage +from sebs.local.config import LocalConfig, LocalResources +from sebs.storage.config import MinioConfig, ScyllaDBConfig +from sebs.utils import LoggingHandlers + +import docker + + +class LocalSystemResources(SystemResources): + def __init__( + self, + config: LocalConfig, + cache_client: Cache, + docker_client: docker.client, + logger_handlers: LoggingHandlers, + ): + super().__init__(config, cache_client, docker_client) + + self._logging_handlers = logger_handlers + self._storage: Optional[PersistentStorage] = None + self._nosql_storage: Optional[NoSQLStorage] = None + + """ + Create wrapper object for minio storage and fill buckets. + Starts minio as a Docker instance, using always fresh buckets. + + :param benchmark: + :param buckets: number of input and output buckets + :param replace_existing: not used. + :return: Azure storage instance + """ + + def get_storage(self, replace_existing: Optional[bool] = None) -> PersistentStorage: + if self._storage is None: + + storage_config = cast(LocalResources, self._config.resources).storage_config + if storage_config is None: + self.logging.error( + "The local deployment is missing the configuration of pre-allocated storage!" + ) + raise RuntimeError("Cannot run local deployment without any object storage") + + if isinstance(storage_config, MinioConfig): + self._storage = Minio.deserialize( + storage_config, + self._cache_client, + self._config.resources, + ) + self._storage.logging_handlers = self._logging_handlers + else: + self.logging.error( + "The local deployment does not support " + f"the object storage config type: {type(storage_config)}!" + ) + raise RuntimeError("Cannot work with the provided object storage!") + + elif replace_existing is not None: + self._storage.replace_existing = replace_existing + return self._storage + + def get_nosql_storage(self) -> NoSQLStorage: + + if self._nosql_storage is None: + + storage_config = cast(LocalResources, self._config.resources).nosql_storage_config + if storage_config is None: + self.logging.error( + "The local deployment is missing the configuration " + "of pre-allocated NoSQL storage!" + ) + raise RuntimeError("Cannot allocate NoSQL storage!") + + if isinstance(storage_config, ScyllaDBConfig): + self._nosql_storage = ScyllaDB.deserialize( + storage_config, self._cache_client, self._config.resources + ) + self._nosql_storage.logging_handlers = self._logging_handlers + else: + self.logging.error( + "The local deployment does not support " + f"the NoSQL storage config type: {type(storage_config)}!" 
+ ) + raise RuntimeError("Cannot work with the provided NoSQL storage!") + + return self._nosql_storage diff --git a/sebs/local/storage.py b/sebs/local/storage.py index 9563deb4..71dd3ce2 100644 --- a/sebs/local/storage.py +++ b/sebs/local/storage.py @@ -1,8 +1,10 @@ import docker +from typing import Optional from sebs.faas.config import Resources from sebs.storage import minio -from sebs.storage.config import MinioConfig +from sebs.storage import scylladb +from sebs.storage.config import MinioConfig, ScyllaDBConfig from sebs.cache import Cache @@ -11,6 +13,10 @@ class Minio(minio.Minio): def deployment_name() -> str: return "local" + @staticmethod + def typename() -> str: + return "Local.Minio" + def __init__( self, docker_client: docker.client, @@ -25,3 +31,30 @@ def deserialize( cached_config: MinioConfig, cache_client: Cache, resources: Resources ) -> "Minio": return super(Minio, Minio)._deserialize(cached_config, cache_client, resources, Minio) + + +class ScyllaDB(scylladb.ScyllaDB): + @staticmethod + def deployment_name() -> str: + return "local" + + @staticmethod + def typename() -> str: + return "Local.ScyllaDB" + + def __init__( + self, + docker_client: docker.client, + cache_client: Cache, + config: ScyllaDBConfig, + res: Optional[Resources], + ): + super().__init__(docker_client, cache_client, config, res) + + @staticmethod + def deserialize( + cached_config: ScyllaDBConfig, cache_client: Cache, resources: Resources + ) -> "ScyllaDB": + return super(ScyllaDB, ScyllaDB)._deserialize( + cached_config, cache_client, resources, ScyllaDB + ) diff --git a/sebs/openwhisk/openwhisk.py b/sebs/openwhisk/openwhisk.py index 00660de9..f080caea 100644 --- a/sebs/openwhisk/openwhisk.py +++ b/sebs/openwhisk/openwhisk.py @@ -9,6 +9,7 @@ from sebs.cache import Cache from sebs.faas import System, PersistentStorage from sebs.faas.function import Function, ExecutionResult, Trigger +from sebs.faas.nosql import NoSQLStorage from sebs.openwhisk.storage import Minio from sebs.openwhisk.triggers import LibraryTrigger, HTTPTrigger from sebs.utils import DOCKER_DIR, LoggingHandlers, execute @@ -67,6 +68,9 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: self.storage.replace_existing = replace_existing return self.storage + def get_nosql_storage(self) -> NoSQLStorage: + raise NotImplementedError() + def shutdown(self) -> None: if hasattr(self, "storage") and self.config.shutdownStorage: self.storage.stop() diff --git a/sebs/sebs.py b/sebs/sebs.py index 58bc07a9..43e90ef9 100644 --- a/sebs/sebs.py +++ b/sebs/sebs.py @@ -3,7 +3,6 @@ import docker -import sebs.storage from sebs import types from sebs.local import Local from sebs.cache import Cache @@ -11,7 +10,9 @@ from sebs.benchmark import Benchmark from sebs.faas.system import System as FaaSSystem from sebs.faas.storage import PersistentStorage +from sebs.faas.nosql import NoSQLStorage from sebs.faas.config import Config +from sebs.storage import minio, config, scylladb from sebs.utils import has_platform, LoggingHandlers, LoggingBase from sebs.experiments.config import Config as ExperimentConfig @@ -180,19 +181,28 @@ def get_benchmark( @staticmethod def get_storage_implementation(storage_type: types.Storage) -> Type[PersistentStorage]: - _storage_implementations = {types.Storage.MINIO: sebs.storage.minio.Minio} + _storage_implementations = {types.Storage.MINIO: minio.Minio} + impl = _storage_implementations.get(storage_type) + assert impl + return impl + + @staticmethod + def get_nosql_implementation(storage_type: 
types.NoSQLStorage) -> Type[NoSQLStorage]: + _storage_implementations = {types.NoSQLStorage.SCYLLADB: scylladb.ScyllaDB} impl = _storage_implementations.get(storage_type) assert impl return impl @staticmethod def get_storage_config_implementation(storage_type: types.Storage): - _storage_implementations = { - types.Storage.MINIO: ( - sebs.storage.config.MinioConfig, - sebs.storage.config.MinioResources, - ) - } + _storage_implementations = {types.Storage.MINIO: config.MinioConfig} + impl = _storage_implementations.get(storage_type) + assert impl + return impl + + @staticmethod + def get_nosql_config_implementation(storage_type: types.NoSQLStorage): + _storage_implementations = {types.NoSQLStorage.SCYLLADB: config.ScyllaDBConfig} impl = _storage_implementations.get(storage_type) assert impl return impl diff --git a/sebs/storage/config.py b/sebs/storage/config.py index 9d097897..1441f209 100644 --- a/sebs/storage/config.py +++ b/sebs/storage/config.py @@ -1,37 +1,25 @@ -from typing import cast, List +from abc import ABC +from abc import abstractmethod +from typing import List from dataclasses import dataclass, field from sebs.cache import Cache -from sebs.faas.config import Resources -class MinioResources(Resources): - def __init__(self): - super().__init__(name="minio") - - @staticmethod - def initialize(res: Resources, dct: dict): - ret = cast(MinioResources, res) - super(MinioResources, MinioResources).initialize(ret, dct) - return ret - +@dataclass +class PersistentStorageConfig(ABC): + @abstractmethod def serialize(self) -> dict: - return super().serialize() + pass - @staticmethod - def deserialize(config: dict) -> "Resources": # type: ignore - - ret = MinioResources() - MinioResources.initialize(ret, {}) - return ret - - def update_cache(self, cache: Cache): - super().update_cache(cache) + @abstractmethod + def envs(self) -> dict: + pass @dataclass -class MinioConfig: +class MinioConfig(PersistentStorageConfig): address: str = "" mapped_port: int = -1 access_key: str = "" @@ -39,6 +27,8 @@ class MinioConfig: instance_id: str = "" output_buckets: List[str] = field(default_factory=list) input_buckets: List[str] = field(default_factory=lambda: []) + version: str = "" + data_volume: str = "" type: str = "minio" def update_cache(self, path: List[str], cache: Cache): @@ -55,9 +45,55 @@ def deserialize(data: dict) -> "MinioConfig": data = {k: v for k, v in data.items() if k in keys} cfg = MinioConfig(**data) - # cfg.resources = cast(MinioResources, MinioResources.deserialize(data["resources"])) return cfg def serialize(self) -> dict: - return self.__dict__ # , "resources": self.resources.serialize()} + return self.__dict__ + + def envs(self) -> dict: + return { + "MINIO_ADDRESS": self.address, + "MINIO_ACCESS_KEY": self.access_key, + "MINIO_SECRET_KEY": self.secret_key, + } + + +@dataclass +class NoSQLStorageConfig(ABC): + @abstractmethod + def serialize(self) -> dict: + pass + + +@dataclass +class ScyllaDBConfig(NoSQLStorageConfig): + address: str = "" + mapped_port: int = -1 + alternator_port: int = 8000 + access_key: str = "None" + secret_key: str = "None" + instance_id: str = "" + region: str = "None" + cpus: int = -1 + memory: int = -1 + version: str = "" + data_volume: str = "" + type: str = "nosql" + + def update_cache(self, path: List[str], cache: Cache): + + for key in ScyllaDBConfig.__dataclass_fields__.keys(): + cache.update_config(val=getattr(self, key), keys=[*path, key]) + + @staticmethod + def deserialize(data: dict) -> "ScyllaDBConfig": + keys = 
list(ScyllaDBConfig.__dataclass_fields__.keys()) + data = {k: v for k, v in data.items() if k in keys} + + cfg = ScyllaDBConfig(**data) + + return cfg + + def serialize(self) -> dict: + return self.__dict__ diff --git a/sebs/storage/minio.py b/sebs/storage/minio.py index 1c544a23..75640af3 100644 --- a/sebs/storage/minio.py +++ b/sebs/storage/minio.py @@ -14,6 +14,7 @@ from sebs.faas.config import Resources from sebs.faas.storage import PersistentStorage from sebs.storage.config import MinioConfig +from sebs.utils import project_absolute_path class Minio(PersistentStorage): @@ -44,6 +45,10 @@ def __init__( def config(self) -> MinioConfig: return self._cfg + @config.setter + def config(self, config: MinioConfig): + self._cfg = config + @staticmethod def _define_http_client(): """ @@ -63,17 +68,31 @@ def _define_http_client(): ), ) - def start(self, port: int = 9000): + def start(self): + + if self._cfg.data_volume == "": + minio_volume = os.path.join(project_absolute_path(), "minio-volume") + else: + minio_volume = self._cfg.data_volume + minio_volume = os.path.abspath(minio_volume) + + os.makedirs(minio_volume, exist_ok=True) + volumes = { + minio_volume: { + "bind": "/data", + "mode": "rw", + } + } - self._cfg.mapped_port = port self._cfg.access_key = secrets.token_urlsafe(32) self._cfg.secret_key = secrets.token_hex(32) self._cfg.address = "" self.logging.info("Minio storage ACCESS_KEY={}".format(self._cfg.access_key)) self.logging.info("Minio storage SECRET_KEY={}".format(self._cfg.secret_key)) try: + self.logging.info(f"Starting storage Minio on port {self._cfg.mapped_port}") self._storage_container = self._docker_client.containers.run( - "minio/minio:latest", + f"minio/minio:{self._cfg.version}", command="server /data", network_mode="bridge", ports={"9000": str(self._cfg.mapped_port)}, @@ -81,6 +100,7 @@ def start(self, port: int = 9000): "MINIO_ACCESS_KEY": self._cfg.access_key, "MINIO_SECRET_KEY": self._cfg.secret_key, }, + volumes=volumes, remove=True, stdout=True, stderr=True, @@ -239,6 +259,14 @@ def serialize(self) -> dict: "type": StorageTypes.MINIO, } + """ + This implementation supports overriding this class. + The main Minio class is used to start/stop deployments. + + When overriding the implementation in Local/OpenWhisk/..., + we call the _deserialize and provide an alternative implementation. 
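The docstring above describes the intended extension pattern: platform-specific wrappers subclass the generic Minio implementation and delegate to _deserialize with their own concrete type, as the local deployment does. A hypothetical example for an additional platform (the platform name is invented purely for illustration):

from sebs.cache import Cache
from sebs.faas.config import Resources
from sebs.storage import minio
from sebs.storage.config import MinioConfig


class ExamplePlatformMinio(minio.Minio):
    @staticmethod
    def deployment_name() -> str:
        return "example-platform"

    @staticmethod
    def deserialize(
        cached_config: MinioConfig, cache_client: Cache, resources: Resources
    ) -> "ExamplePlatformMinio":
        # Reuse the shared deserialization logic; only the concrete type differs.
        return super(ExamplePlatformMinio, ExamplePlatformMinio)._deserialize(
            cached_config, cache_client, resources, ExamplePlatformMinio
        )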
+ """ + T = TypeVar("T", bound="Minio") @staticmethod diff --git a/sebs/storage/scylladb.py b/sebs/storage/scylladb.py new file mode 100644 index 00000000..85cd47a0 --- /dev/null +++ b/sebs/storage/scylladb.py @@ -0,0 +1,318 @@ +import json +import os +import platform +import time +from collections import defaultdict +from typing import Dict, Optional, Tuple, Type, TypeVar + +from sebs.cache import Cache +from sebs.faas.config import Resources +from sebs.faas.nosql import NoSQLStorage +from sebs.types import NoSQLStorage as StorageType +from sebs.storage.config import ScyllaDBConfig +from sebs.utils import project_absolute_path + +import boto3 +from boto3.dynamodb.types import TypeSerializer +import docker + + +class ScyllaDB(NoSQLStorage): + @staticmethod + def typename() -> str: + return f"{ScyllaDB.deployment_name()}.ScyllaDB" + + @staticmethod + def deployment_name() -> str: + return "scylladb" + + @property + def config(self) -> ScyllaDBConfig: + return self._cfg + + # the location does not matter + SCYLLADB_REGION = "None" + + def __init__( + self, + docker_client: docker.client, + cache_client: Cache, + config: ScyllaDBConfig, + resources: Optional[Resources] = None, + ): + + super().__init__(self.SCYLLADB_REGION, cache_client, resources) # type: ignore + self._docker_client = docker_client + self._storage_container: Optional[docker.container] = None + self._cfg = config + + # Map benchmark -> orig_name -> table_name + self._tables: Dict[str, Dict[str, str]] = defaultdict(dict) + self._serializer = TypeSerializer() + + if config.address != "": + self.client = boto3.client( + "dynamodb", + region_name="None", + aws_access_key_id="None", + aws_secret_access_key="None", + endpoint_url=f"http://{config.address}", + ) + + def start(self): + + if self._cfg.data_volume == "": + scylladb_volume = os.path.join(project_absolute_path(), "scylladb-volume") + else: + scylladb_volume = self._cfg.data_volume + scylladb_volume = os.path.abspath(scylladb_volume) + + os.makedirs(scylladb_volume, exist_ok=True) + volumes = { + scylladb_volume: { + "bind": "/var/lib/scylla/", + "mode": "rw", + } + } + + try: + + scylladb_args = "" + scylladb_args += f"--smp {self._cfg.cpus} " + scylladb_args += f"--memory {self._cfg.memory}M " + scylladb_args += "--overprovisioned 1 " + scylladb_args += "--alternator-port 8000 " + scylladb_args += "--alternator-write-isolation=only_rmw_uses_lwt " + + self.logging.info("Starting ScyllaDB storage") + self._storage_container = self._docker_client.containers.run( + f"scylladb/scylla:{self._cfg.version}", + command=scylladb_args, + name="some-scylla", + hostname="some-scylla", + network_mode="bridge", + volumes=volumes, + ports={"8000": str(self._cfg.mapped_port)}, + remove=True, + stdout=True, + stderr=True, + detach=True, + ) + self._cfg.instance_id = self._storage_container.id + + # Wait until it boots up + attempts = 0 + max_attempts = 30 + while attempts < max_attempts: + + exit_code, out = self._storage_container.exec_run("nodetool status") + + if exit_code == 0: + self.logging.info("Started ScyllaDB succesfully!") + break + + time.sleep(1.0) + attempts += 1 + + if attempts == max_attempts: + self.logging.error("Failed to launch ScyllaBD!") + self.logging.error(f"Last result of nodetool status: {out}") + raise RuntimeError("Failed to launch ScyllaBD!") + + self.configure_connection() + except docker.errors.APIError as e: + self.logging.error("Starting ScyllaDB storage failed! 
Reason: {}".format(e)) + raise RuntimeError("Starting ScyllaDB storage unsuccesful") + except Exception as e: + self.logging.error("Starting ScyllaDB storage failed! Unknown error: {}".format(e)) + raise RuntimeError("Starting ScyllaDB storage unsuccesful") + + # FIXME: refactor this - duplicated code from minio + def configure_connection(self): + # who knows why? otherwise attributes are not loaded + if self._cfg.address == "": + + if self._storage_container is None: + raise RuntimeError( + "ScyllaDB container is not available! Make sure that you deployed " + "the ScyllaDB storage and provided configuration!" + ) + + self._storage_container.reload() + + # Check if the system is Linux and that it's not WSL + if platform.system() == "Linux" and "microsoft" not in platform.release().lower(): + networks = self._storage_container.attrs["NetworkSettings"]["Networks"] + self._cfg.address = "{IPAddress}:{Port}".format( + IPAddress=networks["bridge"]["IPAddress"], Port=self._cfg.alternator_port + ) + else: + # System is either WSL, Windows, or Mac + self._cfg.address = f"localhost:{self._cfg.mapped_port}" + + if not self._cfg.address: + self.logging.error( + f"Couldn't read the IP address of container from attributes " + f"{json.dumps(self._instance.attrs, indent=2)}" + ) + raise RuntimeError( + f"Incorrect detection of IP address for container with id {self._instance_id}" + ) + self.logging.info("Starting ScyllaDB instance at {}".format(self._cfg.address)) + + def stop(self): + if self._storage_container is not None: + self.logging.info(f"Stopping ScyllaDB container at {self._cfg.address}.") + self._storage_container.stop() + self.logging.info(f"Stopped ScyllaDB container at {self._cfg.address}.") + else: + self.logging.error("Stopping ScyllaDB was not succesful, storage container not known!") + + def envs(self) -> dict: + return {"NOSQL_STORAGE_TYPE": "scylladb", "NOSQL_STORAGE_ENDPOINT": self._cfg.address} + + def serialize(self) -> Tuple[StorageType, dict]: + return StorageType.SCYLLADB, self._cfg.serialize() + + """ + This implementation supports overriding this class. + The main ScyllaDB class is used to start/stop deployments. + + When overriding the implementation in Local/OpenWhisk/..., + we call the _deserialize and provide an alternative implementation. 
+ """ + + T = TypeVar("T", bound="ScyllaDB") + + @staticmethod + def _deserialize( + cached_config: ScyllaDBConfig, cache_client: Cache, resources: Resources, obj_type: Type[T] + ) -> T: + docker_client = docker.from_env() + obj = obj_type(docker_client, cache_client, cached_config, resources) + + if cached_config.instance_id: + instance_id = cached_config.instance_id + try: + obj._storage_container = docker_client.containers.get(instance_id) + except docker.errors.NotFound: + raise RuntimeError(f"Storage container {instance_id} does not exist!") + else: + obj._storage_container = None + return obj + + @staticmethod + def deserialize( + cached_config: ScyllaDBConfig, cache_client: Cache, resources: Resources + ) -> "ScyllaDB": + return ScyllaDB._deserialize(cached_config, cache_client, resources, ScyllaDB) + + def retrieve_cache(self, benchmark: str) -> bool: + + if benchmark in self._tables: + return True + + cached_storage = self.cache_client.get_nosql_config(self.deployment_name(), benchmark) + if cached_storage is not None: + self._tables[benchmark] = cached_storage["tables"] + return True + + return False + + def update_cache(self, benchmark: str): + + self._cache_client.update_nosql( + self.deployment_name(), + benchmark, + { + "tables": self._tables[benchmark], + }, + ) + + def get_tables(self, benchmark: str) -> Dict[str, str]: + return self._tables[benchmark] + + def _get_table_name(self, benchmark: str, table: str) -> Optional[str]: + + if benchmark not in self._tables: + return None + + if table not in self._tables[benchmark]: + return None + + return self._tables[benchmark][table] + + def writer_func( + self, + benchmark: str, + table: str, + data: dict, + primary_key: Tuple[str, str], + secondary_key: Optional[Tuple[str, str]] = None, + ): + + table_name = self._get_table_name(benchmark, table) + assert table_name is not None + + for key in (primary_key, secondary_key): + if key is not None: + data[key[0]] = key[1] + + serialized_data = {k: self._serializer.serialize(v) for k, v in data.items()} + self.client.put_item(TableName=table_name, Item=serialized_data) + + """ + AWS: create a DynamoDB Table + + In contrast to the hierarchy of database objects in Azure (account -> database -> container) + and GCP (database per benchmark), we need to create unique table names here. 
+ """ + + def create_table( + self, benchmark: str, name: str, primary_key: str, secondary_key: Optional[str] = None + ) -> str: + + table_name = f"sebs-benchmarks-{self._cloud_resources.resources_id}-{benchmark}-{name}" + + try: + + definitions = [{"AttributeName": primary_key, "AttributeType": "S"}] + key_schema = [{"AttributeName": primary_key, "KeyType": "HASH"}] + + if secondary_key is not None: + definitions.append({"AttributeName": secondary_key, "AttributeType": "S"}) + key_schema.append({"AttributeName": secondary_key, "KeyType": "RANGE"}) + + ret = self.client.create_table( + TableName=table_name, + BillingMode="PAY_PER_REQUEST", + AttributeDefinitions=definitions, # type: ignore + KeySchema=key_schema, # type: ignore + ) + + if ret["TableDescription"]["TableStatus"] == "CREATING": + self.logging.info(f"Waiting for creation of DynamoDB table {name}") + waiter = self.client.get_waiter("table_exists") + waiter.wait(TableName=name) + + self.logging.info(f"Created DynamoDB table {name} for benchmark {benchmark}") + self._tables[benchmark][name] = table_name + + return ret["TableDescription"]["TableName"] + + except self.client.exceptions.ResourceInUseException as e: + + if "already exists" in e.response["Error"]["Message"]: + self.logging.info( + f"Using existing DynamoDB table {table_name} for benchmark {benchmark}" + ) + self._tables[benchmark][name] = table_name + return name + + raise RuntimeError(f"Creating DynamoDB failed, unknown reason! Error: {e}") + + def clear_table(self, name: str) -> str: + raise NotImplementedError() + + def remove_table(self, name: str) -> str: + raise NotImplementedError() diff --git a/sebs/types.py b/sebs/types.py index 2f26117e..b87516fb 100644 --- a/sebs/types.py +++ b/sebs/types.py @@ -1,6 +1,11 @@ from enum import Enum +class BenchmarkModule(str, Enum): + STORAGE = "storage" + NOSQL = "nosql" + + class Platforms(str, Enum): AWS = "aws" AZURE = "azure" @@ -14,3 +19,10 @@ class Storage(str, Enum): AZURE_BLOB_STORAGE = "azure-blob-storage" GCP_STORAGE = "google-cloud-storage" MINIO = "minio" + + +class NoSQLStorage(str, Enum): + AWS_DYNAMODB = "aws-dynamodb" + AZURE_COSMOSDB = "azure-cosmosdb" + GCP_DATASTORE = "google-cloud-datastore" + SCYLLADB = "scylladb" diff --git a/sebs/utils.py b/sebs/utils.py index 3df8ffc9..6092ee60 100644 --- a/sebs/utils.py +++ b/sebs/utils.py @@ -60,6 +60,14 @@ def update_nested_dict(cfg: dict, keys: List[str], value: Optional[str]): cfg[keys[-1]] = value +def append_nested_dict(cfg: dict, keys: List[str], value: Optional[dict]): + if value: + # make sure parent keys exist + for key in keys[:-1]: + cfg = cfg.setdefault(key, {}) + cfg[keys[-1]] = {**cfg[keys[-1]], **value} + + def find(name, path): for root, dirs, files in os.walk(path): if name in dirs: