Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 committed Dec 21, 2023
1 parent a20a3a3 commit c561ae0
Show file tree
Hide file tree
Showing 18 changed files with 278 additions and 1,193 deletions.
5 changes: 2 additions & 3 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from graphscope.proto import coordinator_service_pb2_grpc

from gscoordinator.monitor import Monitor
from gscoordinator.servicer import init_interactive_service_servicer
from gscoordinator.servicer import init_flex_service_servicer
from gscoordinator.servicer import init_graphscope_one_service_servicer
from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH

Expand Down Expand Up @@ -111,10 +111,9 @@ def get_servicer(config: Config):
"""Get servicer of specified solution under FLEX architecture"""
service_initializers = {
"GraphScope One": init_graphscope_one_service_servicer,
"Interactive": init_interactive_service_servicer,
}

initializer = service_initializers.get(config.solution)
initializer = service_initializers.get(config.solution, init_flex_service_servicer)
if initializer is None:
raise RuntimeError(
f"Expect {service_initializers.keys()} of solution parameter"
Expand Down
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/servicer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
# limitations under the License.
#

from gscoordinator.servicer.interactive.service import *
from gscoordinator.servicer.flex.service import *
from gscoordinator.servicer.graphscope_one.service import *
45 changes: 0 additions & 45 deletions coordinator/gscoordinator/servicer/base_service.py

This file was deleted.

17 changes: 17 additions & 0 deletions coordinator/gscoordinator/servicer/flex/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

try:
sys.path.insert(0, os.path.dirname(__file__))
import interactive_client
import hiactor_client
except ImportError:
raise

from gscoordinator.servicer.flex.interactive.hiactor import *
57 changes: 57 additions & 0 deletions coordinator/gscoordinator/servicer/flex/interactive/hiactor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import logging

import hiactor_client

from graphscope.config import Config

__all__ = ["init_interactive_service"]

logger = logging.getLogger("graphscope")


class Hiactor(object):
"""Hiactor module used to interact with hiactor engine"""

def __init__(self, config: Config):
self._config = config
# hiactor admin service host
self._hiactor_host = self._get_hiactor_service_endpoints()
logger.info("Connect to hiactor service at %s", self._hiactor_host)

def _get_hiactor_service_endpoints(self):
if self._config.launcher_type == "hosts":
# TODO change to 127.0.0.1
endpoint = "http://192.168.0.9:{0}".format(
os.environ.get("HIACTOR_ADMIN_SERVICE_PORT", 7777)
)
return endpoint

def list_graph(self):
with hiactor_client.ApiClient(
hiactor_client.Configuration(self._hiactor_host)
) as api_client:
api_instance = hiactor_client.GraphApi(api_client)
return api_instance.list_graphs()


def init_interactive_service(config: Config):
return Hiactor(config)
104 changes: 104 additions & 0 deletions coordinator/gscoordinator/servicer/flex/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Service under FLEX Architecture"""

import atexit

# import itertools
import logging
import os
import threading

# from google.protobuf.json_format import MessageToDict
from graphscope.config import Config
from graphscope.gsctl.utils import dict_to_proto_message
from graphscope.proto import coordinator_service_pb2_grpc
from graphscope.proto import error_codes_pb2
from graphscope.proto import flex_pb2
from graphscope.proto import message_pb2

# from gscoordinator.scheduler import schedule
# from gscoordinator.utils import WORKSPACE
# from gscoordinator.utils import delegate_command_to_pod
# from gscoordinator.utils import run_kube_cp_command

from gscoordinator.servicer.flex.interactive import *

__all__ = ["FlexServiceServicer", "init_flex_service_servicer"]

logger = logging.getLogger("graphscope")


class FlexServiceServicer(coordinator_service_pb2_grpc.CoordinatorServiceServicer):
"""Service under flex architecture."""

services_initializer = {
"interactive": init_interactive_service,
}

def __init__(self, config: Config):
self._config = config
# We use the solution encompasses the various applications and use cases of
# the product across different industries and business scenarios,
# e.g. interactive, analytical
self._solution = self._config.solution.lower()

atexit.register(self.cleanup)

# lock to protect the service
self._lock = threading.RLock()

# initialize specific service client
self._service_client = self._initialize_service_client()

def __del__(self):
self.cleanup()

def _initialize_service_client(self):
initializer = self.services_initializer.get(self._solution)
if initializer is None:
raise RuntimeError("Failed to launch {0} service".format(self._solution))
return initializer(self._config)

def cleanup(self):
pass

def Connect(self, request, context):
return message_pb2.ConnectResponse(solution=self._solution)

def ListGraph(self, request, context):
try:
graphs = self._service_client.list_graph()
except Exception as e:
logger.warning("Failed to list graph: %s", str(e))
return flex_pb2.ListGraphResponse(
code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e)
)
else:
return flex_pb2.ListGraphResponse(
code=error_codes_pb2.OK,
graphs=[
dict_to_proto_message(g.to_dict(), flex_pb2.GraphProto())
for g in graphs
],
)


def init_flex_service_servicer(config: Config):
return FlexServiceServicer(config)
Loading

0 comments on commit c561ae0

Please sign in to comment.