From 5b4dbf3f9958ced15131b92076b4f5668cfec43a Mon Sep 17 00:00:00 2001 From: Lei Wang Date: Mon, 1 Jul 2024 17:32:51 +0800 Subject: [PATCH] Add gart cli Signed-off-by: Lei Wang --- charts/gart/templates/configmap.yaml | 2 +- .../templates/gie_frontend/deployment.yaml | 2 + charts/gart/templates/gie_frontend/svc.yaml | 4 +- charts/gart/templates/writer/statefulset.yaml | 2 +- charts/gart/templates/writer/svc.yaml | 2 +- charts/gart/values.yaml | 9 +- scripts/controller.py | 58 +++++- scripts/gart_cli.py | 184 ++++++++++++++++++ scripts/k8s_deployment.py | 0 9 files changed, 251 insertions(+), 12 deletions(-) create mode 100755 scripts/gart_cli.py mode change 100644 => 100755 scripts/k8s_deployment.py diff --git a/charts/gart/templates/configmap.yaml b/charts/gart/templates/configmap.yaml index 9ecb7b2..28cb65c 100644 --- a/charts/gart/templates/configmap.yaml +++ b/charts/gart/templates/configmap.yaml @@ -57,7 +57,7 @@ data: {{- else }} SECONDS_PER_EPOCH: "60" {{- end }} - {{- if not .Values.analyzer.enabled }} + {{- if not .Values.dataconfig.useGAE }} GIE_EXECUTOR_POD_BASE_NAME: {{ include "gart.writer.fullname" . | quote }} GIE_EXECUTOR_POD_SERVICE_NAME: {{ include "gart.fullname" . }}-gie-executor-service GIE_EXECUTOR_POD_SERVICE_PORT: {{ .Values.gie_executor.HTTP_SERVICE_PORT | quote }} diff --git a/charts/gart/templates/gie_frontend/deployment.yaml b/charts/gart/templates/gie_frontend/deployment.yaml index 20faf7a..1031e7d 100644 --- a/charts/gart/templates/gie_frontend/deployment.yaml +++ b/charts/gart/templates/gie_frontend/deployment.yaml @@ -1,3 +1,4 @@ +{{- if not .Values.dataconfig.useGAE }} {{- $etcd_service_name := .Values.etcd.fullnameOverride }} {{- $etcd_service_port := int .Values.etcd.containerPorts.client }} {{- $etcd_service := printf "%s:%d" $etcd_service_name $etcd_service_port }} @@ -40,3 +41,4 @@ spec: +{{- end }} \ No newline at end of file diff --git a/charts/gart/templates/gie_frontend/svc.yaml b/charts/gart/templates/gie_frontend/svc.yaml index df6c41e..689c6ba 100644 --- a/charts/gart/templates/gie_frontend/svc.yaml +++ b/charts/gart/templates/gie_frontend/svc.yaml @@ -1,3 +1,4 @@ +{{- if not .Values.dataconfig.useGAE }} apiVersion: v1 kind: Service metadata: @@ -12,4 +13,5 @@ spec: ports: - protocol: TCP port: {{ .Values.gie_frontend.gremlinPort }} - targetPort: gremlin \ No newline at end of file + targetPort: gremlin +{{- end }} \ No newline at end of file diff --git a/charts/gart/templates/writer/statefulset.yaml b/charts/gart/templates/writer/statefulset.yaml index d7d9204..47cfbd8 100644 --- a/charts/gart/templates/writer/statefulset.yaml +++ b/charts/gart/templates/writer/statefulset.yaml @@ -61,7 +61,7 @@ spec: volumeMounts: - name: shared-socket mountPath: /tmp/shared - {{- if .Values.analyzer.enabled }} + {{- if .Values.dataconfig.useGAE }} - name: analyzer image: {{ include "gart.analyzer.image" . }} imagePullPolicy: {{ .Values.analyzer.image.pullPolicy | quote }} diff --git a/charts/gart/templates/writer/svc.yaml b/charts/gart/templates/writer/svc.yaml index cb05015..44e3f52 100644 --- a/charts/gart/templates/writer/svc.yaml +++ b/charts/gart/templates/writer/svc.yaml @@ -1,4 +1,4 @@ -{{- if not .Values.analyzer.enabled }} +{{- if not .Values.dataconfig.useGAE }} apiVersion: v1 kind: Service metadata: diff --git a/charts/gart/values.yaml b/charts/gart/values.yaml index aa9017a..8eb974d 100644 --- a/charts/gart/values.yaml +++ b/charts/gart/values.yaml @@ -186,7 +186,6 @@ writer: # analyzer config analyzer: - enabled: false image: repository: gart-analyzer tag: latest @@ -237,11 +236,17 @@ dataconfig: dbPassword: "123456" dbType: "postgresql" dbName: "ldbc" + #optional v6dSocket: "/tmp/shared/v6d.sock" + #optional v6dSize: "750G" + #optional etcdPrefix: "gart_meta_" enableBulkload: 1 - # if you want to need an epoch after a give time interval, set useSecondsPerEpoch to 1 + # if you want to need an epoch after a give time interval, set useSecondsPerEpoch to 1. + # If not set, use logsPerEpoch # useSecondsPerEpoch: 1 logsPerEpoch: 10000 # secondsPerEpoch: 60 + # if you run GAE task, set useGAE to 1, otherwise run GIE task. Default run GIE task + useGAE: 0 diff --git a/scripts/controller.py b/scripts/controller.py index d7764d9..ed14426 100755 --- a/scripts/controller.py +++ b/scripts/controller.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from flask import Flask, request +from flask import Flask, request, jsonify import subprocess import os from kubernetes import client, config @@ -18,6 +18,34 @@ previous_read_epoch = None +@app.route("/submit-config", methods=["POST"]) +def submit_config(): + if "file" not in request.files: + return jsonify({"error": "No file part in the request"}), 400 + file = request.files["file"] + if file.filename == "": + return jsonify({"error": "No selected file"}), 400 + + try: + content = file.read() + etcd_server = os.getenv("ETCD_SERVICE", "etcd") + if not etcd_server.startswith("http://"): + etcd_server = f"http://{etcd_server}" + etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_") + etcd_host = etcd_server.split("://")[1].split(":")[0] + etcd_port = etcd_server.split(":")[2] + etcd_client = etcd3.client(host=etcd_host, port=etcd_port) + while True: + try: + etcd_client.put(etcd_prefix + "gart_rg_mapping_yaml", content) + break + except Exception as e: + time.sleep(5) + return "Config submitted", 200 + except Exception as e: + return jsonify({"error": str(e)}), 400 + + @app.route("/control/pause", methods=["POST"]) def pause(): subprocess.run( @@ -72,8 +100,21 @@ def get_read_epoch_by_timestamp(): @app.route("/run-gae-task", methods=["POST"]) def run_gae_task(): command = "" - for key, value in request.form.items(): - command += f"--{key} {value} " + data = request.json + algorithm_name = data.get("algorithm_name") + graph_version = data.get("graph_version") + latest_epoch = get_latest_read_epoch() + if int(graph_version) > latest_epoch: + return "Invalid read epoch", 400 + if latest_epoch == 2**64 - 1: + return "No available read epoch", 400 + command += f"--app_name {algorithm_name} " + command += f"--read_epoch {graph_version} " + for key, value in data.items(): + if key not in ["algorithm_name", "graph_version"]: + command += f"--{key} {value} " + # for key, value in request.form.items(): + # command += f"--{key} {value} " etcd_server = os.getenv("ETCD_SERVICE", "etcd") if not etcd_server.startswith("http://"): etcd_server = f"http://{etcd_server}" @@ -91,6 +132,8 @@ def change_read_epoch(): latest_epoch = get_latest_read_epoch() if int(read_epoch) > latest_epoch: return "Invalid read epoch", 400 + if latest_epoch == 2**64 - 1: + return "No available read epoch", 400 global previous_read_epoch if previous_read_epoch is None or previous_read_epoch != read_epoch: previous_read_epoch = read_epoch @@ -287,9 +330,12 @@ def get_latest_read_epoch(): latest_epoch = 2**64 - 1 for idx in range(int(num_fragment)): etcd_key = etcd_prefix + "gart_latest_epoch_p" + str(idx) - etcd_value, _ = etcd_client.get(etcd_key) - if latest_epoch > int(etcd_value): - latest_epoch = int(etcd_value) + try: + etcd_value, _ = etcd_client.get(etcd_key) + if latest_epoch > int(etcd_value): + latest_epoch = int(etcd_value) + except Exception as e: + print(f"Error: {e}") return latest_epoch diff --git a/scripts/gart_cli.py b/scripts/gart_cli.py new file mode 100755 index 0000000..7098ab4 --- /dev/null +++ b/scripts/gart_cli.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 + +import click +import os +import json +import socket +import requests +from urllib.parse import urlparse + +CONFIG_FILE_PATH = "/tmp/gart_cli_config.json" + + +def save_config(config): + with open(CONFIG_FILE_PATH, "w") as f: + json.dump(config, f) + + +def load_config(): + if os.path.exists(CONFIG_FILE_PATH): + with open(CONFIG_FILE_PATH, "r") as f: + try: + return json.load(f) + except json.JSONDecodeError: + return {} + return {} + + +@click.group() +@click.pass_context +def cli(ctx): + """System Manager CLI""" + # Load the configuration file and store it in the context + ctx.ensure_object(dict) + ctx.obj = load_config() + + +@cli.command() +@click.pass_context +@click.argument("endpoint", required=True, type=str) +def connect(ctx, endpoint): + """Connect to a new service endpoint.""" + # Save the endpoint to the configuration file + if not endpoint.startswith(("http://", "https://")): + endpoint = "http://" + endpoint + ctx.obj["endpoint"] = endpoint + save_config(ctx.obj) + # check if the endpoint is reachable + parsed_url = urlparse(endpoint) + host = parsed_url.netloc.split(":")[0] + port = parsed_url.port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.settimeout(1) + s.connect((host, port)) + click.echo(f"Connected to {endpoint}") + s.close() + except socket.error as e: + click.echo(f"Failed to connect to {endpoint}: {e}") + return + + +@cli.command() +@click.pass_context +def resume_data_loading(ctx): + """Resume data loading process.""" + endpoint = ctx.obj.get("endpoint") + if not endpoint: + click.echo('Please connect to an endpoint first using the "connect" command.') + return + + response = requests.post(f"{endpoint}/control/resume") + click.echo(f"Resumed data loading: {response.text}") + + +@cli.command() +@click.pass_context +def pause_data_loading(ctx): + """Pause data loading process.""" + endpoint = ctx.obj.get("endpoint") + if not endpoint: + click.echo('Please connect to an endpoint first using the "connect" command.') + return + + response = requests.post(f"{endpoint}/control/pause") + click.echo(f"Paused data loading: {response.text}") + + +@cli.command() +@click.pass_context +def get_all_available_versions(ctx): + """Get all available versions of the graph at the moment.""" + endpoint = ctx.obj.get("endpoint") + if not endpoint: + click.echo('Please connect to an endpoint first using the "connect" command.') + return + + response = requests.post(f"{endpoint}/get-all-available-read-epochs") + click.echo(f"Available versions: {response.text}") + + +@cli.command() +@click.pass_context +@click.argument("timestamp", required=True, type=str) +def get_version_by_timestamp(ctx, timestamp): + """Get the version of the graph at the given timestamp.""" + endpoint = ctx.obj.get("endpoint") + if not endpoint: + click.echo('Please connect to an endpoint first using the "connect" command.') + return + response = requests.post( + f"{endpoint}/get-read-epoch-by-timestamp", data={"timestamp": timestamp} + ) + click.echo(f"Version at {timestamp}: {response.text}") + + +@cli.command() +@click.pass_context +@click.argument("config_path", type=click.Path(exists=True)) +def submit_config(ctx, config_path): + """Submit a new configuration file.""" + endpoint = ctx.obj.get("endpoint") + if not endpoint: + click.echo('Please connect to an endpoint first using the "connect" command.') + return + + with open(config_path, "rb") as file: + files = {"file": (config_path, file)} + try: + response = requests.post(f"{endpoint}/submit-config", files=files) + response.raise_for_status() + click.echo(f"Success: Server responded with {response.status_code} status") + except requests.exceptions.HTTPError as e: + click.echo(f"Failed to submit the configuration file: {e}") + except requests.exceptions.RequestException as e: + click.echo(f"Failed to submit the configuration file: {e}") + except Exception as e: + click.echo(f"Failed to submit the configuration file: {e}") + + +@cli.command() +@click.pass_context +@click.argument("algorithm_name") +@click.argument("graph_version", type=str, required=True) +@click.option("--arg", multiple=True, type=(str, str)) +def submit_gae_task(ctx, algorithm_name, graph_version, arg): + """Submit a new GAE task.""" + endpoint = ctx.obj.get("endpoint") + if not endpoint: + click.echo('Please connect to an endpoint first using the "connect" command.') + return + + args_dict = dict(arg) + args_dict["algorithm_name"] = algorithm_name + args_dict["graph_version"] = graph_version + + response = requests.post(f"{endpoint}/run-gae-task", json=args_dict) + + if response.status_code == 200: + click.echo("Algorithm executed successfully!") + click.echo(response.text) + else: + click.echo("Failed to execute algorithm!") + click.echo(f"Status code: {response.status_code}") + click.echo(response.text) + + +@cli.command() +@click.pass_context +@click.argument("graph_version", type=int, required=True) +def change_graph_version_gie(ctx, graph_version): + """Change the graph version to the given version for GIE.""" + endpoint = ctx.obj.get("endpoint") + if not endpoint: + click.echo('Please connect to an endpoint first using the "connect" command.') + return + + response = requests.post( + f"{endpoint}/change-read-epoch", data={"read_epoch": graph_version} + ) + click.echo(f"Changed graph version to {graph_version}: {response.text}") + + +if __name__ == "__main__": + cli(obj={}) diff --git a/scripts/k8s_deployment.py b/scripts/k8s_deployment.py old mode 100644 new mode 100755