Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] FLEX: Support Interactive both in gsctl and coordinator side #3334

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from graphscope.proto import coordinator_service_pb2_grpc

from gscoordinator.monitor import Monitor
from gscoordinator.servicer import init_flex_service_servicer
from gscoordinator.servicer import init_graphscope_one_service_servicer
from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH

Expand Down Expand Up @@ -112,7 +113,7 @@ def get_servicer(config: Config):
"GraphScope One": init_graphscope_one_service_servicer,
}

initializer = service_initializers.get(config.solution)
initializer = service_initializers.get(config.solution, init_flex_service_servicer)
if initializer is None:
raise RuntimeError(
f"Expect {service_initializers.keys()} of solution parameter"
Expand Down
1 change: 1 addition & 0 deletions coordinator/gscoordinator/servicer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
# limitations under the License.
#

from gscoordinator.servicer.flex.service import *
from gscoordinator.servicer.graphscope_one.service import *
17 changes: 17 additions & 0 deletions coordinator/gscoordinator/servicer/flex/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
28 changes: 28 additions & 0 deletions coordinator/gscoordinator/servicer/flex/interactive/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import sys

try:
sys.path.insert(0, os.path.dirname(__file__))
import hiactor_client
except ImportError:
raise

from gscoordinator.servicer.flex.interactive.hiactor import *
208 changes: 208 additions & 0 deletions coordinator/gscoordinator/servicer/flex/interactive/hiactor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import itertools
import logging

import hiactor_client

from google.protobuf.json_format import MessageToDict

from graphscope.config import Config
from graphscope.proto import flex_pb2

from gscoordinator.servicer.flex.interactive.job_scheduler import (
DataloadingJobScheduler,
)
from gscoordinator.servicer.flex.job import JobStatus

from gscoordinator.utils import WORKSPACE
from gscoordinator.utils import delegate_command_to_pod
from gscoordinator.utils import run_kube_cp_command

__all__ = ["init_interactive_service"]

logger = logging.getLogger("graphscope")


# There are two workspaces for FLEX Interactive, one residing on "coordinator" node
# and the other on the "interactive" node. These two workspaces are equivalent when
# running mode is "hosts". Hence, INTERACTIVE_WORKSPACE is only effective in
# Kubernetes (k8s) environment.
INTERACTIVE_WORKSPACE = (
os.environ["INTERACTIVE_WORKSPACE"]
if "INTERACTIVE_WORKSPACE" in os.environ
else "/tmp"

Check warning on line 51 in coordinator/gscoordinator/servicer/flex/interactive/hiactor.py

View check run for this annotation

codefactor.io / CodeFactor

coordinator/gscoordinator/servicer/flex/interactive/hiactor.py#L49-L51

Probable insecure usage of temp file/directory. (B108)
)


INTERACTIVE_CONTAINER_NAME = "interactive"


class Hiactor(object):
"""Hiactor module used to interact with hiactor engine"""

def __init__(self, config: Config):
self._config = config
# hiactor admin service host
self._hiactor_host = self._get_hiactor_service_endpoints()
# workspace
self._workspace = os.path.join(WORKSPACE, "interactive")
os.makedirs(self._workspace, exist_ok=True)
# job status
self._job_status = {}
# check heartbeat to interactive engine
logger.info("Connect to hiactor service at %s", self._hiactor_host)

def _get_hiactor_service_endpoints(self):
if self._config.launcher_type == "hosts":
# TODO change to 127.0.0.1
endpoint = "http://192.168.0.9:{0}".format(
os.environ.get("HIACTOR_ADMIN_SERVICE_PORT", 7777)
)
return endpoint

@property
def hiactor_host(self):
return self._hiactor_host

@property
def job_status(self):
return self._job_status

def register_job_status(self, jobid: str, status: JobStatus):
self._job_status[jobid] = status

def write_and_distribute_file(
self, graph_name: str, basename: str, raw_data: bytes
):
# /<workspace>/raw_data/<graphname>
filedir = os.path.join(self._workspace, "raw_data", graph_name)
os.makedirs(filedir, exist_ok=True)
# /<filedir>/basename
filepath = os.path.join(filedir, basename)
# dump raw data to file
with open(filepath, "wb") as f:
f.write(raw_data)
# distribute
target_file = filepath
if self._config.launcher_type == "k8s":
# filepath is different in interactive pod
target_file = os.path.join(
INTERACTIVE_WORKSPACE, "raw_data", graph_name, basename
)
for pod in []:
container = INTERACTIVE_CONTAINER_NAME
cmd = f"mkdir -p {os.path.dirname(target_file)}"
logger.debug(delegate_command_to_pod(cmd, pod, container))
logger.debug(
run_kube_cp_command(tmp_file, target_file, pod, container, True)
)
return target_file

def create_graph(self, graph_def_dict):
with hiactor_client.ApiClient(
hiactor_client.Configuration(self._hiactor_host)
) as api_client:
api_instance = hiactor_client.GraphApi(api_client)
graph = hiactor_client.Graph.from_dict(graph_def_dict)
return api_instance.create_graph(graph)

def list_graph(self):
with hiactor_client.ApiClient(
hiactor_client.Configuration(self._hiactor_host)
) as api_client:
api_instance = hiactor_client.GraphApi(api_client)
return api_instance.list_graphs()

def delete_graph(self, graph_name: str):
with hiactor_client.ApiClient(
hiactor_client.Configuration(self._hiactor_host)
) as api_client:
api_instance = hiactor_client.GraphApi(api_client)
return api_instance.delete_graph(graph_name)

def create_job(
self,
job_type: str,
schedule_proto: flex_pb2.Schedule,
description_proto: flex_pb2.JobDescription,
):
if job_type != "DATALOADING":
raise RuntimeError(
"Job type {0} is not supported in interacive.".format(job_type)
)

# write raw data to file and distribute to interactive workspace
schema_mapping = description_proto.schema_mapping
for mapping in itertools.chain(
schema_mapping.vertex_mappings, schema_mapping.edge_mappings
):
for index, location in enumerate(mapping.inputs):
if location.startswith("@"):
# write raw data and distribute file to interactive workspace
new_location = self.write_and_distribute_file(
schema_mapping.graph,
os.path.basename(location),
mapping.raw_data[index],
)
# clear the raw_data
mapping.raw_data[index] = bytes()
# update the location
mapping.inputs[index] = new_location
# schedule
schedule = MessageToDict(
schedule_proto,
preserving_proto_field_name=True,
including_default_value_fields=True,
)
# job description
description = MessageToDict(
description_proto,
preserving_proto_field_name=True,
including_default_value_fields=True,
)
# submit
if schedule["run_now"]:
at_time = "now"
repeat = "null"
else:
at_time = schedule["at_time"]
repeat = schedule["repeat"]

scheduler = DataloadingJobScheduler(
at_time=at_time,
repeat=repeat,
description=description,
servicer=self,
)
scheduler.start()

def create_procedure(self, procedure_def_dict):
with hiactor_client.ApiClient(
hiactor_client.Configuration(self._hiactor_host)
) as api_client:
api_instance = hiactor_client.ProcedureApi(api_client)
graph_name = procedure_def_dict["bound_graph"]
procedure = hiactor_client.Procedure.from_dict(procedure_def_dict)
return api_instance.create_procedure(graph_name, procedure)


def init_interactive_service(config: Config):
return Hiactor(config)
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging

import hiactor_client

from gscoordinator.servicer.flex.job import JobType
from gscoordinator.servicer.flex.job import JobStatus
from gscoordinator.servicer.flex.scheduler import Scheduler

logger = logging.getLogger("graphscope")


class DataloadingJobScheduler(Scheduler):
"""Class for scheduling the dataloading process of interactive graphs"""

def __init__(self, at_time, repeat, description, servicer):
super().__init__(at_time, repeat)

self._description = description
self._servicer = servicer

self._tags = [JobType.DATALOADING.name]

def run(self):
"""This function needs to handle exception by itself"""

schema_mapping = self._description["schema_mapping"]
graph_name = schema_mapping["graph"]

detail = {"graph name": graph_name}
status = JobStatus(
jobid=self.jobid,
type=JobType.DATALOADING,
start_time=self.last_run,
detail=detail,
)

# register status to servicer
self._servicer.register_job_status(self.jobid, status)

with hiactor_client.ApiClient(
hiactor_client.Configuration(self._servicer.hiactor_host)
) as api_client:
# create an instance of the API class
api_instance = hiactor_client.DataloadingApi(api_client)

try:
api_response = api_instance.create_dataloading_job(
graph_name,
hiactor_client.SchemaMapping.from_dict(schema_mapping),
)
except Exception as e:
logger.warning(
"Failed to create dataloading job on graph %s: %s",
graph_name,
str(e),
)
status.set_failed(message=str(e))
else:
status.set_success(message=api_response)
Loading
Loading