diff --git a/k8s/converter-deployment.yaml b/k8s/converter-deployment.yaml index ef4882c..dcf3459 100644 --- a/k8s/converter-deployment.yaml +++ b/k8s/converter-deployment.yaml @@ -30,41 +30,17 @@ spec: --password ${DB_PASSWORD} \ --v6d-sock ${V6D_SOCKET} \ --v6d-size ${V6D_SIZE} \ - --etcd-endpoint ${ETCD_ENDPOINT} \ + --etcd-endpoint etcd-service:2379 \ --etcd-prefix ${ETCD_PREFIX} \ - --kafka-server ${KAFKA_SERVER} \ + --kafka-server kafka-service:9092 \ --subgraph-num ${SUBGRAPH_NUM} \ --enable-bulkload ${ENABLE_BULKLOAD} \ --rg-from-etcd 1 \ --k8s-mode yes \ --role converter && sleep infinity - env: - - name: KAFKA_SERVER - value: "kafka-service:9092" - - name: DB_HOST - value: "127.0.0.1" - - name: DB_PORT - value: "3306" - - name: DB_USER - value: "dbuser" - - name: DB_PASSWORD - value: "123456" - - name: DB_TYPE - value: "mysql" - - name: DB_NAME - value: "ldbc" - - name: V6D_SOCKET - value: "/tmp/v6d.sock" - - name: V6D_SIZE - value: "750G" - - name: ETCD_ENDPOINT - value: "etcd-service:2379" - - name: ETCD_PREFIX - value: "gart_meta_" - - name: SUBGRAPH_NUM - value: "2" - - name: ENABLE_BULKLOAD - value: "1" + envFrom: + - configMapRef: + name: gart-config diff --git a/k8s/debezium-config.yaml b/k8s/debezium-config-template.yaml similarity index 95% rename from k8s/debezium-config.yaml rename to k8s/debezium-config-template.yaml index d204694..eb3298c 100644 --- a/k8s/debezium-config.yaml +++ b/k8s/debezium-config-template.yaml @@ -9,7 +9,7 @@ data: "name": "debezium-connector-mysql", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", - "database.hostname": "172.29.161.155", + "database.hostname": "127.0.0.1", "database.port": "3306", "database.user": "dbuser", "database.password": "123456", @@ -29,7 +29,7 @@ data: "transforms.ReplaceField.exclude": "ts_ms,transaction", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter.schemas.enabl": 
"false", + "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "database.history.kafka.bootstrap.servers": "kafka-service:9092", "schema.history.internal.kafka.bootstrap.servers": "kafka-service:9092" diff --git a/k8s/gart-config-template.yaml b/k8s/gart-config-template.yaml new file mode 100644 index 0000000..af40784 --- /dev/null +++ b/k8s/gart-config-template.yaml @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: gart-config + namespace: gart +data: + DB_HOST: "127.0.0.1" + DB_PORT: "3306" + DB_USER: "dbuser" + DB_PASSWORD: "123456" + DB_TYPE: "mysql" + DB_NAME: "ldbc" + V6D_SOCKET: "/tmp/v6d.sock" + V6D_SIZE: "750G" + ETCD_PREFIX: "gart_meta_" + ENABLE_BULKLOAD: "1" + SUBGRAPH_NUM: "2" diff --git a/k8s/kafka-deployment.yaml b/k8s/kafka-deployment.yaml index 48a3550..910b97b 100644 --- a/k8s/kafka-deployment.yaml +++ b/k8s/kafka-deployment.yaml @@ -4,7 +4,7 @@ metadata: name: kafka-broker1 namespace: gart spec: - selector: # This selector must match the template labels + selector: matchLabels: app: kafka template: diff --git a/k8s/writer-deployment.yaml b/k8s/writer-deployment-template.yaml similarity index 55% rename from k8s/writer-deployment.yaml rename to k8s/writer-deployment-template.yaml index 8f6f9f8..ea95fc3 100644 --- a/k8s/writer-deployment.yaml +++ b/k8s/writer-deployment-template.yaml @@ -31,9 +31,9 @@ spec: --password ${DB_PASSWORD} \ --v6d-sock ${V6D_SOCKET} \ --v6d-size ${V6D_SIZE} \ - --etcd-endpoint ${ETCD_ENDPOINT} \ + --etcd-endpoint etcd-service:2379 \ --etcd-prefix ${ETCD_PREFIX} \ - --kafka-server ${KAFKA_SERVER} \ + --kafka-server kafka-service:9092 \ --subgraph-num ${SUBGRAPH_NUM} \ --subgraph-id $((${HOSTNAME##*-} + 0)) \ --enable-bulkload ${ENABLE_BULKLOAD} \ @@ -41,32 +41,8 @@ spec: --k8s-mode yes \ --role writer && sleep infinity - env: - - name: KAFKA_SERVER - value: "kafka-service:9092" - - name: DB_HOST - value: "127.0.0.1" - - name: DB_PORT - value: "3306" - - name: DB_USER - 
value: "dbuser" - - name: DB_PASSWORD - value: "123456" - - name: DB_TYPE - value: "mysql" - - name: DB_NAME - value: "ldbc" - - name: V6D_SOCKET - value: "/tmp/v6d.sock" - - name: V6D_SIZE - value: "750G" - - name: ETCD_ENDPOINT - value: "etcd-service:2379" - - name: ETCD_PREFIX - value: "gart_meta_" - - name: SUBGRAPH_NUM - value: "2" - - name: ENABLE_BULKLOAD - value: "1" + envFrom: + - configMapRef: + name: gart-config diff --git a/scripts/k8s_deployment.py b/scripts/k8s_deployment.py index e0e70b9..8235acc 100644 --- a/scripts/k8s_deployment.py +++ b/scripts/k8s_deployment.py @@ -4,16 +4,9 @@ import json import sys import yaml -import time -from urllib.parse import urlparse -import requests -import socket import os - import etcd3 -import subprocess - def get_parser(): parser = argparse.ArgumentParser( @@ -21,80 +14,217 @@ def get_parser(): formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) - parser.add_argument("--num_subgraphs", default=1, help="Number of subgraphs") - parser.add_argument("--etcd_prefix", default="gart_meta_", help="Etcd prefix") + parser.add_argument("--user_config_path", help="User config file path") parser.add_argument("--rg_mapping_file_path", help="RGMapping file path") return parser -def is_etcd_running(host, port): - """Check if etcd is running by sending a request to the member list API.""" - try: - response = requests.get(f"http://{host}:{port}/health", timeout=1) - return response.status_code == 200 and response.json().get("health") == "true" - except requests.exceptions.RequestException: - return False - - if __name__ == "__main__": arg_parser = get_parser() args = arg_parser.parse_args() gart_home = os.getenv("GART_HOME") + if not gart_home: + print("GART_HOME is not set") + sys.exit(1) - if False: - launch_etcd_pod_deployment_cmd = ( - f"kubectl apply -f {gart_home}/k8s/etcd-deployment.yaml" - ) - process = subprocess.run( - launch_etcd_pod_deployment_cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - 
universal_newlines=True, - ) - if process.returncode != 0: - print("Start Etcd Deployment Error:\n", process.stderr) - sys.exit(1) - else: - print("Etcd Deployment info: ", process.stdout) - - launch_etcd_pod_service_cmd = ( - f"kubectl apply -f {gart_home}/k8s/etcd-service.yaml" - ) - process = subprocess.run( - launch_etcd_pod_service_cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - ) - if process.returncode != 0: - print("Start Etcd Service Error:\n", process.stderr) - sys.exit(1) - else: - print("Etcd Service info: ", process.stdout) + with open(args.user_config_path, "r", encoding="UTF-8") as f: + config = json.load(f) etcd_host = "localhost" etcd_port = 12379 # FIXME: read port from yaml file etcd_client = etcd3.client(host=etcd_host, port=etcd_port) - etcd_prefix = args.etcd_prefix - - if not is_etcd_running(etcd_host, etcd_port): - print("Etcd is not running") - sys.exit(1) - - etcd_client.put(etcd_prefix + "capturer_is_up", "False") - etcd_client.put(etcd_prefix + "converter_is_up", "False") - - num_subgraphs = int(args.num_subgraphs) - for idx in range(num_subgraphs): - etcd_client.put(etcd_prefix + f"writer_{idx}_is_up", "False") with open(args.rg_mapping_file_path, "r", encoding="UTF-8") as f: rg_mapping = yaml.safe_load(f) + etcd_prefix = config.get("etcd_prefix", "gart_meta_") + etcd_client.put( etcd_prefix + "gart_rg_mapping_yaml", yaml.dump(rg_mapping, sort_keys=False) ) + + db_host = config.get("db_host", "127.0.0.1") + db_type = config.get("db_type", "mysql") + db_port = config.get("db_port", 3306 if db_type == "mysql" else 5432) + db_name = config.get("db_name", "ldbc") + db_user = config.get("db_user", "root") + db_password = config.get("db_password", "") + v6d_socket = config.get("v6d_socket", "/tmp/v6d.sock") + v6d_size = config.get("v6d_size", "10G") + total_subgraph_num = config.get("total_subgraph_num", 1) + enable_bulkload = config.get("enable_bulkload", 1) + + # update gart-config.yaml + 
with open(gart_home + "/k8s/gart-config-template.yaml", "r") as file, open( + gart_home + "/k8s/gart-config.yaml", "w" + ) as temp_file: + for line in file: + if "DB_HOST" in line: + temp_file.write(line.split(":")[0] + ': "' + db_host + '"\n') + elif "DB_PORT" in line: + temp_file.write(line.split(":")[0] + ': "' + str(db_port) + '"\n') + elif "DB_USER" in line: + temp_file.write(line.split(":")[0] + ': "' + db_user + '"\n') + elif "DB_PASSWORD" in line: + temp_file.write(line.split(":")[0] + ': "' + db_password + '"\n') + elif "DB_TYPE" in line: + temp_file.write(line.split(":")[0] + ': "' + db_type + '"\n') + elif "DB_NAME" in line: + temp_file.write(line.split(":")[0] + ': "' + db_name + '"\n') + elif "V6D_SOCKET" in line: + temp_file.write(line.split(":")[0] + ': "' + v6d_socket + '"\n') + elif "V6D_SIZE" in line: + temp_file.write(line.split(":")[0] + ': "' + v6d_size + '"\n') + elif "SUBGRAPH_NUM" in line: + temp_file.write( + line.split(":")[0] + ': "' + str(total_subgraph_num) + '"\n' + ) + elif "ENABLE_BULKLOAD" in line: + temp_file.write( + line.split(":")[0] + ': "' + str(enable_bulkload) + '"\n' + ) + else: + temp_file.write(line) + + # update writer-deployment.yaml + with open(gart_home + "/k8s/writer-deployment-template.yaml", "r") as file, open( + gart_home + "/k8s/writer-deployment.yaml", "w" + ) as temp_file: + for line in file: + if "replicas" in line: + temp_file.write( + line.split(":")[0] + ": " + str(total_subgraph_num) + "\n" + ) + else: + temp_file.write(line) + + # update debezium-config.yaml + with open(gart_home + "/k8s/debezium-config-template.yaml", "r") as file, open( + gart_home + "/k8s/debezium-config.yaml", "w" + ) as temp_file: + for line in file: + if "database.hostname" in line: + temp_file.write(line.split(":")[0] + ': "' + db_host + '",\n') + elif "database.port" in line: + temp_file.write(line.split(":")[0] + ': "' + str(db_port) + '",\n') + elif "database.user" in line: + temp_file.write(line.split(":")[0] + ': "' + 
db_user + '",\n') + elif "database.password" in line: + temp_file.write(line.split(":")[0] + ': "' + db_password + '",\n') + elif "database.include.list" in line: + temp_file.write(line.split(":")[0] + ': "' + db_name + '",\n') + elif "snapshot.mode" in line: + if enable_bulkload: + if db_type == "mysql": + temp_file.write(line.split(":")[0] + ': "' + "initial" + '",\n') + else: + temp_file.write(line.split(":")[0] + ': "' + "always" + '",\n') + else: + temp_file.write(line.split(":")[0] + ': "' + "never" + '",\n') + elif "table.include.list" in line: + with open(args.rg_mapping_file_path, "r") as file: + graph_schema = yaml.safe_load(file) + + # Extract the 'vertex_types' list from the dictionary + vertex_types = graph_schema.get("vertexMappings", {}).get( + "vertex_types", [] + ) + # Iterate through 'vertex_types' and collect 'dataSourceName' values + vertex_table_names = [ + vertex_type.get("dataSourceName") for vertex_type in vertex_types + ] + + vertex_type_names = [ + vertex_type.get("type_name") for vertex_type in vertex_types + ] + + # build a dict (vertex_type_name, vertex_table_name) + vertex_type_name_table_mapping = { + vertex_type_name: vertex_table_name + for vertex_type_name, vertex_table_name in zip( + vertex_type_names, vertex_table_names + ) + } + + edge_types = graph_schema.get("edgeMappings", {}).get("edge_types", []) + edge_table_names = [edge.get("dataSourceName") for edge in edge_types] + + src_names = [edge["type_pair"]["source_vertex"] for edge in edge_types] + dst_names = [ + edge["type_pair"]["destination_vertex"] for edge in edge_types + ] + + all_table_names = [] + both_vertex_edge_table_names = [] + for vertex_table_name in vertex_table_names: + if vertex_table_name not in edge_table_names: + all_table_names.append(vertex_table_name) + else: + both_vertex_edge_table_names.append(vertex_table_name) + + # build a dict (table_name, src/dst_type_names) + edge_table_src_dst_type_mapping = {} + + for idx in range(len(edge_table_names)): + 
edge_table_name = edge_table_names[idx] + if edge_table_name not in both_vertex_edge_table_names: + continue + + src_name = src_names[idx] + dst_name = dst_names[idx] + if vertex_type_name_table_mapping[src_name] != edge_table_name: + if edge_table_name not in edge_table_src_dst_type_mapping: + edge_table_src_dst_type_mapping[edge_table_name] = [ + src_name + ] + else: + edge_table_src_dst_type_mapping[edge_table_name].append( + src_name + ) + if vertex_type_name_table_mapping[dst_name] != edge_table_name: + if edge_table_name not in edge_table_src_dst_type_mapping: + edge_table_src_dst_type_mapping[edge_table_name] = [ + dst_name + ] + else: + edge_table_src_dst_type_mapping[edge_table_name].append( + dst_name + ) + + both_vertex_edge_table_placed = [0] * len(both_vertex_edge_table_names) + while True: + if sum(both_vertex_edge_table_placed) == len( + both_vertex_edge_table_names + ): + break + for idx in range(len(both_vertex_edge_table_names)): + if both_vertex_edge_table_placed[idx] == 1: + continue + edge_table_name = both_vertex_edge_table_names[idx] + src_dst_type_names = edge_table_src_dst_type_mapping[ + edge_table_name + ] + if all( + vertex_type_name_table_mapping[src_dst_type_name] + in all_table_names + for src_dst_type_name in src_dst_type_names + ): + all_table_names.append(edge_table_name) + both_vertex_edge_table_placed[idx] = 1 + + for edge_table_name in edge_table_names: + if edge_table_name not in all_table_names: + all_table_names.append(edge_table_name) + + tmp_db_name = db_name + if db_type == "postgresql": + tmp_db_name = "public" + new_line_list = [ + tmp_db_name + "." 
+ table_name for table_name in all_table_names + ] + new_line = ",".join(new_line_list) + temp_file.write(line.split(":")[0] + ': "' + new_line + '",\n') + else: + temp_file.write(line) diff --git a/vegito/test/schema/k8s_config.json b/vegito/test/schema/k8s_config.json new file mode 100644 index 0000000..fd243b7 --- /dev/null +++ b/vegito/test/schema/k8s_config.json @@ -0,0 +1,14 @@ +{ + "db_host": "127.0.0.1", + "db_port": 3306, + "db_type": "mysql", + "db_name": "ldbc", + "db_user": "dbuser", + "db_password": "123456", + "rgmapping_file": "/path/to/rgmapping-ldbc.yaml", + "v6d_socket": "/tmp/ldbc.sock", + "v6d_size": "750G", + "etcd_prefix": "gart_meta_", + "total_subgraph_num": 2, + "enable_bulkload": 1 +} \ No newline at end of file