Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(coordinator): Add cypher endpoint info when list_service_status for groot #4382

Merged
merged 8 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions charts/graphscope-store/templates/portal/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ spec:
value: {{ .Values.frontend.service.servicePort | quote }}
- name: GROOT_GREMLIN_PORT
value: {{ .Values.frontend.service.gremlinPort | quote }}
- name: GROOT_CYPHER_PORT
value: {{ .Values.frontend.service.cypherPort | quote }}
- name: INSTANCE_NAME
value: {{ .Release.Name | quote }}
- name: NAMESPACE
Expand Down
2 changes: 2 additions & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ frontend:

gremlinPort: 12312

cypherPort: 7687

## Internal port for communication between components.
##
port: 55555
Expand Down
1 change: 1 addition & 0 deletions coordinator/gscoordinator/flex/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def str_to_bool(s):
# groot
GROOT_GRPC_PORT = os.environ.get("GROOT_GRPC_PORT", 55556)
GROOT_GREMLIN_PORT = os.environ.get("GROOT_GREMLIN_PORT", 12312)
GROOT_CYPHER_PORT = os.environ.get("GROOT_CYPHER_PORT", 7687)
GROOT_USERNAME = os.environ.get("GROOT_USERNAME", "")
GROOT_PASSWORD = os.environ.get("GROOT_PASSWORD", "")
try:
Expand Down
27 changes: 20 additions & 7 deletions coordinator/gscoordinator/flex/core/insight/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@

from gscoordinator.flex.core.config import CLUSTER_TYPE
from gscoordinator.flex.core.config import CREATION_TIME
from gscoordinator.flex.core.config import GROOT_CYPHER_PORT
from gscoordinator.flex.core.config import GROOT_GREMLIN_PORT
from gscoordinator.flex.core.config import GROOT_GRPC_PORT
from gscoordinator.flex.core.config import GROOT_PASSWORD
from gscoordinator.flex.core.config import GROOT_USERNAME
from gscoordinator.flex.core.config import INSTANCE_NAME
from gscoordinator.flex.core.config import NAMESPACE
from gscoordinator.flex.core.insight.utils import test_cypher_endpoint
from gscoordinator.flex.core.scheduler import schedule
from gscoordinator.flex.core.utils import data_type_to_groot
from gscoordinator.flex.core.utils import get_internal_ip
Expand All @@ -46,7 +48,7 @@
class GrootGraph(object):
"""Graph class for GraphScope store"""

def __init__(self, name, creation_time, gremlin_endpoint, grpc_endpoint):
def __init__(self, name, creation_time, gremlin_endpoint, grpc_endpoint, cypher_endpoint = None):
self._id = "1"
self._name = name

Expand All @@ -60,12 +62,14 @@ def __init__(self, name, creation_time, gremlin_endpoint, grpc_endpoint):
)
self._g = self._conn.g()
self._schema = self._g.schema().to_dict()
self._gremlin_interface = {
self._endpoints = {
"gremlin_endpoint": gremlin_endpoint,
"grpc_endpoint": grpc_endpoint,
"username": GROOT_USERNAME,
"password": GROOT_PASSWORD,
}
self._endpoints["cypher_endpoint"] = cypher_endpoint

# kubernetes
if CLUSTER_TYPE == "KUBERNETES":
self._api_client = resolve_api_client()
Expand Down Expand Up @@ -97,22 +101,27 @@ def _fetch_endpoints_impl(self):
grpc_endpoint, gremlin_endpoint, GROOT_USERNAME, GROOT_PASSWORD
)
g = conn.g()
cypher_endpoint = test_cypher_endpoint(pod.status.pod_ip, GROOT_CYPHER_PORT)

