Skip to content

Commit

Permalink
extract function test_cypher_endpoint
Browse files Browse the repository at this point in the history
Committed-by: xiaolei.zl from Dev container
  • Loading branch information
zhanglei1949 committed Dec 26, 2024
1 parent 741b359 commit e30a053
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 32 deletions.
35 changes: 4 additions & 31 deletions coordinator/gscoordinator/flex/core/insight/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
from gremlin_python.driver.client import Client
from kubernetes import client as kube_client
from kubernetes import config as kube_config
from neo4j import GraphDatabase
from neo4j import Session as Neo4jSession

from gscoordinator.flex.core.config import CLUSTER_TYPE
from gscoordinator.flex.core.config import CREATION_TIME
Expand All @@ -37,6 +35,7 @@
from gscoordinator.flex.core.config import GROOT_USERNAME
from gscoordinator.flex.core.config import INSTANCE_NAME
from gscoordinator.flex.core.config import NAMESPACE
from gscoordinator.flex.core.insight.utils import test_cypher_endpoint
from gscoordinator.flex.core.scheduler import schedule
from gscoordinator.flex.core.utils import data_type_to_groot
from gscoordinator.flex.core.utils import get_internal_ip
Expand Down Expand Up @@ -69,8 +68,7 @@ def __init__(self, name, creation_time, gremlin_endpoint, grpc_endpoint, cypher_
"username": GROOT_USERNAME,
"password": GROOT_PASSWORD,
}
if cypher_endpoint:
self._endpoints["cypher_endpoint"] = cypher_endpoint
self._endpoints["cypher_endpoint"] = cypher_endpoint

# kubernetes
if CLUSTER_TYPE == "KUBERNETES":
Expand Down Expand Up @@ -103,21 +101,7 @@ def _fetch_endpoints_impl(self):
grpc_endpoint, gremlin_endpoint, GROOT_USERNAME, GROOT_PASSWORD
)
g = conn.g()

cypher_raw_endpoint = "{0}:{1}".format(pod.status.pod_ip, GROOT_CYPHER_PORT)
cypher_endpoint = "neo4j://{0}".format(cypher_raw_endpoint)
try:
driver = GraphDatabase.driver(cypher_endpoint, auth=None)
sess = driver.session()
sess.run("MATCH (n) RETURN n LIMIT 1")
except Exception as e:
logger.warn(f"Cypher endpoint is not available: {str(e)}")
cypher_endpoint = None
else:
endpoints.append(cypher_raw_endpoint)
logger.info(f"Cypher endpoint is available: {cypher_endpoint}")
sess.close()
driver.close()
cypher_endpoint = test_cypher_endpoint(pod.status.pod_ip, GROOT_CYPHER_PORT)

except Exception as e:
logger.warn(f"Failed to fetch frontend endpoints: {str(e)}")
Expand Down Expand Up @@ -310,18 +294,7 @@ def get_groot_graph_from_local():
break
time.sleep(5)
# test whether cypher endpoint is ready
try:
cypher_endpoint = f"neo4j://{host}:${GROOT_CYPHER_PORT}"
driver = GraphDatabase.driver(cypher_endpoint, auth=None)
sess = driver.session()
sess.run("MATCH (n) RETURN n LIMIT 1")
except Exception as e:
logger.warn(f"Cypher endpoint is not available: {str(e)}")
cypher_endpoint = None
else:
logger.info(f"Cypher endpoint is available: {cypher_endpoint}")
sess.close()
driver.close()
cypher_endpoint = test_cypher_endpoint(host, GROOT_CYPHER_PORT)

# groot graph
return GrootGraph(
Expand Down
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/flex/core/insight/groot.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def list_service_status(self) -> List[dict]:
},
}
]
if "cypher_endpoint" in res[0]:
if "cypher_endpoint" in groot_endpoints and groot_endpoints["cypher_endpoint"]:
res[0]["sdk_endpoints"]["cypher"] = groot_endpoints["cypher_endpoint"]
return res

Expand Down
32 changes: 32 additions & 0 deletions coordinator/gscoordinator/flex/core/insight/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

import hashlib
import json
import logging

from urllib3.exceptions import ProtocolError

from gscoordinator.flex.core.config import BASEID
from gscoordinator.version import __version__

logger = logging.getLogger("graphscope")


def convert_to_configini(graph, ds_manager, config):
# for bulk loader to connect to groot
Expand Down Expand Up @@ -136,3 +141,30 @@ def convert_to_configini(graph, ds_manager, config):
"customConfig": custom_config,
}
return configini

def test_cypher_endpoint(host : str, port : int):
"""
Test if the cypher endpoint is available, if not return None, otherwise return the cypher endpoint
Note that we send http request to check if the cypher endpoint is available, not submitting a cypher query,
the reason is that the cypher query may raise exceptions in case of other errors.
"""
cypher_endpoint = f"neo4j://{host}:{port}"
try:
import requests
response = requests.get(f"http://{host}:{port}")
response.raise_for_status()
except (requests.exceptions.ConnectionError) as e:
if (e.args != None and len(e.args) > 0):
# Sending http request to cypher endpoint should fail with ProtocolError
if isinstance(e.args[0], ProtocolError):
logger.debug("Cypher endpoint is available: {cypher_endpoint}")
else:
cypher_endpoint = None
logger.debug(f"Cypher endpoint is not available: {str(e)}")
except Exception as e:
logger.debug(f"Cypher endpoint is not available: {str(e)}")
cypher_endpoint = None
return cypher_endpoint
else:
logger.error("Should not reach here")
return cypher_endpoint

0 comments on commit e30a053

Please sign in to comment.