From a986bb67f19898245e3c34f1db5c9d3e41e8afe4 Mon Sep 17 00:00:00 2001 From: Lei Wang Date: Tue, 22 Oct 2024 19:52:17 +0800 Subject: [PATCH] revise coordinator Signed-off-by: Lei Wang --- coordinator/flex/server/__main__.py | 4 ++ .../controllers/data_source_controller.py | 3 + .../controllers/deployment_controller.py | 2 +- .../server/controllers/graph_controller.py | 64 ++++++++++++++----- .../server/controllers/service_controller.py | 7 +- scripts/controller.py | 4 +- 6 files changed, 64 insertions(+), 20 deletions(-) diff --git a/coordinator/flex/server/__main__.py b/coordinator/flex/server/__main__.py index 503a4cd..4123330 100644 --- a/coordinator/flex/server/__main__.py +++ b/coordinator/flex/server/__main__.py @@ -3,9 +3,13 @@ import connexion from flex.server import encoder +from datetime import datetime def main(): + current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + with open("/tmp/cluster_create_time.txt", "w") as f: + f.write(current_timestamp) app = connexion.App(__name__, specification_dir='./openapi/') app.app.json_encoder = encoder.JSONEncoder app.add_api('openapi.yaml', diff --git a/coordinator/flex/server/controllers/data_source_controller.py b/coordinator/flex/server/controllers/data_source_controller.py index d871056..ddea2e5 100644 --- a/coordinator/flex/server/controllers/data_source_controller.py +++ b/coordinator/flex/server/controllers/data_source_controller.py @@ -72,6 +72,9 @@ def get_datasource_by_id(graph_id): # noqa: E501 except Exception as e: return "Failed to get data source: " + str(e), 500 + if data_source_config is None: + return (SchemaMapping.from_dict({}), 200) + data_source_config = json.loads(data_source_config.decode("utf-8")) return (SchemaMapping.from_dict(data_source_config), 200) diff --git a/coordinator/flex/server/controllers/deployment_controller.py b/coordinator/flex/server/controllers/deployment_controller.py index f477cb4..ae94921 100644 --- a/coordinator/flex/server/controllers/deployment_controller.py +++ b/coordinator/flex/server/controllers/deployment_controller.py @@ -22,7 +22,7 @@ def get_deployment_info(): # noqa: E501 """ result_dict = {} result_dict["cluster_type"] = "KUBERNETES" - with open ("/tmp/graph_schema_create_time.txt", "r") as f: + with open ("/tmp/cluster_create_time.txt", "r") as f: result_dict["creation_time"] = f.read() result_dict["instance_name"] = "gart" result_dict["frontend"] = "Cypher/Gremlin" diff --git a/coordinator/flex/server/controllers/graph_controller.py b/coordinator/flex/server/controllers/graph_controller.py index 49c603b..f3fc633 100644 --- a/coordinator/flex/server/controllers/graph_controller.py +++ b/coordinator/flex/server/controllers/graph_controller.py @@ -64,12 +64,30 @@ def get_graph_schema(): rg_mapping_str = rg_mapping_str.decode("utf-8") break try_times += 1 - time.sleep(1) + time.sleep(0.2) except Exception as e: try_times += 1 - time.sleep(1) + time.sleep(0.2) if try_times == try_max_times: + try_times = 0 + original_graph_schema_key = etcd_prefix + "gart_graph_schema_json" + while try_times < try_max_times: + try: + original_graph_schema_str, _ = etcd_client.get(original_graph_schema_key) + if original_graph_schema_str is not None: + original_graph_schema_str = original_graph_schema_str.decode("utf-8") + break + try_times += 1 + time.sleep(0.2) + except Exception as e: + try_times += 1 + time.sleep(0.2) + if try_times == try_max_times: + return result_dict + result_dict["name"] = GRAPH_ID + result_dict["id"] = GRAPH_ID + result_dict["schema"] = json.loads(original_graph_schema_str)["schema"] return result_dict rg_mapping = yaml.load(rg_mapping_str, Loader=yaml.SafeLoader) @@ -83,10 +101,10 @@ def get_graph_schema(): table_schema_str = table_schema_str.decode("utf-8") break try_times += 1 - time.sleep(1) + time.sleep(0.2) except Exception as e: try_times += 1 - time.sleep(1) + time.sleep(0.2) if try_times == try_max_times: return result_dict @@ -463,12 +481,19 @@ def get_graph_by_id(graph_id): # noqa: E501 result_dict["id"] = graph_id result_dict["name"] = graph_id - with open("/tmp/graph_schema_create_time.txt", "r") as f: - result_dict["creation_time"] = f.read() - result_dict["schema_update_time"] = result_dict["creation_time"] - - with open("/tmp/data_loading_job_created_time.txt", "r") as f: - result_dict["data_update_time"] = f.read() + try: + with open("/tmp/graph_schema_create_time.txt", "r") as f: + result_dict["creation_time"] = f.read() + result_dict["schema_update_time"] = result_dict["creation_time"] + except: + result_dict["creation_time"] = "" + result_dict["schema_update_time"] = "" + + try: + with open("/tmp/data_loading_job_created_time.txt", "r") as f: + result_dict["data_update_time"] = f.read() + except: + result_dict["data_update_time"] = "" return (GetGraphResponse.from_dict(result_dict), 200) @@ -553,12 +578,19 @@ def list_graphs(): # noqa: E501 if not result_dict: return ([GetGraphResponse.from_dict({})], 200) - with open("/tmp/graph_schema_create_time.txt", "r") as f: - result_dict["creation_time"] = f.read() - result_dict["schema_update_time"] = result_dict["creation_time"] - - with open("/tmp/data_loading_job_created_time.txt", "r") as f: - result_dict["data_update_time"] = f.read() + try: + with open("/tmp/graph_schema_create_time.txt", "r") as f: + result_dict["creation_time"] = f.read() + result_dict["schema_update_time"] = result_dict["creation_time"] + except: + result_dict["creation_time"] = "" + result_dict["schema_update_time"] = "" + + try: + with open("/tmp/data_loading_job_created_time.txt", "r") as f: + result_dict["data_update_time"] = f.read() + except: + result_dict["data_update_time"] = "" return ([GetGraphResponse.from_dict(result_dict)], 200) diff --git a/coordinator/flex/server/controllers/service_controller.py b/coordinator/flex/server/controllers/service_controller.py index f8a40e7..16e38e2 100644 --- a/coordinator/flex/server/controllers/service_controller.py +++ b/coordinator/flex/server/controllers/service_controller.py @@ -68,8 +68,11 @@ def list_service_status(): # noqa: E501 gremlin_service_name = os.getenv('GREMLIN_SERVICE_NAME', 'gremlin-service') gremlin_service_port = os.getenv('GIE_GREMLIN_PORT', '8182') gremlin_service_ip = get_external_ip_of_a_service(gremlin_service_name, k8s_namespace) - with open("/tmp/graph_id.txt", "r") as f: - graph_id = f.read() + try: + with open("/tmp/graph_id.txt", "r") as f: + graph_id = f.read() + except: + graph_id = "gart_graph" result_dict["graph_id"] = graph_id result_dict["status"] = "Running" result_dict["sdk_endpoints"] = {} diff --git a/scripts/controller.py b/scripts/controller.py index 8321acb..59b8948 100755 --- a/scripts/controller.py +++ b/scripts/controller.py @@ -610,7 +610,7 @@ def get_all_available_read_epochs_internal(): num_fragment = os.getenv("SUBGRAPH_NUM", "1") latest_read_epoch = get_latest_read_epoch() if latest_read_epoch == 2**64 - 1: - return [] + return [[], []] available_epochs = [] available_epochs_internal = [] for epoch in range(latest_read_epoch + 1): @@ -659,6 +659,8 @@ def get_latest_read_epoch(): etcd_key = etcd_prefix + "gart_latest_epoch_p" + str(idx) try: etcd_value, _ = etcd_client.get(etcd_key) + if etcd_value is None: + etcd_value = 2**64 - 1 if latest_epoch > int(etcd_value): latest_epoch = int(etcd_value) except Exception as e: