From f6abea2975d69e74fc32c808a50245a68fb56e5d Mon Sep 17 00:00:00 2001 From: Dongze Li Date: Wed, 27 Dec 2023 18:05:43 +0800 Subject: [PATCH] update --- .../servicer/flex/interactive/hiactor.py | 142 ++++ .../flex/interactive/job_scheduler.py | 77 ++ .../gscoordinator/servicer/flex/job.py | 66 +- .../{ => servicer/flex}/scheduler.py | 4 +- .../gscoordinator/servicer/flex/service.py | 81 ++- .../{ => servicer/flex}/stoppable_thread.py | 0 proto/coordinator_service.proto | 10 +- proto/flex.proto | 58 +- python/graphscope/gsctl/client/rpc.py | 31 +- .../graphscope/gsctl/commands/interactive.py | 665 ++++++++++++++++++ python/graphscope/gsctl/utils.py | 28 +- 11 files changed, 1051 insertions(+), 111 deletions(-) create mode 100644 coordinator/gscoordinator/servicer/flex/interactive/job_scheduler.py rename coordinator/gscoordinator/{ => servicer/flex}/scheduler.py (98%) rename coordinator/gscoordinator/{ => servicer/flex}/stoppable_thread.py (100%) create mode 100644 python/graphscope/gsctl/commands/interactive.py diff --git a/coordinator/gscoordinator/servicer/flex/interactive/hiactor.py b/coordinator/gscoordinator/servicer/flex/interactive/hiactor.py index b6a6699cf1ed..29f8e417168b 100644 --- a/coordinator/gscoordinator/servicer/flex/interactive/hiactor.py +++ b/coordinator/gscoordinator/servicer/flex/interactive/hiactor.py @@ -17,17 +17,44 @@ # import os +import itertools import logging import hiactor_client +from google.protobuf.json_format import MessageToDict + from graphscope.config import Config +from graphscope.proto import flex_pb2 + +from gscoordinator.servicer.flex.interactive.job_scheduler import ( + DataloadingJobScheduler, +) +from gscoordinator.servicer.flex.job import JobStatus + +from gscoordinator.utils import WORKSPACE +from gscoordinator.utils import delegate_command_to_pod +from gscoordinator.utils import run_kube_cp_command __all__ = ["init_interactive_service"] logger = logging.getLogger("graphscope") +# There are two workspaces for FLEX Interactive, one residing on "coordinator" node +# and the other on the "interactive" node. These two workspaces are equivalent when +# running mode is "hosts". Hence, INTERACTIVE_WORKSPACE is only effective in +# Kubernetes (k8s) environment. 
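+# Raw data uploaded through the coordinator is mirrored to the same relative
+# path ("raw_data/<graph_name>/<basename>") under both workspaces; see
+# write_and_distribute_file() below.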
+INTERACTIVE_WORKSPACE = (
+    os.environ["INTERACTIVE_WORKSPACE"]
+    if "INTERACTIVE_WORKSPACE" in os.environ
+    else "/tmp"
+)
+
+
+INTERACTIVE_CONTAINER_NAME = "interactive"
+
+
 class Hiactor(object):
     """Hiactor module used to interact with hiactor engine"""
 
@@ -35,6 +62,12 @@ def __init__(self, config: Config):
         self._config = config
         # hiactor admin service host
         self._hiactor_host = self._get_hiactor_service_endpoints()
+        # workspace
+        self._workspace = os.path.join(WORKSPACE, "interactive")
+        os.makedirs(self._workspace, exist_ok=True)
+        # job status
+        self._job_status = {}
+        # check heartbeat to interactive engine
         logger.info("Connect to hiactor service at %s", self._hiactor_host)
 
     def _get_hiactor_service_endpoints(self):
@@ -45,6 +78,52 @@ def _get_hiactor_service_endpoints(self):
         )
         return endpoint
 
+    @property
+    def hiactor_host(self):
+        return self._hiactor_host
+
+    @property
+    def job_status(self):
+        return self._job_status
+
+    def register_job_status(self, jobid: str, status: JobStatus):
+        self._job_status[jobid] = status
+
+    def write_and_distribute_file(
+        self, graph_name: str, basename: str, raw_data: bytes
+    ):
+        # <workspace>/raw_data/<graph_name>
+        filedir = os.path.join(self._workspace, "raw_data", graph_name)
+        os.makedirs(filedir, exist_ok=True)
+        # <workspace>/raw_data/<graph_name>/<basename>
+        filepath = os.path.join(filedir, basename)
+        # dump raw data to file
+        with open(filepath, "wb") as f:
+            f.write(raw_data)
+        # distribute
+        target_file = filepath
+        if self._config.launcher_type == "k8s":
+            # filepath is different in interactive pod
+            target_file = os.path.join(
+                INTERACTIVE_WORKSPACE, "raw_data", graph_name, basename
+            )
+            for pod in []:
+                container = INTERACTIVE_CONTAINER_NAME
+                cmd = f"mkdir -p {os.path.dirname(target_file)}"
+                logger.debug(delegate_command_to_pod(cmd, pod, container))
+                logger.debug(
+                    run_kube_cp_command(filepath, target_file, pod, container, True)
+                )
+        return target_file
+
+    def create_graph(self, graph_def_dict):
+        with hiactor_client.ApiClient(
+            hiactor_client.Configuration(self._hiactor_host)
+        ) as api_client:
+            api_instance = hiactor_client.GraphApi(api_client)
+            graph = hiactor_client.Graph.from_dict(graph_def_dict)
+            return api_instance.create_graph(graph)
+
     def list_graph(self):
         with hiactor_client.ApiClient(
             hiactor_client.Configuration(self._hiactor_host)
@@ -52,6 +131,69 @@ def list_graph(self):
             api_instance = hiactor_client.GraphApi(api_client)
             return api_instance.list_graphs()
 
+    def delete_graph(self, graph_name: str):
+        with hiactor_client.ApiClient(
+            hiactor_client.Configuration(self._hiactor_host)
+        ) as api_client:
+            api_instance = hiactor_client.GraphApi(api_client)
+            return api_instance.delete_graph(graph_name)
+
+    def create_job(
+        self,
+        job_type: str,
+        schedule_proto: flex_pb2.Schedule,
+        description_proto: flex_pb2.JobDescription,
+    ):
+        if job_type != "dataloading":
+            raise RuntimeError(
+                "Job type {0} is not supported in interactive.".format(job_type)
+            )
+
+        # write raw data to file and distribute to interactive workspace
+        schema_mapping = description_proto.schema_mapping
+        for mapping in itertools.chain(
+            schema_mapping.vertex_mappings, schema_mapping.edge_mappings
+        ):
+            for index, location in enumerate(mapping.inputs):
+                if location.startswith("@"):
+                    # write raw data and distribute file to interactive workspace
+                    new_location = self.write_and_distribute_file(
+                        schema_mapping.graph,
+                        os.path.basename(location),
+                        mapping.raw_data[index],
+                    )
+                    # clear the raw_data
+                    mapping.raw_data[index] = bytes()
+                    # update the location
+                    mapping.inputs[index] = new_location
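+                    # at this point a local input such as "@/path/to/person.csv" has
+                    # been replaced by the server-side path, e.g.
+                    # "<workspace>/raw_data/<graph_name>/person.csv" (or the
+                    # INTERACTIVE_WORKSPACE equivalent when running on k8s)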
+ # schedule + schedule = MessageToDict( + schedule_proto, + preserving_proto_field_name=True, + including_default_value_fields=True, + ) + # job description + description = MessageToDict( + description_proto, + preserving_proto_field_name=True, + including_default_value_fields=True, + ) + # submit + if schedule["run_now"]: + at_time = "now" + repeat = "null" + else: + at_time = schedule["at_time"] + repeat = schedule["repeat"] + + scheduler = DataloadingJobScheduler( + at_time=at_time, + repeat=repeat, + description=description, + servicer=self, + ) + scheduler.start() + def init_interactive_service(config: Config): return Hiactor(config) diff --git a/coordinator/gscoordinator/servicer/flex/interactive/job_scheduler.py b/coordinator/gscoordinator/servicer/flex/interactive/job_scheduler.py new file mode 100644 index 000000000000..07fa7baa2d9f --- /dev/null +++ b/coordinator/gscoordinator/servicer/flex/interactive/job_scheduler.py @@ -0,0 +1,77 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging + +import hiactor_client + +from gscoordinator.servicer.flex.job import JobType +from gscoordinator.servicer.flex.job import JobStatus +from gscoordinator.servicer.flex.scheduler import Scheduler + +logger = logging.getLogger("graphscope") + + +class DataloadingJobScheduler(Scheduler): + """Class for scheduling the dataloading process of interactive graphs""" + + def __init__(self, at_time, repeat, description, servicer): + super().__init__(at_time, repeat) + + self._description = description + self._servicer = servicer + + self._tags = [JobType.GRAPH_IMPORT] + + def run(self): + """This function needs to handle exception by itself""" + + schema_mapping = self._description["schema_mapping"] + graph_name = schema_mapping["graph"] + + detail = {"graph name": graph_name} + status = JobStatus( + jobid=self.jobid, + type=JobType.GRAPH_IMPORT, + start_time=self.last_run, + detail=detail, + ) + + # register status to servicer + self._servicer.register_job_status(self.jobid, status) + + with hiactor_client.ApiClient( + hiactor_client.Configuration(self._servicer.hiactor_host) + ) as api_client: + # create an instance of the API class + api_instance = hiactor_client.DataloadingApi(api_client) + + try: + api_response = api_instance.create_dataloading_job( + graph_name, + hiactor_client.SchemaMapping.from_dict(schema_mapping), + ) + except Exception as e: + logger.warning( + "Failed to create dataloading job on graph %s: %s", + graph_name, + str(e), + ) + status.set_failed(message=str(e)) + else: + status.set_success(message=api_response.message) diff --git a/coordinator/gscoordinator/servicer/flex/job.py b/coordinator/gscoordinator/servicer/flex/job.py index 0281f5f7840f..20f5fc7d5055 100644 --- a/coordinator/gscoordinator/servicer/flex/job.py +++ b/coordinator/gscoordinator/servicer/flex/job.py @@ -17,25 +17,16 @@ # import datetime -import logging from enum import Enum -import 
interactive_client -from graphscope.proto import interactive_pb2 - -from gscoordinator.scheduler import Scheduler from gscoordinator.utils import encode_datetime -logger = logging.getLogger("graphscope") - class JobType(Enum): - GRAPH_IMPORT = 0 class Status(Enum): - RUNNING = 0 CANCELLED = 1 SUCCESS = 2 @@ -44,12 +35,18 @@ class Status(Enum): class JobStatus(object): - """Base class of job status for GraphScope FLEX runnable tasks""" - def __init__( - self, jobid, status, start_time, end_time=None, detail=dict(), message="" + self, + jobid, + type, + start_time, + status=Status.RUNNING, + end_time=None, + detail=dict(), + message="", ): self.jobid = jobid + self.type = type self.status = status self.start_time = start_time self.end_time = end_time @@ -60,6 +57,7 @@ def __init__( def to_dict(self): return { "jobid": self.jobid, + "type": self.type.name, "status": self.status.name, "start_time": encode_datetime(self.start_time), "end_time": encode_datetime(self.end_time), @@ -79,47 +77,3 @@ def set_failed(self, message=""): def set_canncelled(self): self.status = Status.CANCELLED - - -class GraphImportScheduler(Scheduler): - """This class responsible for scheduling and managing the import of interactive graphs.""" - - def __init__(self, at_time, repeat, schema_mapping, servicer): - super().__init__(at_time, repeat) - - self._schema_mapping = schema_mapping - # we use interactive servicer to get the latest in runtime - self._servicer = servicer - - self._tags = [JobType.GRAPH_IMPORT] - - def run(self): - """This function needs to handle exception by itself""" - graph_name = self._schema_mapping["graph"] - - detail = {"graph name": graph_name, "type": JobType.GRAPH_IMPORT.name} - status = JobStatus(self.jobid, Status.RUNNING, self.last_run, detail=detail) - - # register status to servicer - self._servicer.register_job_status(self.jobid, status) - - with interactive_client.ApiClient( - interactive_client.Configuration(self._servicer.interactive_host) - ) as api_client: - # create an instance of the API class - api_instance = interactive_client.DataloadingApi(api_client) - - try: - api_response = api_instance.create_dataloading_job( - graph_name, - interactive_client.SchemaMapping.from_dict(self._schema_mapping), - ) - except Exception as e: - logger.warning( - "Failed to create dataloading job on graph %s: %s", - graph_name, - str(e), - ) - status.set_failed(message=str(e)) - else: - status.set_success(message=api_response.message) diff --git a/coordinator/gscoordinator/scheduler.py b/coordinator/gscoordinator/servicer/flex/scheduler.py similarity index 98% rename from coordinator/gscoordinator/scheduler.py rename to coordinator/gscoordinator/servicer/flex/scheduler.py index 86328c95f9bf..5a79dde9cb9b 100644 --- a/coordinator/gscoordinator/scheduler.py +++ b/coordinator/gscoordinator/servicer/flex/scheduler.py @@ -25,7 +25,7 @@ import schedule from schedule import CancelJob -from gscoordinator.stoppable_thread import StoppableThread +from gscoordinator.servicer.flex.stoppable_thread import StoppableThread from gscoordinator.utils import decode_datetimestr @@ -177,7 +177,7 @@ def do_run(self): """Start a thread for the job.""" # overwrite for each scheduled job self._jobid = "job-{0}".format( - datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") ) self._last_run = datetime.datetime.now() # schedule in a thread diff --git a/coordinator/gscoordinator/servicer/flex/service.py b/coordinator/gscoordinator/servicer/flex/service.py index 
f8d9cfff2b64..2219f9ca267e 100644 --- a/coordinator/gscoordinator/servicer/flex/service.py +++ b/coordinator/gscoordinator/servicer/flex/service.py @@ -19,13 +19,15 @@ """Service under FLEX Architecture""" import atexit +import traceback +import functools # import itertools import logging import os import threading -# from google.protobuf.json_format import MessageToDict +from google.protobuf.json_format import MessageToDict from graphscope.config import Config from graphscope.gsctl.utils import dict_to_proto_message from graphscope.proto import coordinator_service_pb2_grpc @@ -45,6 +47,26 @@ logger = logging.getLogger("graphscope") +def handle_api_exception(proto_message_response): + """Decorator to handle api exception occurs during request engine service.""" + + def _handle_api_exception(fn): + @functools.wraps(fn) + def wrapper(*args, **kwargs): + try: + return fn(*args, **kwargs) + except Exception as e: + logger.warning("Failed to execute %s: %s", str(fn.__name__), str(e)) + traceback.print_exc() + return proto_message_response( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + + return wrapper + + return _handle_api_exception + + class FlexServiceServicer(coordinator_service_pb2_grpc.CoordinatorServiceServicer): """Service under flex architecture.""" @@ -63,7 +85,6 @@ def __init__(self, config: Config): # lock to protect the service self._lock = threading.RLock() - # initialize specific service client self._service_client = self._initialize_service_client() @@ -82,22 +103,48 @@ def cleanup(self): def Connect(self, request, context): return message_pb2.ConnectResponse(solution=self._solution) + @handle_api_exception(flex_pb2.ApiResponse) + def CreateGraph(self, request, context): + graph_def_dict = MessageToDict( + request.graph_def, + preserving_proto_field_name=True, + including_default_value_fields=True, + ) + api_response = self._service_client.create_graph(graph_def_dict) + return flex_pb2.ApiResponse(code=error_codes_pb2.OK, error_msg=api_response) + + @handle_api_exception(flex_pb2.ListGraphResponse) def ListGraph(self, request, context): - try: - graphs = self._service_client.list_graph() - except Exception as e: - logger.warning("Failed to list graph: %s", str(e)) - return flex_pb2.ListGraphResponse( - code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) - ) - else: - return flex_pb2.ListGraphResponse( - code=error_codes_pb2.OK, - graphs=[ - dict_to_proto_message(g.to_dict(), flex_pb2.GraphProto()) - for g in graphs - ], - ) + graphs = self._service_client.list_graph() + return flex_pb2.ListGraphResponse( + code=error_codes_pb2.OK, + graphs=[ + dict_to_proto_message(g.to_dict(), flex_pb2.GraphProto()) + for g in graphs + ], + ) + + @handle_api_exception(flex_pb2.ApiResponse) + def DeleteGraph(self, request, context): + api_response = self._service_client.delete_graph(request.graph_name) + return flex_pb2.ApiResponse(code=error_codes_pb2.OK, error_msg=api_response) + + @handle_api_exception(flex_pb2.ApiResponse) + def CreateJob(self, request, context): + self._service_client.create_job( + request.type, request.schedule, request.description + ) + return flex_pb2.ApiResponse(code=error_codes_pb2.OK) + + @handle_api_exception(flex_pb2.ListJobResponse) + def ListJob(self, request, context): + return flex_pb2.ListJobResponse( + code=error_codes_pb2.OK, + job_status=[ + dict_to_proto_message(s.to_dict(), flex_pb2.JobStatus()) + for _, s in self._service_client.job_status.items() + ], + ) def init_flex_service_servicer(config: Config): diff --git 
a/coordinator/gscoordinator/stoppable_thread.py b/coordinator/gscoordinator/servicer/flex/stoppable_thread.py
similarity index 100%
rename from coordinator/gscoordinator/stoppable_thread.py
rename to coordinator/gscoordinator/servicer/flex/stoppable_thread.py
diff --git a/proto/coordinator_service.proto b/proto/coordinator_service.proto
index 6102cb5ba7a2..6d7fddff3e21 100644
--- a/proto/coordinator_service.proto
+++ b/proto/coordinator_service.proto
@@ -55,15 +55,15 @@ service CoordinatorService {
 
   rpc ListGraph(ListGraphRequest) returns (ListGraphResponse);
 
-  // end
+  rpc CreateGraph(CreateGraphRequest) returns (ApiResponse);
 
-  rpc CreateInteractiveGraph(CreateInteractiveGraphRequest) returns (ApiResponse);
+  rpc DeleteGraph(DeleteGraphRequest) returns (ApiResponse);
 
-  rpc RemoveInteractiveGraph(RemoveInteractiveGraphRequest) returns (ApiResponse);
+  rpc CreateJob(CreateJobRequest) returns (ApiResponse);
 
-  rpc ImportInteractiveGraph(ImportInteractiveGraphRequest) returns (ApiResponse);
+  rpc ListJob(ListJobRequest) returns (ListJobResponse);
 
-  rpc ListInteractiveJob(ListInteractiveJobRequest) returns (ListInteractiveJobResponse);
+  // end
 
   rpc CreateInteractiveProcedure(CreateInteractiveProcedureRequest) returns (ApiResponse);
 
diff --git a/proto/flex.proto b/proto/flex.proto
index d90afd374f22..86229e9aefed 100644
--- a/proto/flex.proto
+++ b/proto/flex.proto
@@ -19,10 +19,18 @@ package gs.rpc;
 
 import "error_codes.proto";
 
+// attributes value
+message MapValue {
+  oneof value {
+    string s = 1;
+    int64 i = 2;
+    bool b = 3;
+  }
+}
 
 // property type
 message PropertyTypeProto {
-  string primitive_type = 1; // [ DT_DOUBLE, DT_STRING, DT_SIGNED_INT32, DT_SIGNED_INT64 ]
+  string primitive_type = 1;  // [ DT_DOUBLE, DT_STRING, DT_SIGNED_INT32, DT_SIGNED_INT64 ]
 }
 
 // property
@@ -128,7 +136,7 @@ message DataSource {
 // format
 message DataSourceFormat {
   string type = 1; // csv
-  map<string, string> metadata = 2;
+  map<string, MapValue> metadata = 2;
 }
 
 // loading config
@@ -149,11 +157,12 @@ message SchemaMapping {
 // job status
 message JobStatus {
   string jobid = 1;
-  string status = 2;
-  string start_time = 3;
-  string end_time = 4;
-  map<string, string> detail = 5;
-  string message = 6;
+  string type = 2;
+  string status = 3;
+  string start_time = 4;
+  string end_time = 5;
+  map<string, MapValue> detail = 6;
+  string message = 7;
 }
 
 // procedure runtime params, used both in property and return type
@@ -197,17 +206,38 @@ message NodeStatus {
   int32 disk_usage = 4;
 }
 
+// job schedule
+message Schedule {
+  bool run_now = 1;
+  string at_time = 2;  // formatted as "2023-02-21 11:56:30"
+  string repeat = 3;  // day, week, or null
+}
+
+// AnalyticalJobPlaceHolder
+message AnalyticalJobPlaceHolder {
+  string a = 1;
+  int32 b = 2;
+}
+
+// job description
+message JobDescription {
+  oneof value {
+    SchemaMapping schema_mapping = 1;
+    AnalyticalJobPlaceHolder ajph = 2;
+  }
+}
+
 // message
 message ApiResponse {
   Code code = 1;
   optional string error_msg = 2;
 }
 
-message CreateInteractiveGraphRequest {
+message CreateGraphRequest {
   GraphProto graph_def = 1;
 }
 
-message RemoveInteractiveGraphRequest {
+message DeleteGraphRequest {
   string graph_name = 1;
 }
 
@@ -220,13 +250,15 @@ message ListGraphResponse {
   repeated GraphProto graphs = 3;
 }
 
-message ImportInteractiveGraphRequest {
-  SchemaMapping schema_mapping = 1;
+message CreateJobRequest {
+  string type = 1; // optional value: dataloading
+  Schedule schedule = 2;
+  JobDescription description = 3;
 }
 
-message ListInteractiveJobRequest {}
+message ListJobRequest {}
 
-message ListInteractiveJobResponse {
+message ListJobResponse {
   Code code =
1; optional string error_msg = 2; diff --git a/python/graphscope/gsctl/client/rpc.py b/python/graphscope/gsctl/client/rpc.py index 94ea255d263b..fb0ba99cf776 100644 --- a/python/graphscope/gsctl/client/rpc.py +++ b/python/graphscope/gsctl/client/rpc.py @@ -89,14 +89,14 @@ def close(self): pass @handle_grpc_error - def create_interactive_graph(self, graph_def: flex_pb2.GraphProto): - request = flex_pb2.CreateInteractiveGraphRequest(graph_def=graph_def) - return self._stub.CreateInteractiveGraph(request) + def create_graph(self, graph_def: flex_pb2.GraphProto): + request = flex_pb2.CreateGraphRequest(graph_def=graph_def) + return self._stub.CreateGraph(request) @handle_grpc_error - def remove_interactive_graph(self, graph: str): - request = flex_pb2.RemoveInteractiveGraphRequest(graph_name=graph) - return self._stub.RemoveInteractiveGraph(request) + def delete_graph(self, graph_name: str): + request = flex_pb2.DeleteGraphRequest(graph_name=graph_name) + return self._stub.DeleteGraph(request) @handle_grpc_error def list_graph(self): @@ -104,14 +104,21 @@ def list_graph(self): return self._stub.ListGraph(request) @handle_grpc_error - def import_interactive_graph(self, schema_mapping: flex_pb2.SchemaMapping): - request = flex_pb2.ImportInteractiveGraphRequest(schema_mapping=schema_mapping) - return self._stub.ImportInteractiveGraph(request) + def create_job( + self, + type: str, + schedule: flex_pb2.Schedule, + description: flex_pb2.JobDescription, + ): + request = flex_pb2.CreateJobRequest( + type=type, schedule=schedule, description=description + ) + return self._stub.CreateJob(request) @handle_grpc_error - def list_interactive_job(self): - request = flex_pb2.ListInteractiveJobRequest() - return self._stub.ListInteractiveJob(request) + def list_job(self): + request = flex_pb2.ListJobRequest() + return self._stub.ListJob(request) @handle_grpc_error def create_interactive_procedure(self, procedure: flex_pb2.Procedure): diff --git a/python/graphscope/gsctl/commands/interactive.py b/python/graphscope/gsctl/commands/interactive.py new file mode 100644 index 000000000000..9b9766ddd0c4 --- /dev/null +++ b/python/graphscope/gsctl/commands/interactive.py @@ -0,0 +1,665 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Group of interactive commands under the FLEX architecture""" + +import itertools +import json + +import click +import yaml +from google.protobuf.json_format import MessageToDict + +from graphscope.gsctl.client.rpc import get_grpc_client +from graphscope.gsctl.utils import dict_to_proto_message +from graphscope.gsctl.utils import is_valid_file_path +from graphscope.gsctl.utils import read_yaml_file +from graphscope.gsctl.utils import terminal_display +from graphscope.proto import error_codes_pb2 +from graphscope.proto import flex_pb2 + + +@click.group() +def cli(): + pass + + +@cli.group() +def get(): + """Display a specific resource or group of resources""" + pass + + +@cli.group() +def describe(): + """Show details of a specific resource or group of resources""" + pass + + +@cli.group() +def create(): + """Create a resource(graph, procedure) from a file""" + pass + + +@cli.group() +def delete(): + """Delete a resource(graph, procedure) by name""" + pass + + +@cli.group() +def enable(): + """Enable a stored procedure by name""" + pass + + +@cli.group() +def disable(): + """Disable a stored procedure by name""" + pass + + +@cli.group() +def service(): + """Start, restart or stop the database service""" + pass + + +def is_success(response): + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + return False + return True + + +@create.command() +@click.option( + "-n", + "--name", + required=False, + help="The name of the graph", +) +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file to use to create the graph", +) +def graph(name, filename): + """Create a new graph in database, with the provided schema file""" + + if not is_valid_file_path(filename): + click.secho("Invalid file: {0}".format(filename), fg="blue") + return + + graph_def_dict = read_yaml_file(filename) + + # override graph name + if name is not None: + graph_def_dict["name"] = name + + # transform graph dict to proto message + graph_def = flex_pb2.GraphProto() + dict_to_proto_message(graph_def_dict, graph_def) + + grpc_client = get_grpc_client() + if not grpc_client.connect(): + return + + response = grpc_client.create_graph(graph_def) + + if is_success(response): + click.secho( + "Create graph {0} successfully.".format(graph_def_dict["name"]), + fg="green", + ) + + +@create.command() +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file to use to create the job", +) +def job(filename): + """Create or schedule a job in database""" + + def _read_and_fill_raw_data(config): + for mapping in itertools.chain( + config["vertex_mappings"], config["edge_mappings"] + ): + for index, location in enumerate(mapping["inputs"]): + # path begin with "@" represents the local file + if location.startswith("@"): + if "raw_data" not in mapping: + mapping["raw_data"] = [] + + # read file and set raw data + with open(location[1:], "rb") as f: + content = f.read() + mapping["raw_data"].append(content) + + if not is_valid_file_path(filename): + click.secho("Invalid file: {0}".format(filename), fg="blue") + return + + job = read_yaml_file(filename) + + try: + _read_and_fill_raw_data(job["description"]) + + # job schedule + schedule = flex_pb2.Schedule() + dict_to_proto_message(job["schedule"], schedule) + + # job description + schema_mapping = flex_pb2.SchemaMapping() + dict_to_proto_message(job["description"], schema_mapping) + description = 
flex_pb2.JobDescription(schema_mapping=schema_mapping) + + grpc_client = get_grpc_client() + if not grpc_client.connect(): + return + + response = grpc_client.create_job( + type="dataloading", + schedule=schedule, + description=description, + ) + + if is_success(response): + click.secho( + "Job has been submitted", + fg="green", + ) + except Exception as e: + click.secho(str(e), fg="red") + + +@create.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph", +) +@click.option( + "-n", + "--name", + required=True, + help="The name of the procedure", +) +@click.option( + "-i", + "--sourcefile", + required=True, + help="Path of [ .cc, .cypher ] file", +) +@click.option( + "-d", + "--description", + required=False, + help="Description for the specific procedure", +) +def procedure(graph, name, sourcefile, description): + """Create and compile procedure over a specific graph""" + + # read source + with open(sourcefile, "r") as f: + query = f.read() + + # construct procedure proto + procedure = flex_pb2.Procedure( + name=name, bound_graph=graph, description=description, query=query, enable=True + ) + + grpc_client = get_grpc_client() + response = grpc_client.create_interactive_procedure(procedure) + + if is_success(response): + click.secho("Create procedure {0} successfully.".format(name), fg="green") + + +@delete.command() +@click.argument("graph", required=True) +def graph(graph): + """Delete a graph by name""" + + grpc_client = get_grpc_client() + response = grpc_client.delete_graph(graph) + + if is_success(response): + click.secho( + "Delete graph {0} successfully.".format(graph), + fg="green", + ) + + +@delete.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph", +) +@click.option( + "-n", + "--name", + required=True, + help="The name of the procedure to be deleted", +) +def procedure(graph, name): + """Delete a procedure over a specific graph in database""" + + grpc_client = get_grpc_client() + response = grpc_client.remove_interactive_procedure(graph, name) + + if is_success(response): + click.secho( + "Remove procedure {0} on graph {1} successfully.".format(name, graph), + fg="green", + ) + + +@enable.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph", +) +@click.option( + "-n", + "--name", + required=True, + help="List of procedure's name to enable, seprated by comma", +) +def procedure(graph, name): + """Enable stored procedures in a given graph""" + + # remove the last "," if exists + if name.endswith(","): + name = name[:-1] + procedure_list = name.split(",") + + # list current procedures + grpc_client = get_grpc_client() + response = grpc_client.list_interactive_procedure(graph) + + if is_success(response): + # enable procedures locally + procedures = response.procedures + for p in procedures: + if p.name in procedure_list: + p.enable = True + procedure_list.remove(p.name) + + # check + if procedure_list: + click.secho( + "Procedure {0} not found.".format(str(procedure_list)), fg="red" + ) + return + + # update procedures + response = grpc_client.update_interactive_procedure(procedures) + if is_success(response): + click.secho("Update procedures successfully.", fg="green") + + +@disable.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph", +) +@click.option( + "-n", + "--name", + required=True, + help="List of procedure's name to enable, seprated by comma", +) +def proceduree(graph, name): + """Disable stored procedures 
in a given graph""" + + # remove the last "," if exists + if name.endswith(","): + name = name[:-1] + procedure_list = name.split(",") + + # list current procedures + grpc_client = get_grpc_client() + response = grpc_client.list_interactive_procedure(graph) + + if is_success(response): + # disable procedures locally + procedures = response.procedures + for p in procedures: + if p.name in procedure_list: + p.enable = False + procedure_list.remove(p.name) + + # check + if procedure_list: + click.secho( + "Procedure {0} not found.".format(str(procedure_list)), fg="red" + ) + return + + # update procedures + response = grpc_client.update_interactive_procedure(procedures) + if is_success(response): + click.secho("Update procedures successfully.", fg="green") + + +@service.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph", +) +def start(graph): + """Stop current service and start a new one on a specified graph""" + + service_def = flex_pb2.Service(graph_name=graph) + + grpc_client = get_grpc_client() + response = grpc_client.start_interactive_service(service_def) + + if is_success(response): + click.secho( + "Start service on graph {0} successfully.".format(graph), fg="green" + ) + + +@service.command() +def stop(): + """Stop the current database service""" + + grpc_client = get_grpc_client() + response = grpc_client.stop_interactive_service() + + if is_success(response): + click.secho("Service has stopped.", fg="green") + + +@service.command() +def restart(): + """Restart database service on current graph""" + + grpc_client = get_grpc_client() + response = grpc_client.restart_interactive_service() + + if is_success(response): + click.secho("Restart service successfully.", fg="green") + + +@get.command() +def graph(): + """Display graphs in database""" + + def _construct_and_display_data(graphs): + if not graphs: + click.secho("no graph found in database.", fg="blue") + return + + head = ["NAME", "STORE TYPE", "VERTEX LABEL SIZE", "EDGE LABEL SIZE"] + data = [head] + + for g in graphs: + data.append( + [ + g.name, + g.store_type, + str(len(g.schema.vertex_types)), + str(len(g.schema.edge_types)), + ] + ) + + terminal_display(data) + + grpc_client = get_grpc_client() + if not grpc_client.connect(): + return + + response = grpc_client.list_graph() + + if is_success(response): + _construct_and_display_data(response.graphs) + + +@get.command() +def job(): + """Display jobs in database""" + + def _construct_and_display_data(job_status): + if not job_status: + click.secho("no job found in database", fg="blue") + return + + head = ["JOBID", "TYPE", "STATUS", "START TIME", "END TIME"] + data = [head] + + for s in job_status: + # detail = {} + # for k, v in s.detail.items(): + # detail[k] = v + + # message = s.message.replace("\n", "") + # if len(message) > 63: + # message = message[:63] + "..." 
+ + data.append( + [ + s.jobid, + s.type, + s.status, + s.start_time, + s.end_time, + ] + ) + + terminal_display(data) + + grpc_client = get_grpc_client() + if not grpc_client.connect(): + return + + response = grpc_client.list_job() + + if is_success(response): + _construct_and_display_data(response.job_status) + + +@get.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph", +) +def procedure(graph): + """Display procedures in database""" + + def _construct_and_display_data(procedures): + if not procedures: + click.secho("no procedure found in database.", fg="blue") + return + + head = ["NAME", "TYPE", "ENABLE", "DESCRIPTION"] + data = [head] + + for procedure in procedures: + data.append( + [ + procedure.name, + procedure.type, + str(procedure.enable), + procedure.description, + ] + ) + + terminal_display(data) + + grpc_client = get_grpc_client() + response = grpc_client.list_interactive_procedure(graph) + + if is_success(response): + _construct_and_display_data(response.procedures) + return response.procedures + + +@get.command() +def service(): + """Display service status in database""" + + def _construct_and_display_data(service_status): + head = [ + "STATUS", + "SERVICE GRAPH", + "BOLT SERVICE ENDPOINT", + "HQPS SERVICE ENDPOINT", + ] + data = [head] + + data.append( + [ + service_status.status, + service_status.graph_name, + service_status.bolt_port, + service_status.hqps_port, + ] + ) + + terminal_display(data) + + grpc_client = get_grpc_client() + response = grpc_client.get_interactive_service_status() + + if is_success(response): + _construct_and_display_data(response.service_status) + return response.service_status + + +@get.command() +def node(): + """Display resource (CPU/memory) usage of nodes""" + + def _construct_and_display_data(nodes_status): + head = ["NAME", "CPU USAGE(%)", "MEMORY USAGE(%)", "DISK USAGE(%)"] + data = [head] + + for s in nodes_status: + data.append( + [ + s.node, + s.cpu_usage, + s.memory_usage, + s.disk_usage, + ] + ) + + terminal_display(data) + + grpc_client = get_grpc_client() + response = grpc_client.get_node_status() + + if is_success(response): + _construct_and_display_data(response.nodes_status) + return response.nodes_status + + +@describe.command() +@click.argument("graph", required=False) +def graph(graph): + """Show details of graph""" + + grpc_client = get_grpc_client() + if not grpc_client.connect(): + return + + response = grpc_client.list_graph() + + if is_success(response): + graphs = response.graphs + + if not graphs: + click.secho("no graph found in database.", fg="blue") + return + + specific_graph_exist = False + for g in graphs: + if graph is not None and g.name != graph: + continue + + # display + click.secho(yaml.dump(MessageToDict(g, preserving_proto_field_name=True))) + + if graph is not None and g.name == graph: + specific_graph_exist = True + break + + if graph is not None and not specific_graph_exist: + click.secho('graph "{0}" not found.'.format(graph), fg="blue") + + +@describe.command() +@click.argument("job", required=False) +def job(job): + """Show details of job""" + + grpc_client = get_grpc_client() + if not grpc_client.connect(): + return + + response = grpc_client.list_job() + + if is_success(response): + job_status = response.job_status + + if not job_status: + click.secho("no job found in database", fg="blue") + return + + specific_job_exist = False + for s in job_status: + if job is not None and s.jobid != job: + continue + + # display + 
click.secho(yaml.dump(MessageToDict(s, preserving_proto_field_name=True))) + + if job is not None and s.jobid == job: + specific_job_exist = True + break + + if job is not None and not specific_job_exist: + click.secho('job "{0}" not found.'.format(job), fg="blue") + + +if __name__ == "__main__": + cli() diff --git a/python/graphscope/gsctl/utils.py b/python/graphscope/gsctl/utils.py index e0861acda132..30c5d62d9d43 100644 --- a/python/graphscope/gsctl/utils.py +++ b/python/graphscope/gsctl/utils.py @@ -19,13 +19,21 @@ import os from typing import List +import click import yaml +from google.protobuf.internal.containers import ScalarMap + +from graphscope.proto import flex_pb2 + def read_yaml_file(path): """Reads YAML file and returns as a python object.""" - with open(path, "r") as file: - return yaml.safe_load(file) + try: + with open(path, "r") as file: + return yaml.safe_load(file) + except Exception as e: + click.secho(str(e), fg="red") def write_yaml_file(data, path): @@ -62,11 +70,19 @@ def _parse_dict(values, message): elif isinstance(v, list): _parse_list(v, getattr(message, k)) else: # value can be set - try: + if hasattr(message, k): setattr(message, k, v) - except AttributeError: - # treat as map - message[k] = v + else: + try: + # treat as a map + if isinstance(v, bool): + message[k].CopyFrom(flex_pb2.MapValue(b=v)) + elif isinstance(v, int): + message[k].CopyFrom(flex_pb2.MapValue(i=v)) + elif isinstance(v, str): + message[k].CopyFrom(flex_pb2.MapValue(s=v)) + except AttributeError as e: + click.secho(str(e), fg="red") _parse_dict(values, message) return message