diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py index 04c282c3fe36..ce961ef29c85 100644 --- a/coordinator/gscoordinator/coordinator.py +++ b/coordinator/gscoordinator/coordinator.py @@ -30,6 +30,7 @@ from graphscope.config import Config from graphscope.proto import coordinator_service_pb2_grpc +from gscoordinator.servicer import init_interactive_service_servicer from gscoordinator.servicer import init_graphscope_one_service_servicer from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH @@ -109,6 +110,7 @@ def get_servicer(config: Config): """Get servicer of specified solution under FLEX architecture""" service_initializers = { "GraphScope One": init_graphscope_one_service_servicer, + "Interactive": init_interactive_service_servicer, } initializer = service_initializers.get(config.solution) diff --git a/coordinator/gscoordinator/scheduler.py b/coordinator/gscoordinator/scheduler.py new file mode 100644 index 000000000000..86328c95f9bf --- /dev/null +++ b/coordinator/gscoordinator/scheduler.py @@ -0,0 +1,260 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import datetime +import json +import time +from abc import ABCMeta +from abc import abstractmethod + +import schedule +from schedule import CancelJob + +from gscoordinator.stoppable_thread import StoppableThread +from gscoordinator.utils import decode_datetimestr + + +class Schedule(object): + """Schedule class that wrapper dbader schedule + + Repo: https://github.com/dbader/schedule. + """ + + def __init__(self): + self._schedule = schedule.Scheduler() + self._run_pending_thread = StoppableThread(target=self.run_pending, args=()) + self._run_pending_thread.daemon = True + self._run_pending_thread.start() + + @property + def schedule(self): + return self._schedule + + def run_pending(self): + """Run all jobs that are scheduled to run.""" + while True: + try: + self._schedule.run_pending() + time.sleep(1) + except: # noqa: E722 + pass + + +schedule = Schedule().schedule # noqa: F811 + + +class Scheduler(metaclass=ABCMeta): + """ + Objects instantiated by the :class:`Scheduler ` are + factories to create jobs, keep record of scheduled jobs and + handle their execution in the :method:`run` method. + """ + + def __init__(self, at_time, repeat): + # scheduler id + self._scheduler_id = "Job-scheduler-{0}".format( + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + ) + # periodic job as used + self._job = None + # true will be run immediately + self._run_now = False + # time at which this job to schedule + self._at_time = self._decode_datetimestr(at_time) + # repeat every day or week, or run job once(no repeat) + # optional value "day", "week", "null" + self._repeat = repeat + # job running thread, note that: + # the last job should be end of execution at the beginning of the next job + self._running_thread = None + # tags + self._tags = [] + + # when the job actually scheduled, the following variables will be generated and overridden. + self._jobid = None + self._last_run = None + + def _decode_datetimestr(self, datetime_str): + if datetime_str == "now": + self._run_now = True + return datetime.datetime.now() + return decode_datetimestr(datetime_str) + + def __str__(self): + return "Scheduler(at_time={}, repeat={})".format(self._at_time, self._repeat) + + @property + def monday(self): + return self._at_time.weekday() == 0 + + @property + def tuesday(self): + return self._at_time.weekday() == 1 + + @property + def wednesday(self): + return self._at_time.weekday() == 2 + + @property + def thursday(self): + return self._at_time.weekday() == 3 + + @property + def friday(self): + return self._at_time.weekday() == 4 + + @property + def saturday(self): + return self._at_time.weekday() == 5 + + @property + def sunday(self): + return self._at_time.weekday() == 6 + + @property + def timestr(self): + """return str of the time object. + time([hour[, minute[, second[, microsecond[, tzinfo]]]]]) --> a time object + """ + return str(self._at_time.time()) + + @property + def job(self): + """A periodic job managed by the dbader scheduler. + https://github.com/dbader/schedule. + """ + return self._job + + @property + def jobid(self): + """id for the last scheduled job""" + return self._jobid + + @property + def schedulerid(self): + """id for the scheduler""" + return self._scheduler_id + + @property + def last_run(self): + """datetime of the last run""" + return self._last_run + + @property + def tags(self): + return self._tags + + @property + def running_thread(self): + return self._running_thread + + def run_once(self): + """Run the job immediately.""" + self.do_run() + return CancelJob + + def waiting_until_to_run(self): + """Run the job once at a specific time.""" + if datetime.datetime.now() >= self._at_time: + return self.run_once() + + def do_run(self): + """Start a thread for the job.""" + # overwrite for each scheduled job + self._jobid = "job-{0}".format( + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ) + self._last_run = datetime.datetime.now() + # schedule in a thread + self._running_thread = StoppableThread(target=self.run, args=()) + self._running_thread.daemon = True + self._running_thread.start() + + def submit(self): + if not self._run_now and self._repeat not in ["week", "day", "null", None]: + raise RuntimeError( + "Submit schedule job failed: at_time is '{0}', repeat is '{1}'".format( + self._at_time, self._repeat + ) + ) + + if self._run_now: + self._job = schedule.every().seconds.do(self.run_once) + + if not self._run_now and self._repeat == "week": + if self.monday: + self._job = schedule.every().monday.at(self.timestr).do(self.do_run) + elif self.tuesday: + self._job = schedule.every().tuesday.at(self.timestr).do(self.do_run) + elif self.wednesday: + self._job = schedule.every().wednesday.at(self.timestr).do(self.do_run) + elif self.thursday: + self._job = schedule.every().thursday.at(self.timestr).do(self.do_run) + elif self.friday: + self._job = schedule.every().friday.at(self.timestr).do(self.do_run) + elif self.saturday: + self._job = schedule.every().saturday.at(self.timestr).do(self.do_run) + elif self.sunday: + self._job = schedule.every().sunday.at(self.timestr).do(self.do_run) + + if not self._run_now and self._repeat == "day": + self._job = schedule.every().day.at(self.timestr).do(self.do_run) + + if not self._run_now and self._repeat in ["null", None]: + self._job = ( + schedule.every().day.at(self.timestr).do(self.waiting_until_to_run) + ) + + # tag + self._job.tag(self._scheduler_id, *self._tags) + + def start(self): + """Submit and schedule the job.""" + self.submit() + + def cancel(self): + """ + Set the running job thread stoppable and wait for the + thread to exit properly by using join() method. + """ + if self._running_thread is not None and self._running_thread.is_alive(): + self._running_thread.stop() + self._running_thread.join() + + @abstractmethod + def run(self): + """ + Methods that all subclasses need to implement, note that + subclass needs to handle exception by itself. + """ + raise NotImplementedError + + +def cancel_job(job, delete_scheduler=True): + """ + Cancel the job which going to scheduled or cancel the whole scheduler. + + Args: + job: Periodic job as used by :class:`Scheduler`. + delete_scheduler: True will can the whole scheduler, otherwise, + delay the next-run time by on period. + """ + if delete_scheduler: + schedule.cancel_job(job) + else: + job.next_run += job.period diff --git a/coordinator/gscoordinator/servicer/__init__.py b/coordinator/gscoordinator/servicer/__init__.py index 9cc0c51d02ed..9b169d1e23d0 100644 --- a/coordinator/gscoordinator/servicer/__init__.py +++ b/coordinator/gscoordinator/servicer/__init__.py @@ -16,4 +16,5 @@ # limitations under the License. # +from gscoordinator.servicer.interactive.service import * from gscoordinator.servicer.graphscope_one.service import * diff --git a/coordinator/gscoordinator/servicer/base_service.py b/coordinator/gscoordinator/servicer/base_service.py new file mode 100644 index 000000000000..895ea53fb3dd --- /dev/null +++ b/coordinator/gscoordinator/servicer/base_service.py @@ -0,0 +1,45 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import atexit +import logging + +from graphscope.config import Config +from graphscope.proto import coordinator_service_pb2_grpc +from graphscope.proto import message_pb2 + + +class BaseServiceServicer(coordinator_service_pb2_grpc.CoordinatorServiceServicer): + """Base class of coordinator service""" + + def __init__(self, config: Config): + self._config = config + atexit.register(self.cleanup) + + def __del__(self): + self.cleanup() + + def Connect(self, request, context): + return message_pb2.ConnectResponse(solution=self._config.solution) + + @property + def launcher_type(self): + return self._config.launcher_type + + def cleanup(self): + pass diff --git a/coordinator/gscoordinator/servicer/interactive/__init__.py b/coordinator/gscoordinator/servicer/interactive/__init__.py new file mode 100644 index 000000000000..3a8204719d16 --- /dev/null +++ b/coordinator/gscoordinator/servicer/interactive/__init__.py @@ -0,0 +1,26 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys + +try: + sys.path.insert(0, os.path.dirname(__file__)) + import interactive_client +except ImportError: + raise diff --git a/coordinator/gscoordinator/servicer/interactive/job.py b/coordinator/gscoordinator/servicer/interactive/job.py new file mode 100644 index 000000000000..0281f5f7840f --- /dev/null +++ b/coordinator/gscoordinator/servicer/interactive/job.py @@ -0,0 +1,125 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import datetime +import logging +from enum import Enum + +import interactive_client +from graphscope.proto import interactive_pb2 + +from gscoordinator.scheduler import Scheduler +from gscoordinator.utils import encode_datetime + +logger = logging.getLogger("graphscope") + + +class JobType(Enum): + + GRAPH_IMPORT = 0 + + +class Status(Enum): + + RUNNING = 0 + CANCELLED = 1 + SUCCESS = 2 + FAILED = 3 + WAITING = 4 + + +class JobStatus(object): + """Base class of job status for GraphScope FLEX runnable tasks""" + + def __init__( + self, jobid, status, start_time, end_time=None, detail=dict(), message="" + ): + self.jobid = jobid + self.status = status + self.start_time = start_time + self.end_time = end_time + # detail for specific job + self.detail = detail + self.message = message + + def to_dict(self): + return { + "jobid": self.jobid, + "status": self.status.name, + "start_time": encode_datetime(self.start_time), + "end_time": encode_datetime(self.end_time), + "detail": self.detail, + "message": self.message, + } + + def set_success(self, message=""): + self.status = Status.SUCCESS + self.message = message + self.end_time = datetime.datetime.now() + + def set_failed(self, message=""): + self.status = Status.FAILED + self.message = message + self.end_time = datetime.datetime.now() + + def set_canncelled(self): + self.status = Status.CANCELLED + + +class GraphImportScheduler(Scheduler): + """This class responsible for scheduling and managing the import of interactive graphs.""" + + def __init__(self, at_time, repeat, schema_mapping, servicer): + super().__init__(at_time, repeat) + + self._schema_mapping = schema_mapping + # we use interactive servicer to get the latest in runtime + self._servicer = servicer + + self._tags = [JobType.GRAPH_IMPORT] + + def run(self): + """This function needs to handle exception by itself""" + graph_name = self._schema_mapping["graph"] + + detail = {"graph name": graph_name, "type": JobType.GRAPH_IMPORT.name} + status = JobStatus(self.jobid, Status.RUNNING, self.last_run, detail=detail) + + # register status to servicer + self._servicer.register_job_status(self.jobid, status) + + with interactive_client.ApiClient( + interactive_client.Configuration(self._servicer.interactive_host) + ) as api_client: + # create an instance of the API class + api_instance = interactive_client.DataloadingApi(api_client) + + try: + api_response = api_instance.create_dataloading_job( + graph_name, + interactive_client.SchemaMapping.from_dict(self._schema_mapping), + ) + except Exception as e: + logger.warning( + "Failed to create dataloading job on graph %s: %s", + graph_name, + str(e), + ) + status.set_failed(message=str(e)) + else: + status.set_success(message=api_response.message) diff --git a/coordinator/gscoordinator/servicer/interactive/service.py b/coordinator/gscoordinator/servicer/interactive/service.py new file mode 100644 index 000000000000..0c92fb9ded42 --- /dev/null +++ b/coordinator/gscoordinator/servicer/interactive/service.py @@ -0,0 +1,337 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Interactive Service under FLEX Architecture""" + +import itertools +import logging +import os +import threading + +import interactive_client +from google.protobuf.json_format import MessageToDict +from graphscope.config import Config +from graphscope.gsctl.utils import dict_to_proto_message +from graphscope.proto import error_codes_pb2 +from graphscope.proto import interactive_pb2 + +from gscoordinator.scheduler import schedule +from gscoordinator.servicer.base_service import BaseServiceServicer +from gscoordinator.servicer.interactive.job import GraphImportScheduler +from gscoordinator.utils import WORKSPACE +from gscoordinator.utils import delegate_command_to_pod +from gscoordinator.utils import run_kube_cp_command + +__all__ = ["InteractiveServiceServicer", "init_interactive_service_servicer"] + +logger = logging.getLogger("graphscope") + + +# There are two workspaces for FLEX Interactive, one residing on "coordinator" node +# and the other on the "interactive" node. These two workspaces are equivalent when +# running mode is "hosts". Hence, INTERACTIVE_WORKSPACE is only effective in +# Kubernetes (k8s) environment. +INTERACTIVE_WORKSPACE = ( + os.environ["INTERACTIVE_WORKSPACE"] + if "INTERACTIVE_WORKSPACE" in os.environ + else "/tmp" +) + + +INTERACTIVE_CONTAINER_NAME = "interactive" + + +class InteractiveServiceServicer(BaseServiceServicer): + """Interactive service under flex architecture.""" + + def __init__(self, config: Config): + super().__init__(config) + + # lock to protect the service + self._lock = threading.RLock() + + # interactive host + self._interactive_host = None + self._fetch_interactive_host_impl() + self._fetch_interactive_host_job = schedule.every(60).seconds.do( + self._fetch_interactive_host_impl + ) + + # interactive pod list + self._interactive_pod_list = [] + + # job status + self._job_status = {} + + def cleanup(self): + pass + + def _fetch_interactive_host_impl(self): + # TODO: get interactive service endpoint by instance id + self._interactive_host = "http://127.0.0.1:8080" + + @property + def interactive_host(self): + return self._interactive_host + + def register_job_status(self, jobid, status): + self._job_status[jobid] = status + + def write_and_distribute_file(self, graph, location, raw_data): + """ + Args: + graph(str): graph name. + location(str): file location over coordinator node. + raw_data(bytes): binary of file data. + """ + # format: _ + filename = "{0}_{1}".format(graph, os.path.basename(location)) + + # write temp file + tmp_file = os.path.join(WORKSPACE, filename) + with open(tmp_file, "wb") as f: + f.write(raw_data) + + # distribute + target_file = tmp_file + if self.launcher_type == "k8s": + # update file path in interactive pod + target_file = os.path.join(INTERACTIVE_WORKSPACE, filename) + + for pod in self._interactive_pod_list: + container = INTERACTIVE_CONTAINER_NAME + cmd = f"mkdir -p {os.path.dirname(target_file)}" + logger.debug(delegate_command_to_pod(cmd, pod, container)) + logger.debug( + run_kube_cp_command(tmp_file, target_file, pod, container, True) + ) + + return target_file + + def CreateInteractiveGraph(self, request, context): + with interactive_client.ApiClient( + interactive_client.Configuration(self._interactive_host) + ) as api_client: + # create an instance of the API class + api_instance = interactive_client.GraphApi(api_client) + + graph_def_dict = MessageToDict( + request.graph_def, preserving_proto_field_name=True + ) + graph = interactive_client.Graph.from_dict(graph_def_dict) + + try: + api_response = api_instance.create_graph(graph) + except Exception as e: + logger.warning("Failed to create interactive graph. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ApiResponse( + code=error_codes_pb2.OK, error_msg=api_response.message + ) + + def RemoveInteractiveGraph(self, request, context): + with interactive_client.ApiClient( + interactive_client.Configuration(self._interactive_host) + ) as api_client: + api_instance = interactive_client.GraphApi(api_client) + + try: + api_response = api_instance.delete_graph(request.graph_name) + except Exception as e: + logger.warning("Failed to remove interactive graph. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ApiResponse( + code=error_codes_pb2.OK, error_msg=api_response.message + ) + + def ListInteractiveGraph(self, request, context): + with interactive_client.ApiClient( + interactive_client.Configuration(self._interactive_host) + ) as api_client: + api_instance = interactive_client.GraphApi(api_client) + + try: + api_response = api_instance.list_graphs() + except Exception as e: + logger.warning("Failed to list interactive graph. %s", str(e)) + return interactive_pb2.ListInteractiveGraphResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ListInteractiveGraphResponse( + code=error_codes_pb2.OK, + graphs=[ + dict_to_proto_message(g.to_dict(), interactive_pb2.GraphProto()) + for g in api_response + ], + ) + + def ImportInteractiveGraph(self, request, context): + # write raw data to file and copy to interactive workspace + try: + schema_mapping_proto = request.schema_mapping + for mapping in itertools.chain( + schema_mapping_proto.vertex_mappings, schema_mapping_proto.edge_mappings + ): + raw_data_index = 0 + for index, location in enumerate(mapping.inputs): + if location.startswith("@"): + # write raw data and distribute file to interactive workspace + new_location = self.write_and_distribute_file( + schema_mapping_proto.graph, + location, + mapping.raw_data[raw_data_index], + ) + raw_data_index += 1 + # update the location + mapping.inputs[index] = new_location + except Exception as e: + logger.warning("Failed to distribute file. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.NETWORK_ERROR, error_msg=str(e) + ) + + # transform proto to dict + schema_mapping = MessageToDict( + schema_mapping_proto, preserving_proto_field_name=True + ) + + # create job scheduler + scheduler = GraphImportScheduler( + at_time="now", + repeat=None, + schema_mapping=schema_mapping, + servicer=self, + ) + scheduler.start() + + return interactive_pb2.ApiResponse(code=error_codes_pb2.OK) + + def ListInteractiveJob(self, request, context): + return interactive_pb2.ListInteractiveJobResponse( + code=error_codes_pb2.OK, + job_status=[ + dict_to_proto_message(s.to_dict(), interactive_pb2.JobStatus()) + for _, s in self._job_status.items() + ], + ) + + def CreateInteractiveProcedure(self, request, context): + with interactive_client.ApiClient( + interactive_client.Configuration(self._interactive_host) + ) as api_client: + # create an instance of the API class + api_instance = interactive_client.ProcedureApi(api_client) + + # transform proto to dict + procedure_def_dict = MessageToDict( + request.procedure_def, preserving_proto_field_name=True + ) + + graph_name = procedure_def_dict["bound_graph"] + procedure = interactive_client.Procedure.from_dict(procedure_def_dict) + + try: + api_response = api_instance.create_procedure(graph_name, procedure) + except Exception as e: + logger.warning("Failed to create procedure. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ApiResponse( + code=error_codes_pb2.OK, error_msg=api_response.message + ) + + def ListInteractiveProcedure(self, request, context): + with interactive_client.ApiClient( + interactive_client.Configuration(self._interactive_host) + ) as api_client: + api_instance = interactive_client.ProcedureApi(api_client) + + try: + api_response = api_instance.list_procedures(request.graph_name) + except Exception as e: + logger.warning("Failed to list procedure. %s", str(e)) + return interactive_pb2.ListInteractiveProcedureResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ListInteractiveProcedureResponse( + code=error_codes_pb2.OK, + procedures=[ + dict_to_proto_message(p.to_dict(), interactive_pb2.Procedure()) + for p in api_response + ], + ) + + def UpdateInteractiveProcedure(self, request, context): + with interactive_client.ApiClient( + interactive_client.Configuration(self._interactive_host) + ) as api_client: + api_instance = interactive_client.ProcedureApi(api_client) + + for p in request.procedures: + # transform proto to dict + procedure_def_dict = MessageToDict(p, preserving_proto_field_name=True) + + graph_name = procedure_def_dict["bound_graph"] + procedure_name = procedure_def_dict["name"] + procedure = interactive_client.Procedure.from_dict(procedure_def_dict) + + try: + api_response = api_instance.update_procedure( + graph_name, procedure_name, procedure + ) + except Exception as e: + logger.warning("Failed to update procedure. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + + return interactive_pb2.ApiResponse(code=error_codes_pb2.OK) + + def RemoveInteractiveProcedure(self, request, context): + with interactive_client.ApiClient( + interactive_client.Configuration(self._interactive_host) + ) as api_client: + api_instance = interactive_client.ProcedureApi(api_client) + + try: + api_response = api_instance.delete_procedure( + request.graph_name, request.procedure_name + ) + except Exception as e: + logger.warning("Failed to remove procedure. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ApiResponse( + code=error_codes_pb2.OK, error_msg=api_response.message + ) + + +def init_interactive_service_servicer(config: Config): + return InteractiveServiceServicer(config) diff --git a/coordinator/gscoordinator/stoppable_thread.py b/coordinator/gscoordinator/stoppable_thread.py new file mode 100644 index 000000000000..96a072e9f5a7 --- /dev/null +++ b/coordinator/gscoordinator/stoppable_thread.py @@ -0,0 +1,39 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import threading + + +class StoppableThread(threading.Thread): + """ + This is one of the simplest mechaisms for a stoppable thread to + hold an 'exit_request' flag that each thread checks on a regular + interval to see if it is time for it to exit. + """ + + def __init__(self, *args, **kwargs): + super(StoppableThread, self).__init__(*args, **kwargs) + self._stop_event = threading.Event() + + def stop(self): + self._stop_event.set() + + def stopped(self): + """The thread itself should check regularly for the stopped() condition. + """ + return self._stop_event.is_set() diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py index f39d99f2b036..61095bb739c4 100644 --- a/coordinator/gscoordinator/utils.py +++ b/coordinator/gscoordinator/utils.py @@ -219,6 +219,26 @@ def get_timestamp() -> float: return datetime.datetime.timestamp(datetime.datetime.now()) +def decode_datetimestr(datetime_str): + formats = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d"] + for f in formats: + try: + return datetime.datetime.strptime(datetime_str, f) + except ValueError: + pass + raise RuntimeError( + "Decode '{0}' failed: format should be one of '{1}'".format( + datetime_str, str(formats) + ) + ) + + +def encode_datetime(dt): + if isinstance(dt, datetime.datetime): + return dt.strftime("%Y-%m-%d %H:%M:%S") + return str(dt) + + def get_lib_path(app_dir: str, app_name: str) -> str: if sys.platform == "linux" or sys.platform == "linux2": return os.path.join(app_dir, "lib%s.so" % app_name) diff --git a/coordinator/requirements.txt b/coordinator/requirements.txt index 72e88e88fbe1..a8618f86746c 100644 --- a/coordinator/requirements.txt +++ b/coordinator/requirements.txt @@ -9,3 +9,5 @@ vineyard-io>=0.16.3;sys_platform!="win32" prometheus-client>=0.14.1 packaging tqdm +pydantic >= 2 +schedule diff --git a/proto/coordinator_service.proto b/proto/coordinator_service.proto index 1d5ed680f8d5..ea3b17e60267 100644 --- a/proto/coordinator_service.proto +++ b/proto/coordinator_service.proto @@ -17,6 +17,7 @@ syntax = "proto3"; package gs.rpc; import "message.proto"; +import "interactive.proto"; service CoordinatorService { // Connect a session. @@ -51,4 +52,22 @@ service CoordinatorService { // service functions under FLEX architecture rpc Connect(ConnectRequest) returns (ConnectResponse); + + rpc CreateInteractiveGraph(CreateInteractiveGraphRequest) returns (ApiResponse); + + rpc RemoveInteractiveGraph(RemoveInteractiveGraphRequest) returns (ApiResponse); + + rpc ListInteractiveGraph(ListInteractiveGraphRequest) returns (ListInteractiveGraphResponse); + + rpc ImportInteractiveGraph(ImportInteractiveGraphRequest) returns (ApiResponse); + + rpc ListInteractiveJob(ListInteractiveJobRequest) returns (ListInteractiveJobResponse); + + rpc CreateInteractiveProcedure(CreateInteractiveProcedureRequest) returns (ApiResponse); + + rpc ListInteractiveProcedure(ListInteractiveProcedureRequest) returns (ListInteractiveProcedureResponse); + + rpc UpdateInteractiveProcedure(UpdateInteractiveProcedureRequest) returns (ApiResponse); + + rpc RemoveInteractiveProcedure(RemoveInteractiveProcedureRequest) returns (ApiResponse); } diff --git a/proto/error_codes.proto b/proto/error_codes.proto index 7b9ca43e8838..1107e3bb55e4 100644 --- a/proto/error_codes.proto +++ b/proto/error_codes.proto @@ -84,6 +84,9 @@ enum Code { // Results of workers not consistent WORKER_RESULTS_INCONSISTENT_ERROR = 41; + // Api exception during communication between coordinator and engines + API_EXCEPTION_ERROR = 51; + // Unknown error. UNKNOWN_ERROR = 101; diff --git a/proto/interactive.proto b/proto/interactive.proto new file mode 100644 index 000000000000..191b87ec2faf --- /dev/null +++ b/proto/interactive.proto @@ -0,0 +1,236 @@ +// Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +syntax = "proto3"; + +package gs.rpc; + +import "error_codes.proto"; + +// property type +message PropertyTypeProto { + string primitive_type = 1; // [ DT_DOUBLE, DT_STRING, DT_SIGNED_INT32, DT_SIGNED_INT64 ] +} + +// property +message PropertyProto { + int32 property_id = 1; + string property_name = 2; + PropertyTypeProto property_type = 3; +} + +// csr params definition, used for storage optimization +message CSRParamsProto { + optional string incoming_edge_strategy = 1; // one of [Single, Multple] + optional string outgoing_edge_strategy = 2; // one of [Single, Multple] +} + +// vertex pair +message VertexPairProto { + string source_vertex = 1; // label + string destination_vertex = 2; // label + string relation = 3; // one of [ MANY_TO_MANY, ONE_TO_MANY, MANY_TO_ONE, ONE_TO_ONE, ] + CSRParamsProto x_csr_params = 4; +} + +// vertex +message VertexProto { + int32 type_id = 1; // index + string type_name = 2; // label + repeated PropertyProto properties = 3; + repeated string primary_keys = 4; +} + +// edge +message EdgeProto { + int32 type_id = 1; // index + string type_name = 2; // label + repeated VertexPairProto vertex_type_pair_relations = 3; + repeated PropertyProto properties = 4; +} + +// schema +message Schema { + repeated VertexProto vertex_types = 1; + repeated EdgeProto edge_types = 2; +} + +// stored procedure +message StoredProcedureProto { + string directory = 1; // one of [ plugins ] +} + +// graph +message GraphProto { + string name = 1; + string store_type = 2; // one of [ mutable_csr, ] + StoredProcedureProto stored_procedures = 3; + Schema schema = 4; +} + +// column +message Column { + int32 index = 1; + string name = 2; +} + +// column mapping +// column index(name) of datasource -> property +message ColumnMapping { + Column column = 1; + string property = 2; +} + +// vertex mapping +message VertexMapping { + string type_name = 1; // vertex label + repeated string inputs = 2; // protocol:///path/to/data + repeated bytes raw_data = 3; // raw data + repeated ColumnMapping column_mappings = 4; +} + +// type triplet for edge +message TypeTriplet { + string edge = 1; // edge label + string source_vertex = 2; // src vertex label + string destination_vertex = 3; // dst vertex label +} + +// edge mapping +message EdgeMapping { + TypeTriplet type_triplet = 1; + repeated string inputs = 2; + repeated bytes raw_data = 3; + repeated ColumnMapping source_vertex_mappings = 4; + repeated ColumnMapping destination_vertex_mappings = 5; + repeated ColumnMapping column_mappings = 6; +} + +// data source config +message DataSource { + string scheme = 1; // file, oss, s3, hdfs; only file is supported now + string location = 2; // unified prefix +} + +// format +message DataSourceFormat { + string type = 1; // csv + map metadata = 2; +} + +// loading config +message LoadingConfig { + DataSource data_source = 1; + string import_option = 2; // init, append, overwrite, only init is supported now + DataSourceFormat format = 3; +} + +// schema mapping +message SchemaMapping { + string graph = 1; + LoadingConfig loading_config = 2; + repeated VertexMapping vertex_mappings = 3; + repeated EdgeMapping edge_mappings = 4; +} + +// job status +message JobStatus { + string jobid = 1; + string status = 2; + string start_time = 3; + string end_time = 4; + map detail = 5; + string message = 6; +} + +// procedure runtime params, used both in property and return type +message ProcedureRuntimeParam { + string name = 1; + string type = 2; +} + +// procedure +message Procedure { + string name = 1; + string bound_graph = 2; + string description = 3; + optional string type = 4; // [ cpp, cypher ] + string query = 5; // content + bool enable = 6; + + // runtime info, assign by Interactive server side. + repeated ProcedureRuntimeParam params = 7; // parameters in call + repeated ProcedureRuntimeParam rlts_meta = 8; +} + +// message +message ApiResponse { + Code code = 1; + optional string error_msg = 2; +} + +message CreateInteractiveGraphRequest { + GraphProto graph_def = 1; +} + +message RemoveInteractiveGraphRequest { + string graph_name = 1; +} + +message ListInteractiveGraphRequest {} + +message ListInteractiveGraphResponse { + Code code = 1; + optional string error_msg = 2; + + repeated GraphProto graphs = 3; +} + +message ImportInteractiveGraphRequest { + SchemaMapping schema_mapping = 1; +} + +message ListInteractiveJobRequest {} + +message ListInteractiveJobResponse { + Code code = 1; + optional string error_msg = 2; + + repeated JobStatus job_status = 3; +} + +message CreateInteractiveProcedureRequest { + Procedure procedure_def = 1; +} + +message ListInteractiveProcedureRequest { + string graph_name = 1; +} + +message ListInteractiveProcedureResponse { + Code code = 1; + optional string error_msg = 2; + + repeated Procedure procedures = 3; +} + +message UpdateInteractiveProcedureRequest { + repeated Procedure procedures = 1; +} + +message RemoveInteractiveProcedureRequest { + string graph_name = 1; + string procedure_name = 2; +} diff --git a/python/graphscope/gsctl/rpc.py b/python/graphscope/gsctl/client/rpc.py similarity index 53% rename from python/graphscope/gsctl/rpc.py rename to python/graphscope/gsctl/client/rpc.py index f77d0fe58bae..5bdeb1c65688 100644 --- a/python/graphscope/gsctl/rpc.py +++ b/python/graphscope/gsctl/client/rpc.py @@ -18,14 +18,16 @@ import atexit import time +from typing import List import click import grpc - from graphscope.client.utils import GS_GRPC_MAX_MESSAGE_LENGTH from graphscope.client.utils import handle_grpc_error +from graphscope.gsctl.config import GS_CONFIG_DEFAULT_LOCATION from graphscope.gsctl.config import get_current_context from graphscope.proto import coordinator_service_pb2_grpc +from graphscope.proto import interactive_pb2 from graphscope.proto import message_pb2 from graphscope.version import __version__ @@ -73,6 +75,59 @@ def close(self): except: # noqa: E722 pass + @handle_grpc_error + def create_interactive_graph(self, graph_def: interactive_pb2.GraphProto): + request = interactive_pb2.CreateInteractiveGraphRequest(graph_def=graph_def) + return self._stub.CreateInteractiveGraph(request) + + @handle_grpc_error + def remove_interactive_graph(self, graph: str): + request = interactive_pb2.RemoveInteractiveGraphRequest(graph_name=graph) + return self._stub.RemoveInteractiveGraph(request) + + @handle_grpc_error + def list_interactive_graph(self): + request = interactive_pb2.ListInteractiveGraphRequest() + return self._stub.ListInteractiveGraph(request) + + @handle_grpc_error + def import_interactive_graph(self, schema_mapping: interactive_pb2.SchemaMapping): + request = interactive_pb2.ImportInteractiveGraphRequest( + schema_mapping=schema_mapping + ) + return self._stub.ImportInteractiveGraph(request) + + @handle_grpc_error + def list_interactive_job(self): + request = interactive_pb2.ListInteractiveJobRequest() + return self._stub.ListInteractiveJob(request) + + @handle_grpc_error + def create_interactive_procedure(self, procedure: interactive_pb2.Procedure): + request = interactive_pb2.CreateInteractiveProcedureRequest( + procedure_def=procedure + ) + return self._stub.CreateInteractiveProcedure(request) + + @handle_grpc_error + def list_interactive_procedure(self, graph: str): + request = interactive_pb2.ListInteractiveProcedureRequest(graph_name=graph) + return self._stub.ListInteractiveProcedure(request) + + @handle_grpc_error + def update_interactive_procedure(self, procedures: List[interactive_pb2.Procedure]): + request = interactive_pb2.UpdateInteractiveProcedureRequest( + procedures=procedures + ) + return self._stub.UpdateInteractiveProcedure(request) + + @handle_grpc_error + def remove_interactive_procedure(self, graph: str, procedure: str): + request = interactive_pb2.RemoveInteractiveProcedureRequest( + graph_name=graph, procedure_name=procedure + ) + return self._stub.RemoveInteractiveProcedure(request) + def get_grpc_client(coordinator_endpoint=None): if coordinator_endpoint is not None: @@ -82,6 +137,8 @@ def get_grpc_client(coordinator_endpoint=None): current_context = get_current_context() if current_context is None: raise RuntimeError( - "No available context found, please connect to a launched coordinator first." + "No available context found in {0}, please connect to a launched coordinator first.".format( + GS_CONFIG_DEFAULT_LOCATION + ) ) return GRPCClient(current_context.coordinator_endpoint) diff --git a/python/graphscope/gsctl/commands/__init__.py b/python/graphscope/gsctl/commands/__init__.py index 29ab31f2d6ab..8bdcbed90f5f 100644 --- a/python/graphscope/gsctl/commands/__init__.py +++ b/python/graphscope/gsctl/commands/__init__.py @@ -16,4 +16,27 @@ # limitations under the License. # -from graphscope.gsctl.commands.utils import * +import click + +from graphscope.gsctl.commands.common import cli as common_cli +from graphscope.gsctl.commands.dev import cli as dev_cli +from graphscope.gsctl.commands.interactive import cli as interactive_cli +from graphscope.gsctl.config import Context +from graphscope.gsctl.config import FLEX_INTERACTIVE + + +def get_command_collection(context: Context): + if context is None: + # treat gsctl as an utility script, providing hepler functions or utilities. e.g. + # initialize and manage cluster, install the dependencies required to build graphscope locally + commands = click.CommandCollection(sources=[common_cli, dev_cli]) + + elif context.solution == FLEX_INTERACTIVE: + commands = click.CommandCollection(sources=[common_cli, interactive_cli]) + + else: + raise RuntimeError( + f"Failed to get command collection with context {context.name}" + ) + + return commands diff --git a/python/graphscope/gsctl/commands/common_command.py b/python/graphscope/gsctl/commands/common.py similarity index 94% rename from python/graphscope/gsctl/commands/common_command.py rename to python/graphscope/gsctl/commands/common.py index ff3a02a4a3e2..c2028e90239f 100644 --- a/python/graphscope/gsctl/commands/common_command.py +++ b/python/graphscope/gsctl/commands/common.py @@ -22,7 +22,7 @@ from graphscope.gsctl.config import Context from graphscope.gsctl.config import load_gs_config -from graphscope.gsctl.rpc import get_grpc_client +from graphscope.gsctl.client.rpc import get_grpc_client @click.group() @@ -31,7 +31,7 @@ def cli(): pass -@click.command() +@cli.command() @click.option( "--coordinator-endpoint", help="Coordinator endpoint which gsctl connect to, e.g. http://127.0.0.1:9527", @@ -56,7 +56,7 @@ def connect(coordinator_endpoint): click.secho("Coordinator service connected.", fg="green") -@click.command() +@cli.command() def close(): """Close the connection from the coordinator.""" config = load_gs_config() @@ -69,9 +69,5 @@ def close(): click.secho(f"Disconnect from the {current_context.to_dict()}.", fg="green") -cli.add_command(connect) -cli.add_command(close) - - if __name__ == "__main__": cli() diff --git a/python/graphscope/gsctl/commands/dev_command.py b/python/graphscope/gsctl/commands/dev.py similarity index 100% rename from python/graphscope/gsctl/commands/dev_command.py rename to python/graphscope/gsctl/commands/dev.py diff --git a/python/graphscope/gsctl/commands/interactive.py b/python/graphscope/gsctl/commands/interactive.py new file mode 100644 index 000000000000..d4c5f510b646 --- /dev/null +++ b/python/graphscope/gsctl/commands/interactive.py @@ -0,0 +1,532 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Group of interactive commands under the FLEX architecture""" + +import itertools +import json + +import click +from graphscope.gsctl.client.rpc import get_grpc_client +from graphscope.gsctl.utils import dict_to_proto_message +from graphscope.gsctl.utils import is_valid_file_path +from graphscope.gsctl.utils import read_yaml_file +from graphscope.gsctl.utils import terminal_display +from graphscope.proto import error_codes_pb2 +from graphscope.proto import interactive_pb2 + + +@click.group() +def cli(): + pass + + +@cli.group() +def database(): + """Group of operations over graph and dataloading.""" + pass + + +@cli.group() +def get(): + """Display one or many resources.""" + pass + + +@cli.group() +def procedure(): + """Group of operations over procedure.""" + pass + + +@database.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +@click.option( + "-c", + "--config", + required=True, + help="Yaml path or json string of schema for the graph.", +) +def create(graph, config): + """Create a graph in database, with the provided schema file.""" + + graph_def_dict = config + if is_valid_file_path(config): + graph_def_dict = read_yaml_file(config) + + # override graph name + if graph is not None: + graph_def_dict["name"] = graph + + # transform graph dict to proto message + graph_def = interactive_pb2.GraphProto() + dict_to_proto_message(graph_def_dict, graph_def) + + grpc_client = get_grpc_client() + response = grpc_client.create_interactive_graph(graph_def) + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + click.secho( + "Create interactive graph {0} successfully.".format(graph_def_dict["name"]), + fg="green", + ) + + +@database.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +def remove(graph): + """Remove a graph, as well as the loaded data in database.""" + + grpc_client = get_grpc_client() + response = grpc_client.remove_interactive_graph(graph) + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + click.secho( + "Remove interactive graph {0} successfully.".format(graph), + fg="green", + ) + + +@database.command(name="import") +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +@click.option( + "-c", + "--config", + required=True, + help="Yaml path or raw data for loading graph.", +) +def data_import(graph, config): + """Load the raw data specified in bulk load file.""" + + def _read_and_fill_raw_data(config): + for mapping in itertools.chain( + config["vertex_mappings"], config["edge_mappings"] + ): + for index, location in enumerate(mapping["inputs"]): + # location is one of: + # 1) protocol:///path/to/the/file + # 2) @/path/to/the/file, which represents the local file + if location.startswith("@"): + if "raw_data" not in mapping: + mapping["raw_data"] = [] + + # read file and set raw data + with open(location[1:], "rb") as f: + content = f.read() + mapping["raw_data"].append(content) + + schema_mapping_dict = config + if is_valid_file_path(config): + schema_mapping_dict = read_yaml_file(config) + + if graph is not None: + schema_mapping_dict["graph"] = graph + + _read_and_fill_raw_data(schema_mapping_dict) + + # transfiorm dict to proto message + schema_mapping = interactive_pb2.SchemaMapping() + dict_to_proto_message(schema_mapping_dict, schema_mapping) + + grpc_client = get_grpc_client() + response = grpc_client.import_interactive_graph(schema_mapping) + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + click.secho("Create dataloading job successfully.", fg="green") + + +@procedure.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +@click.option( + "-n", + "--name", + required=True, + help="The name of the procedure.", +) +@click.option( + "-i", + "--sourcefile", + required=True, + help="Path of [ .cc, .cypher ] file.", +) +@click.option( + "-d", + "--description", + required=False, + help="Description for the specific procedure.", +) +def create(graph, name, sourcefile, description): + """Create and compile procedure over a specific graph.""" + + # read source + with open(sourcefile, "r") as f: + query = f.read() + + # construct procedure proto + procedure = interactive_pb2.Procedure( + name=name, bound_graph=graph, description=description, query=query, enable=True + ) + + grpc_client = get_grpc_client() + response = grpc_client.create_interactive_procedure(procedure) + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + click.secho("Create procedure {0} successfully.".format(name), fg="green") + + +@procedure.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +@click.option( + "-n", + "--name", + required=True, + help="List of procedure's name to enable, seprated by comma.", +) +def enable(graph, name): + """Enable stored procedures in a given graph.""" + + # remove the last "," if exists + if name.endswith(","): + name = name[:-1] + procedure_list = name.split(",") + + # list current procedures + grpc_client = get_grpc_client() + response = grpc_client.list_interactive_procedure(graph) + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + + # enable procedures locally + procedures = response.procedures + for p in procedures: + if p.name in procedure_list: + p.enable = True + procedure_list.remove(p.name) + + # check + if procedure_list: + click.secho("Procedure {0} not found.".format(str(procedure_list)), fg="red") + return + + # update procedures + response = grpc_client.update_interactive_procedure(procedures) + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + click.secho("Update procedures successfully.", fg="green") + + +@procedure.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +@click.option( + "-n", + "--name", + required=True, + help="List of procedure's name to enable, seprated by comma.", +) +def disable(graph, name): + """Disable stored procedures in a given graph.""" + + # remove the last "," if exists + if name.endswith(","): + name = name[:-1] + procedure_list = name.split(",") + + # list current procedures + grpc_client = get_grpc_client() + response = grpc_client.list_interactive_procedure(graph) + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + + # disable procedures locally + procedures = response.procedures + for p in procedures: + if p.name in procedure_list: + p.enable = False + procedure_list.remove(p.name) + + # check + if procedure_list: + click.secho("Procedure {0} not found.".format(str(procedure_list)), fg="red") + return + + # update procedures + response = grpc_client.update_interactive_procedure(procedures) + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + click.secho("Update procedures successfully.", fg="green") + + +@procedure.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +@click.option( + "-n", + "--name", + required=True, + help="The name of the procedure to be deleted.", +) +def remove(graph, name): + """Remove a procedure over a specific graph in database.""" + + grpc_client = get_grpc_client() + response = grpc_client.remove_interactive_procedure(graph, name) + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + click.secho( + "Remove procedure {0} on graph {1} successfully.".format(name, graph), + fg="green", + ) + + +@get.command() +def graph(): + """List all the graphs in database.""" + + def _construct_and_display_data(graphs): + if not graphs: + click.secho("no graph found.", fg="blue") + return + + head = ["", "Name", "Store Type", "Vertex Label Size", "Edge Label Size"] + data = [head] + + for g in graphs: + # vertex_schema = [] + # for v in g.schema.vertex_types: + # vertex_schema.append(v.type_name) + # edge_schema = [] + # for e in g.schema.edge_types: + # edge_schema.append(e.type_name) + data.append( + [ + "*", # service graph + g.name, + g.store_type, + str(len(g.schema.vertex_types)), + str(len(g.schema.edge_types)), + # "({0}) {1}".format(len(vertex_schema), ",".join(vertex_schema)), + # "({0}) {1}".format(len(edge_schema), ",".join(edge_schema)), + ] + ) + + terminal_display(data) + + grpc_client = get_grpc_client() + response = grpc_client.list_interactive_graph() + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + _construct_and_display_data(response.graphs) + return response.graphs + + +@get.command() +def job(): + """List all the jobs in database.""" + + def _construct_and_display_data(job_status): + if not job_status: + click.secho("no job found.", fg="blue") + return + + head = ["JobID", "Status", "Start Time", "End Time", "Detail", "Message"] + data = [head] + + for s in job_status: + detail = {} + for k, v in s.detail.items(): + detail[k] = v + + message = s.message.replace("\n", "") + if len(message) > 63: + message = message[:63] + "..." + + data.append( + [ + s.jobid, + s.status, + s.start_time, + s.end_time, + json.dumps(detail), + message, + ] + ) + + terminal_display(data) + + grpc_client = get_grpc_client() + response = grpc_client.list_interactive_job() + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + _construct_and_display_data(response.job_status) + return response.job_status + + +@get.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +def procedure(graph): + """List all the procedures in database.""" + + def _construct_and_display_data(procedures): + if not procedures: + click.secho("no procedure found.", fg="blue") + return + + head = ["Name", "Type", "Enable", "Description"] + data = [head] + + for procedure in procedures: + data.append( + [ + procedure.name, + procedure.type, + str(procedure.enable), + procedure.description, + ] + ) + + terminal_display(data) + + grpc_client = get_grpc_client() + response = grpc_client.list_interactive_procedure(graph) + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + _construct_and_display_data(response.procedures) + return response.procedures + + +if __name__ == "__main__": + cli() diff --git a/python/graphscope/gsctl/commands/utils.py b/python/graphscope/gsctl/commands/utils.py deleted file mode 100644 index 711a7b32e7f5..000000000000 --- a/python/graphscope/gsctl/commands/utils.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# -# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import click - -from graphscope.gsctl.commands.common_command import cli as common_cli -from graphscope.gsctl.commands.dev_command import cli as dev_cli -from graphscope.gsctl.config import Context - - -def get_command_collection(context: Context): - if context is None: - # treat gsctl as an utility script, providing hepler functions or utilities. e.g. - # initialize and manage cluster, install the dependencies required to build graphscope locally - commands = click.CommandCollection(sources=[common_cli, dev_cli]) - - elif context.solution == "interactive": - commands = click.CommandCollection(sources=[common_cli]) - - else: - raise RuntimeError( - f"Failed to get command collection with context {context.name}" - ) - - return commands diff --git a/python/graphscope/gsctl/config.py b/python/graphscope/gsctl/config.py index e9357c0b4919..2e286f0dcdc0 100644 --- a/python/graphscope/gsctl/config.py +++ b/python/graphscope/gsctl/config.py @@ -22,16 +22,19 @@ import random from string import ascii_letters -import yaml +from graphscope.gsctl.utils import read_yaml_file +from graphscope.gsctl.utils import write_yaml_file GS_CONFIG_DEFAULT_LOCATION = os.environ.get( "GSCONFIG", os.path.expanduser("~/.graphscope/config") ) +FLEX_INTERACTIVE = "Interactive" + class Context(object): def __init__(self, solution, coordinator_endpoint, name=None): - self.supported_solutions = ["interactive"] + self.supported_solutions = [FLEX_INTERACTIVE] if solution not in self.supported_solutions: raise RuntimeError( "The solution {0} in context {1} is not supported yet.".format( @@ -83,10 +86,10 @@ def set_and_write(self, context: Context): # write contexts = [v.to_dict() for _, v in self._contexts.items()] - with open(GS_CONFIG_DEFAULT_LOCATION, "w") as file: - yaml.dump( - {"contexts": contexts, "current-context": self._current_context}, file - ) + write_yaml_file( + {"contexts": contexts, "current-context": self._current_context}, + GS_CONFIG_DEFAULT_LOCATION, + ) def remove_and_write(self, current_context: Context): # remove @@ -95,10 +98,10 @@ def remove_and_write(self, current_context: Context): # write contexts = [v.to_dict() for _, v in self._contexts.items()] - with open(GS_CONFIG_DEFAULT_LOCATION, "w") as file: - yaml.dump( - {"contexts": contexts, "current-context": self._current_context}, file - ) + write_yaml_file( + {"contexts": contexts, "current-context": self._current_context}, + GS_CONFIG_DEFAULT_LOCATION, + ) class GSConfigLoader(object): @@ -132,9 +135,7 @@ def _parse_config(self, config_dict): return contexts, current_context def load_config(self): - config_dict = None - with open(self._config_file, "r") as file: - config_dict = yaml.safe_load(file) + config_dict = read_yaml_file(self._config_file) contexts, current_context = self._parse_config(config_dict) return GSConfig(contexts, current_context) @@ -149,8 +150,7 @@ def load_gs_config(): if not os.path.exists(config_file): workdir = os.path.dirname(config_file) os.makedirs(workdir, exist_ok=True) - with open(config_file, "w") as file: - yaml.safe_dump({}, file) + write_yaml_file({}, config_file) loader = GSConfigLoader(config_file) return loader.load_config() diff --git a/python/graphscope/gsctl/utils.py b/python/graphscope/gsctl/utils.py new file mode 100644 index 000000000000..e0861acda132 --- /dev/null +++ b/python/graphscope/gsctl/utils.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +from typing import List + +import yaml + + +def read_yaml_file(path): + """Reads YAML file and returns as a python object.""" + with open(path, "r") as file: + return yaml.safe_load(file) + + +def write_yaml_file(data, path): + """Writes python object to the YAML file.""" + with open(path, "w") as file: + yaml.dump(data, file) + + +def is_valid_file_path(path): + """Check if the path exists and corresponds to a regular file.""" + return os.path.exists(path) and os.path.isfile(path) + + +def dict_to_proto_message(values, message): + """Transform pyhon dict object to protobuf message + + Args: + values (dict): values to be transformed. + message (proto): protobuf message, such as graph_def_pb2.GraphMessage() + """ + + def _parse_list(values, message): + if isinstance(values[0], dict): # value needs to be further parsed + for v in values: + cmd = message.add() + _parse_dict(v, cmd) + else: # value can be set + message.extend(values) + + def _parse_dict(values, message): + for k, v in values.items(): + if isinstance(v, dict): # value needs to be further parsed + _parse_dict(v, getattr(message, k)) + elif isinstance(v, list): + _parse_list(v, getattr(message, k)) + else: # value can be set + try: + setattr(message, k, v) + except AttributeError: + # treat as map + message[k] = v + + _parse_dict(values, message) + return message + + +def terminal_display(data: List[list]): + """Display tablular data in terminal""" + + # Compute the maximum width for each column + column_widths = [max(len(str(item)) for item in column) for column in zip(*data)] + + # Display the data with aligned columns + for row in data: + print( + " ".join( + "{:<{}}".format(item, width) for item, width in zip(row, column_widths) + ) + )