except Exception as e:
logger.warn(f"Failed to fetch frontend endpoints: {str(e)}")
else:
if (
gremlin_endpoint != self._gremlin_interface["gremlin_endpoint"]
or grpc_endpoint != self._gremlin_interface["grpc_endpoint"]
gremlin_endpoint != self._endpoints["gremlin_endpoint"]
or grpc_endpoint != self._endpoints["grpc_endpoint"]
or cypher_endpoint != self._endpoints.get("cypher_endpoint")
):
self._conn = conn
self._g = g
self._schema = self._g.schema().to_dict()
self._gremlin_interface = {
self._endpoints = {
"gremlin_endpoint": gremlin_endpoint,
"grpc_endpoint": grpc_endpoint,
"username": GROOT_USERNAME,
"password": GROOT_PASSWORD,
}
if cypher_endpoint:
self._endpoints["cypher_endpoint"] = cypher_endpoint
logger.info(f"Update frontend endpoints: {str(endpoints)}")

def __del__(self):
Expand All @@ -131,8 +140,8 @@ def name(self):
return self._name

@property
def gremlin_interface(self):
return self._gremlin_interface
def groot_endpoints(self):
return self._endpoints

@property
def schema(self):
Expand Down Expand Up @@ -284,12 +293,16 @@ def get_groot_graph_from_local():
client.close()
break
time.sleep(5)
# test whether cypher endpoint is ready
cypher_endpoint = test_cypher_endpoint(host, GROOT_CYPHER_PORT)

# groot graph
return GrootGraph(
name=INSTANCE_NAME,
creation_time=CREATION_TIME,
gremlin_endpoint=gremlin_endpoint,
grpc_endpoint=grpc_endpoint,
cypher_endpoint=cypher_endpoint,
)


Expand Down
19 changes: 11 additions & 8 deletions coordinator/gscoordinator/flex/core/insight/groot.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,21 @@ def check_graph_exists(self, graph_id: str):
raise RuntimeError(f"Graph {graph_id} not exist.")

def list_service_status(self) -> List[dict]:
gremlin_interface = self._graph.gremlin_interface
return [
groot_endpoints = self._graph.groot_endpoints
res = [
{
"graph_id": self._graph.id,
"status": "Running",
"start_time": CREATION_TIME,
"sdk_endpoints": {
"gremlin": gremlin_interface["gremlin_endpoint"],
"grpc": gremlin_interface["grpc_endpoint"],
"gremlin": groot_endpoints["gremlin_endpoint"],
"grpc": groot_endpoints["grpc_endpoint"],
},
}
]
if "cypher_endpoint" in groot_endpoints and groot_endpoints["cypher_endpoint"]:
res[0]["sdk_endpoints"]["cypher"] = groot_endpoints["cypher_endpoint"]
return res

def create_graph(self, graph: dict) -> dict:
raise RuntimeError("Create graph is not supported yet.")
Expand Down Expand Up @@ -284,12 +287,12 @@ def get_storage_usage(self) -> dict:

def gremlin_service_available(self) -> bool:
try:
gremlin_interface = self._graph.gremlin_interface
groot_endpoints = self._graph.groot_endpoints
client = Client(
gremlin_interface["gremlin_endpoint"],
groot_endpoints["gremlin_endpoint"],
"g",
username=gremlin_interface["username"],
password=gremlin_interface["password"],
username=groot_endpoints["username"],
password=groot_endpoints["password"],
)
client.submit(
"g.with('evaluationTimeout', 5000).V().limit(1)"
Expand Down
40 changes: 36 additions & 4 deletions coordinator/gscoordinator/flex/core/insight/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@

import hashlib
import json
import logging

from urllib3.exceptions import ProtocolError

from gscoordinator.flex.core.config import BASEID
from gscoordinator.version import __version__

logger = logging.getLogger("graphscope")


def convert_to_configini(graph, ds_manager, config):
# for bulk loader to connect to groot
gremlin_interface = graph.gremlin_interface
groot_endpoints = graph.groot_endpoints
# column mapping config
column_mapping_config = {}
# project
Expand Down Expand Up @@ -112,12 +117,12 @@ def convert_to_configini(graph, ds_manager, config):
# custom_config
custom_config = {
"separatr": "\\\\|", # fixed
"graphEndpoint": gremlin_interface["grpc_endpoint"],
"graphEndpoint": groot_endpoints["grpc_endpoint"],
"project": project,
"outputTable": output_table,
"columnMappingConfig": json.dumps(column_mapping_config),
"authUsername": gremlin_interface["username"],
"authPassword": gremlin_interface["password"],
"authUsername": groot_endpoints["username"],
"authPassword": groot_endpoints["password"],
"dataSinkType": "volume",
"odpsVolumeProject": project,
# "-" is not allowed
Expand All @@ -136,3 +141,30 @@ def convert_to_configini(graph, ds_manager, config):
"customConfig": custom_config,
}
return configini

def test_cypher_endpoint(host : str, port : int):
"""
Test if the cypher endpoint is available, if not return None, otherwise return the cypher endpoint
Note that we send http request to check if the cypher endpoint is available, not submitting a cypher query,
the reason is that the cypher query may raise exceptions in case of other errors.
"""
cypher_endpoint = f"neo4j://{host}:{port}"
try:
import requests
response = requests.get(f"http://{host}:{port}")
response.raise_for_status()
except (requests.exceptions.ConnectionError) as e:
if (e.args != None and len(e.args) > 0):
# Sending http request to cypher endpoint should fail with ProtocolError
if isinstance(e.args[0], ProtocolError):
logger.debug("Cypher endpoint is available: {cypher_endpoint}")
else:
cypher_endpoint = None
logger.debug(f"Cypher endpoint is not available: {str(e)}")
except Exception as e:
logger.debug(f"Cypher endpoint is not available: {str(e)}")
cypher_endpoint = None
return cypher_endpoint
else:
logger.error("Should not reach here")
return cypher_endpoint
Loading