From afa3fafbb59f7743ee7b62d5a14f8dcf52162d52 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 9 May 2024 06:59:00 +0000 Subject: [PATCH] introduce proxy_server minor code refactor refine proxy server todo: bind to pvc and deploy on internal try to test on internal modify dockerfile able to launch, todo: fix graph creation and loading minor fix add log fix codegen error minor introduce proxy_server minor code refactor refine proxy server todo: bind to pvc and deploy on internal try to test on internal modify dockerfile able to launch, todo: fix graph creation and loading minor fix add log minor update memory request add test script increase memroy request --- .github/workflows/interactive.yml | 14 + charts/graphscope-interactive.yaml | 443 ++++++++++++++++++ charts/graphscope-interactive/README.md | 23 + .../backup/admin_virtual_server.yaml | 25 + .../frontend/statefulset.yaml | 54 ++- .../{templates => backup}/frontend/svc.yaml | 8 +- .../backup/ingress_test.yaml | 19 + .../backup/nginx_ingress_custom.yaml | 17 + .../backup/query_virtual_server.yaml | 49 ++ charts/graphscope-interactive/settings.yaml | 77 +++ .../templates/_helpers.tpl | 35 +- .../templates/configmap.yaml | 18 +- .../templates/engine/statefulset.yaml | 167 ------- .../templates/engine/svc-headless.yaml | 37 -- .../templates/primary/statefulset.yaml | 273 +++++++++++ .../templates/primary/svc.yaml | 37 ++ .../templates/secondary/statefulset.yaml | 169 +++++++ .../templates/secondary/svc.yaml | 37 ++ charts/graphscope-interactive/values.yaml | 320 ++++++++++++- flex/CMakeLists.txt | 1 + flex/bin/CMakeLists.txt | 6 + flex/bin/bulk_loader.cc | 4 +- flex/bin/proxy_server.cc | 130 +++++ flex/engines/hqps_db/core/utils/hqps_utils.h | 7 + flex/engines/http_server/CMakeLists.txt | 12 + .../http_server/actor/proxy_actor.act.cc | 109 +++++ .../http_server/actor/proxy_actor.act.h | 44 ++ .../http_server/handler/admin_http_handler.cc | 1 + .../http_server/handler/admin_http_handler.h | 6 +- .../http_server/handler/hqps_http_handler.cc | 33 ++ .../http_server/handler/hqps_http_handler.h | 14 + .../engines/http_server/handler/http_proxy.cc | 271 +++++++++++ flex/engines/http_server/handler/http_proxy.h | 112 +++++ flex/engines/http_server/handler/http_utils.h | 4 +- .../http_server/handler/proxy_http_handler.cc | 95 ++++ .../http_server/handler/proxy_http_handler.h | 63 +++ flex/engines/http_server/options.cc | 1 + flex/engines/http_server/options.h | 2 + .../http_server/service/proxy_service.cc | 63 +++ .../http_server/service/proxy_service.h | 66 +++ flex/engines/http_server/types.h | 4 + flex/interactive/docker/entrypoint.sh | 124 +++-- .../docker/interactive-runtime.Dockerfile | 6 +- .../sdk/examples/python/get_service_status.py | 79 ++++ .../python/interactive_sdk/client/driver.py | 4 +- .../metadata/local_file_metadata_store.cc | 2 +- flex/tests/hqps/engine_config_test_2.yaml | 39 ++ flex/tests/hqps/hqps_proxy_server_test.sh | 161 +++++++ flex/utils/result.h | 3 + 49 files changed, 2980 insertions(+), 308 deletions(-) create mode 100644 charts/graphscope-interactive.yaml create mode 100644 charts/graphscope-interactive/backup/admin_virtual_server.yaml rename charts/graphscope-interactive/{templates => backup}/frontend/statefulset.yaml (71%) rename charts/graphscope-interactive/{templates => backup}/frontend/svc.yaml (85%) create mode 100644 charts/graphscope-interactive/backup/ingress_test.yaml create mode 100644 charts/graphscope-interactive/backup/nginx_ingress_custom.yaml create mode 100644 charts/graphscope-interactive/backup/query_virtual_server.yaml create mode 100644 charts/graphscope-interactive/settings.yaml delete mode 100644 charts/graphscope-interactive/templates/engine/statefulset.yaml delete mode 100644 charts/graphscope-interactive/templates/engine/svc-headless.yaml create mode 100644 charts/graphscope-interactive/templates/primary/statefulset.yaml create mode 100644 charts/graphscope-interactive/templates/primary/svc.yaml create mode 100644 charts/graphscope-interactive/templates/secondary/statefulset.yaml create mode 100644 charts/graphscope-interactive/templates/secondary/svc.yaml create mode 100644 flex/bin/proxy_server.cc create mode 100644 flex/engines/http_server/actor/proxy_actor.act.cc create mode 100644 flex/engines/http_server/actor/proxy_actor.act.h create mode 100644 flex/engines/http_server/handler/http_proxy.cc create mode 100644 flex/engines/http_server/handler/http_proxy.h create mode 100644 flex/engines/http_server/handler/proxy_http_handler.cc create mode 100644 flex/engines/http_server/handler/proxy_http_handler.h create mode 100644 flex/engines/http_server/service/proxy_service.cc create mode 100644 flex/engines/http_server/service/proxy_service.h create mode 100644 flex/interactive/sdk/examples/python/get_service_status.py create mode 100644 flex/tests/hqps/engine_config_test_2.yaml create mode 100644 flex/tests/hqps/hqps_proxy_server_test.sh diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index 3ebcba777388..e1005d20035c 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -167,6 +167,20 @@ jobs: sed -i 's/default_graph: modern_graph/default_graph: ldbc/g' ./engine_config_test.yaml sed -i 's/temp_workspace/interactive_workspace/g' ./engine_config_test.yaml + - name: Proxy Server test + env: + INTERACTIVE_WORKSPACE: /tmp/interactive_workspace + run: | + cd ${GITHUB_WORKSPACE}/flex/tests/hqps + bash hqps_proxy_server_test.sh ${INTERACTIVE_WORKSPACE} ./engine_config_test.yaml + + - name: Proxy Server test + env: + INTERACTIVE_WORKSPACE: /tmp/interactive_workspace + run: | + cd ${GITHUB_WORKSPACE}/flex/tests/hqps + bash hqps_proxy_server_test.sh ${INTERACTIVE_WORKSPACE} ./engine_config_test.yaml + - name: Sample Query test env: GS_TEST_DIR: ${{ github.workspace }}/gstest diff --git a/charts/graphscope-interactive.yaml b/charts/graphscope-interactive.yaml new file mode 100644 index 000000000000..1aa1bfb55d7e --- /dev/null +++ b/charts/graphscope-interactive.yaml @@ -0,0 +1,443 @@ +--- +# Source: graphscope-interactive/templates/configmap.yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: release-name-graphscope-interactive-config + namespace: kubetask + labels: + helm.sh/chart: graphscope-interactive-0.0.2 + app.kubernetes.io/name: graphscope-interactive + app.kubernetes.io/instance: release-name + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/component: configmap +data: + engine_config.yaml: |- + directories: + workspace: /tmp/interactive_workspace + subdirs: + data: data + logs: logs + conf: conf + log_level: INFO + default_graph: modern_graph + compute_engine: + type: hiactor + workers: + - localhost:10000 + thread_num_per_worker: 1 + compiler: + planner: + is_on: true + opt: RBO + rules: + - FilterIntoJoinRule + - FilterMatchRule + - NotMatchToAntiJoinRule + endpoint: + default_listen_address: localhost + bolt_connector: + disabled: false + port: 7687 + gremlin_connector: + disabled: true + port: 8182 + query_timeout: 30000 + http_service: + default_listen_address: localhost + admin_port: 7777 + query_port: 10000 +--- +# Source: graphscope-interactive/templates/primary/svc.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-graphscope-interactive-primary + namespace: kubetask + labels: + helm.sh/chart: graphscope-interactive-0.0.2 + app.kubernetes.io/name: graphscope-interactive + app.kubernetes.io/instance: release-name + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/component: primary + annotations: +spec: + type: NodePort + ports: + - name: admin-port + port: 7777 + protocol: TCP + targetPort: 7777 + - name: query-port + port: 10000 + protocol: TCP + targetPort: 10000 + selector: + app.kubernetes.io/name: graphscope-interactive + app.kubernetes.io/instance: release-name + app.kubernetes.io/component: primary +--- +# Source: graphscope-interactive/templates/secondary/svc.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-graphscope-interactive-secondary + namespace: kubetask + labels: + helm.sh/chart: graphscope-interactive-0.0.2 + app.kubernetes.io/name: graphscope-interactive + app.kubernetes.io/instance: release-name + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/component: secondary + annotations: +spec: + type: LoadBalancer + ports: + - name: admin-port + port: 7777 + protocol: TCP + targetPort: 7777 + - name: query-port + port: 10000 + protocol: TCP + targetPort: 10000 + selector: + app.kubernetes.io/name: graphscope-interactive + app.kubernetes.io/instance: release-name + app.kubernetes.io/component: secondary +--- +# Source: graphscope-interactive/templates/primary/statefulset.yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: release-name-graphscope-interactive-primary + namespace: kubetask + labels: + helm.sh/chart: graphscope-interactive-0.0.2 + app.kubernetes.io/name: graphscope-interactive + app.kubernetes.io/instance: release-name + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/component: primary +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: graphscope-interactive + app.kubernetes.io/instance: release-name + app.kubernetes.io/component: primary + serviceName: release-name-graphscope-interactive-primary + updateStrategy: + type: RollingUpdate + template: + metadata: + annotations: + labels: + helm.sh/chart: graphscope-interactive-0.0.2 + app.kubernetes.io/name: graphscope-interactive + app.kubernetes.io/instance: release-name + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/component: primary + spec: + hostNetwork: + hostIPC: false + serviceAccountName: default + initContainers: + containers: + - name: proxy-admin + image: registry.cn-hongkong.aliyuncs.com/graphscope/interactive:debug + imagePullPolicy: "Always" + # command: ["sleep", "infinity"] + command: + - /bin/bash + - -c + - | + POD_NAME=$MY_POD_NAME + if [ -z "$POD_NAME" ]; then + POD_NAME=$(hostname) + fi + echo "POD_NAME: $POD_NAME" + secondary_pod_dns_names="" + # cnt=1 + # for i from 0 to $SECONDARY_REPLICAS + for ((i=0; i ${REAL_ENGINE_CONFIG_PATH} - echo "Finish set ENGINE_SERVICE_HOST to ${ENGINE_SERVICE_HOST}" \ No newline at end of file + default_listen_address: localhost + admin_port: {{ .Values.primary.service.adminPort }} + query_port: {{ .Values.primary.service.queryPort }} \ No newline at end of file diff --git a/charts/graphscope-interactive/templates/engine/statefulset.yaml b/charts/graphscope-interactive/templates/engine/statefulset.yaml deleted file mode 100644 index f9455b53fd71..000000000000 --- a/charts/graphscope-interactive/templates/engine/statefulset.yaml +++ /dev/null @@ -1,167 +0,0 @@ -{{- $frontendFullname := include "graphscope-interactive.frontend.fullname" . }} -{{- $engineFullName := include "graphscope-interactive.engine.fullname" . }} -{{- $releaseNamespace := .Release.Namespace }} -{{- $clusterDomain := .Values.clusterDomain }} - -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: {{ include "graphscope-interactive.engine.fullname" . }} - namespace: {{ .Release.Namespace }} - labels: {{- include "graphscope-interactive.labels" . | nindent 4 }} - app.kubernetes.io/component: engine - {{- if .Values.commonLabels }} - {{- include "graphscope-interactive.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }} - {{- end }} - {{- if .Values.commonAnnotations }} - annotations: {{- include "graphscope-interactive.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }} - {{- end }} -spec: - replicas: {{ .Values.engine.replicaCount }} - selector: - matchLabels: {{ include "graphscope-interactive.selectorLabels" . | nindent 6 }} - app.kubernetes.io/component: engine - serviceName: {{ include "graphscope-interactive.engine.fullname" . }}-headless - updateStrategy: - type: {{ .Values.engine.updateStrategy }} - {{- if (eq "Recreate" .Values.engine.updateStrategy) }} - rollingUpdate: null - {{- end }} - template: - metadata: - annotations: - {{- if .Values.engine.podAnnotations }} - {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.engine.podAnnotations "context" $) | nindent 8 }} - {{- end }} - labels: {{- include "graphscope-interactive.labels" . | nindent 8 }} - app.kubernetes.io/component: engine - {{- if .Values.commonLabels }} - {{- include "graphscope-interactive.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 8 }} - {{- end }} - spec: - {{- with .Values.imagePullSecrets }} - imagePullSecrets: {{- toYaml . | nindent 8 }} - {{- end }} - serviceAccountName: {{ include "graphscope-interactive.serviceAccountName" . }} - {{- if .Values.engine.affinity }} - affinity: {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.engine.affinity "context" $) | nindent 8 }} - {{- end }} - initContainers: - {{- if .Values.engine.initContainers }} - {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.engine.initContainers "context" $) | nindent 8 }} - {{- end }} - containers: - - name: engine - image: {{ include "graphscope-interactive.engine.image" . }} - imagePullPolicy: {{ .Values.engine.image.pullPolicy | quote }} - command: - - /bin/bash - - -c - - | - echo "Starting engine..." - # first check engine_config.yaml exists - if [ ! -f ${ENGINE_CONFIG_PATH} ]; then - #error exit - echo "${ENGINE_CONFIG_PATH} not found, exiting..." - exit 1 - fi - # then check interactive_server binary exists and executable - if [ ! -x ${ENGINE_BINARY_PATH} ]; then - #error exit - echo "${ENGINE_BINARY_PATH} binary not found or not executable, exiting..." - exit 1 - fi - # always try to load the built-in graph: gs_interactive_default_graph - # for case CURRENT_GRAPH is not the default_graph, we assume the data is already loaded. - # TODO. - builtin_graph_schema_path="${INTERACTIVE_WORKSPACE}/data/${DEFAULT_GRAPH_NAME}/graph.yaml" - builtin_graph_data_path="${INTERACTIVE_WORKSPACE}/data/${DEFAULT_GRAPH_NAME}/indices/" - builtin_graph_import_path="${INTERACTIVE_WORKSPACE}/data/${DEFAULT_GRAPH_NAME}/bulk_load.yaml" - # if builtin_graph_data_path exists, skip - if [ ! -d ${builtin_graph_data_path} ]; then - mkdir -p ${INTERACTIVE_WORKSPACE}/data/${DEFAULT_GRAPH_NAME} - echo "Loading builtin graph: ${DEFAULT_GRAPH_NAME} with command: $builtin_graph_loader_cmd" - cp /opt/flex/share/gs_interactive_default_graph/graph.yaml ${builtin_graph_schema_path} - cp /opt/flex/share/gs_interactive_default_graph/bulk_load.yaml ${builtin_graph_import_path} - export FLEX_DATA_DIR=/opt/flex/share/gs_interactive_default_graph/ - - builtin_graph_loader_cmd="${BULK_LOADER_BINARY_PATH} -g ${builtin_graph_schema_path} -d ${builtin_graph_data_path} -l ${builtin_graph_import_path}" - echo "Loading builtin graph: ${DEFAULT_GRAPH_NAME} with command: $builtin_graph_loader_cmd" - eval $builtin_graph_loader_cmd - fi - - bash /etc/interactive/setup.sh - cmd="GLOG_v=10 ${ENGINE_BINARY_PATH} -c ${REAL_ENGINE_CONFIG_PATH}" - #cmd="${cmd} --enable-admin-service false -w ${INTERACTIVE_WORKSPACE}" - cmd="${cmd} -g ${builtin_graph_schema_path} --data-path ${builtin_graph_data_path}" - echo "Starting engine with command: $cmd" - eval $cmd - env: - - name: INTERACTIVE_WORKSPACE - value: {{ .Values.workspace | quote }} - - name: ENGINE_SERVICE_HOST - value: {{ $engineFullName }}-headless.{{ $releaseNamespace }}.svc.{{ $clusterDomain }} - - name: ENGINE_CONFIG_PATH - value: {{ include "graphscope-interactive.engineConfigPath" . }} - - name: REAL_ENGINE_CONFIG_PATH - value: {{ include "graphscope-interactive.realEngineConfigPath" . }} - - name: ENGINE_BINARY_PATH - value: {{ include "graphscope-interactive.engineBinaryPath" . }} - - name: ENGINE_SHARD_NUM - value: {{ .Values.engine.threadNumPerWorker | quote }} - - name: BULK_LOADER_BINARY_PATH - value: /opt/flex/bin/bulk_loader - - name: DEFAULT_GRAPH_NAME - value: {{ .Values.defaultGraph }} - ports: - - name: admin-port - containerPort: {{ .Values.engine.service.adminPort }} - - name: query-port - containerPort: {{ .Values.engine.service.queryPort }} - {{- if .Values.engine.resources }} - resources: {{- toYaml .Values.engine.resources | nindent 12 }} - {{- end }} - volumeMounts: - - name: workspace - mountPath: {{ .Values.workspace }} - - name: config - mountPath: {{ include "graphscope-interactive.engineConfigPath" . }} - subPath: engine_config.yaml - - name: config - mountPath: /etc/interactive/setup.sh - subPath: setup.sh - volumes: - - name: config - configMap: - name: {{ include "graphscope-interactive.configmapName" . }} - defaultMode: 0755 - {{- if and .Values.engine.persistence.enabled .Values.engine.persistence.existingClaim }} - - name: workspace - persistentVolumeClaim: - claimName: {{ tpl .Values.engine.persistence.existingClaim . }} - {{- else if not .Values.engine.persistence.enabled }} - - name: workspace - emptyDir: {} - {{- else if and .Values.engine.persistence.enabled (not .Values.engine.persistence.existingClaim) }} - volumeClaimTemplates: - - metadata: - name: workspace - {{- if .Values.persistence.annotations }} - annotations: {{- include "common.tplvalues.render" (dict "value" .Values.persistence.annotations "context" $) | nindent 10 }} - {{- end }} - {{- if .Values.persistence.labels }} - labels: {{- include "common.tplvalues.render" (dict "value" .Values.persistence.labels "context" $) | nindent 10 }} - {{- end }} - spec: - accessModes: - {{- range .Values.persistence.accessModes }} - - {{ . | quote }} - {{- end }} - resources: - requests: - storage: {{ .Values.engine.persistence.size | quote }} - {{ include "graphscope-interactive.storageClass" . | nindent 8 }} - {{- if .Values.engine.persistence.selector }} - selector: {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.engine.persistence.selector "context" $) | nindent 10 }} - {{- end -}} - {{- end }} diff --git a/charts/graphscope-interactive/templates/engine/svc-headless.yaml b/charts/graphscope-interactive/templates/engine/svc-headless.yaml deleted file mode 100644 index a49094e692d2..000000000000 --- a/charts/graphscope-interactive/templates/engine/svc-headless.yaml +++ /dev/null @@ -1,37 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: {{ include "graphscope-interactive.engine.fullname" . }}-headless - namespace: {{ .Release.Namespace }} - labels: {{- include "graphscope-interactive.labels" . | nindent 4 }} - app.kubernetes.io/component: engine - {{- if .Values.commonLabels }} - {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }} - {{- end }} - annotations: - {{- if .Values.commonAnnotations }} - {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }} - {{- end }} -spec: - type: {{ .Values.engine.service.type }} - {{- if and (eq .Values.engine.service.type "ClusterIP") .Values.engine.service.clusterIP }} - clusterIP: {{ .Values.engine.service.clusterIP }} - {{- end }} - {{- if and .Values.engine.service.loadBalancerIP (eq .Values.engine.service.type "LoadBalancer") }} - loadBalancerIP: {{ .Values.engine.service.loadBalancerIP }} - externalTrafficPolicy: {{ .Values.engine.service.externalTrafficPolicy | quote }} - {{- end }} - {{- if and (eq .Values.engine.service.type "LoadBalancer") .Values.engine.service.loadBalancerSourceRanges }} - loadBalancerSourceRanges: {{- toYaml .Values.engine.service.loadBalancerSourceRanges | nindent 4 }} - {{- end }} - ports: - - name: admin-port - port: {{ .Values.engine.service.adminPort }} - protocol: TCP - targetPort: admin-port - - name: query-port - port: {{ .Values.engine.service.queryPort }} - protocol: TCP - targetPort: query-port - selector: {{- include "graphscope-interactive.selectorLabels" . | nindent 4 }} - app.kubernetes.io/component: engine diff --git a/charts/graphscope-interactive/templates/primary/statefulset.yaml b/charts/graphscope-interactive/templates/primary/statefulset.yaml new file mode 100644 index 000000000000..b8c385730dcc --- /dev/null +++ b/charts/graphscope-interactive/templates/primary/statefulset.yaml @@ -0,0 +1,273 @@ +{{- $frontendFullname := include "graphscope-interactive.frontend.fullname" . }} +{{- $primaryFullName := include "graphscope-interactive.primary.fullname" . }} +{{- $secondaryFullName := include "graphscope-interactive.secondary.fullname" . }} +{{- $releaseNamespace := .Release.Namespace }} +{{- $clusterDomain := .Values.clusterDomain }} + +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: {{ include "graphscope-interactive.primary.fullname" . }} + namespace: {{ .Release.Namespace }} + labels: {{- include "graphscope-interactive.labels" . | nindent 4 }} + app.kubernetes.io/component: primary + {{- if .Values.commonLabels }} + {{- include "graphscope-interactive.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }} + {{- end }} + {{- if .Values.commonAnnotations }} + annotations: {{- include "graphscope-interactive.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }} + {{- end }} +spec: + replicas: {{ .Values.primary.replicaCount }} + selector: + matchLabels: {{ include "graphscope-interactive.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: primary + serviceName: {{ include "graphscope-interactive.primary.fullname" . }} + updateStrategy: + type: {{ .Values.primary.updateStrategy }} + {{- if (eq "Recreate" .Values.primary.updateStrategy) }} + rollingUpdate: null + {{- end }} + template: + metadata: + annotations: + {{- if .Values.primary.podAnnotations }} + {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.primary.podAnnotations "context" $) | nindent 8 }} + {{- end }} + labels: {{- include "graphscope-interactive.labels" . | nindent 8 }} + app.kubernetes.io/component: primary + {{- if .Values.primary.podLabels }} + {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.primary.podLabels "context" $) | nindent 8 }} + {{- end }} + spec: + {{- if .Values.imagePullSecrets }} + imagePullSecrets: {{- toYaml . | nindent 8 }} + {{- end }} + {{- if .Values.primary.hostAliases }} + hostAliases: {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.primary.hostAliases "context" $) | nindent 8 }} + {{- end }} + hostNetwork: {{ .Values.primary.hostNetwork }} + hostIPC: {{ .Values.primary.hostIPC }} + {{- if .Values.primary.schedulerName }} + schedulerName: {{ .Values.primary.schedulerName | quote }} + {{- end }} + {{- if .Values.nodeSelector }} + nodeSelector: {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.nodeSelector "context" $) | nindent 8 }} + {{- end }} + {{- if .Values.dnsPolicy }} + dnsPolicy: {{ .Values.dnsPolicy | quote }} + {{- end }} + {{- if .Values.dnsConfig }} + dnsConfig: {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.dnsConfig "context" $) | nindent 8 }} + {{- end }} + {{- if .Values.tolerations }} + tolerations: {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.tolerations "context" $) | nindent 8 }} + {{- end }} + serviceAccountName: {{ include "graphscope-interactive.serviceAccountName" . }} + {{- if .Values.primary.affinity }} + affinity: {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.primary.affinity "context" $) | nindent 8 }} + {{- end }} + initContainers: + {{- if .Values.primary.initContainers }} + {{- include "graphscope-interactive.tplvalues.render" (dict "value" .Values.primary.initContainers "context" $) | nindent 8 }} + {{- end }} + containers: + - name: proxy-admin + image: {{ include "graphscope-interactive.primary.image" . }} + imagePullPolicy: {{ .Values.primary.image.pullPolicy | quote }} + # command: ["sleep", "infinity"] + command: + - /bin/bash + - -c + - | + POD_NAME=$MY_POD_NAME + if [ -z "$POD_NAME" ]; then + POD_NAME=$(hostname) + fi + echo "POD_NAME: $POD_NAME" + secondary_pod_dns_names="" + # cnt=1 + # for i from 0 to $SECONDARY_REPLICAS + for ((i=0; i @@ -245,10 +276,14 @@ engine: service: ## Service type ## - type: ClusterIP + type: NodePort ## Service port ## - servicePort: 55557 + ports: + - name: query_port + port: 10000 + targetPort: 10000 + protocol: TCP queryPort: 10000 @@ -296,8 +331,246 @@ engine: ## # maxUnavailable: 1 - ## GraphScope Interactive pod label. If labels are same as commonLabels , this will take precedence. + # ## GraphScope Interactive pod label. If labels are same as commonLabels , this will take precedence. + # ## + podLabels: {} + + +## GraphScope Interactive parameters +## +secondary: + image: + registry: registry.cn-hongkong.aliyuncs.com + repository: graphscope/interactive + # Overrides the image tag whose default is the chart appVersion. + tag: "debug" + ## Specify a imagePullPolicy + ## Defaults to 'Always' if image tag is 'latest', else set to 'IfNotPresent' + ## ref: http://kubernetes.io/docs/user-guide/images/#pre-pulling-images + ## + pullPolicy: Always + ## Optionally specify an array of imagePullSecrets (secrets must be manually created in the namespace) + ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ + ## Example: + ## pullSecrets: + ## - myRegistryKeySecretName + ## + pullSecrets: [] + + replicaCount: 2 + + logLevel: INFO + + # Number of thread each worker will use + threadNumPerWorker: 1 + + ## updateStrategy for GraphScope Interactive statefulset + ## ref: https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#update-strategies + ## + updateStrategy: RollingUpdate + + ## GraphScope Interactive pod annotations + ## ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/ + ## + podAnnotations: {} + + ## GraphScope Interactive pod affinity preset + ## ref: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#inter-pod-affinity-and-anti-affinity + ## Allowed values: soft, hard + ## + podAffinityPreset: "" + + ## GraphScope Interactive pod anti-affinity preset + ## ref: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#inter-pod-affinity-and-anti-affinity + ## Allowed values: soft, hard + ## + podAntiAffinityPreset: soft + + ## Affinity for GraphScope Interactive pods assignment + ## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity + ## Note: podAffinityPreset, podAntiAffinityPreset, and nodeAffinityPreset will be ignored when it's set + ## + affinity: {} + # affinity: + # nodeAffinity: + # requiredDuringSchedulingIgnoredDuringExecution: + # nodeSelectorTerms: + # - matchExpressions: + # - key: app + # operator: In + # values: + # - interactive_single_node + + ## Node labels for GraphScope Interactive pods assignment + ## ref: https://kubernetes.io/docs/user-guide/node-selection/ + ## + nodeSelector: {} + + hostAliases: {} + + hostIPC: false + + ## Tolerations for GraphScope Interactive pods assignment + ## ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/ + ## + tolerations: [] + + ## GraphScope Interactive Pod security context + ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod ## + podSecurityContext: + enabled: false + fsGroup: 1001 + + ## GraphScope Interactive container security context + ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-container + ## + containerSecurityContext: + enabled: false + runAsUser: 1001 + + ## GraphScope Interactive container's resource requests and limits + ## ref: http://kubernetes.io/docs/user-guide/compute-resources/ + ## + resources: + limits: + cpu: 320000m + memory: 200Gi + requests: + cpu: 32000m + memory: 200Gi + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + ## GraphScope Interactive container's liveness and readiness probes + ## ref: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#container-probes + ## + livenessProbe: + enabled: false + initialDelaySeconds: 120 + periodSeconds: 10 + timeoutSeconds: 1 + failureThreshold: 3 + successThreshold: 1 + readinessProbe: + enabled: false + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 1 + failureThreshold: 3 + successThreshold: 1 + + ## Enable persistence using Persistent Volume Claims + ## ref: http://kubernetes.io/docs/user-guide/persistent-volumes/ + ## + persistence: + ## If true, use a Persistent Volume Claim, If false, use emptyDir + ## + enabled: true + ## Name of existing PVC to hold GraphScope Interactive data + ## NOTE: When it's set the rest of persistence parameters are ignored + ## + # existingClaim: "graphscope-interactive-pvc" + existingClaim: "" + + ## Persistent Volume Storage Class + ## If defined, storageClassName: + ## If set to "-", storageClassName: "", which disables dynamic provisioning + ## If undefined (the default) or set to null, no storageClassName spec is + ## set, choosing the default provisioner. (gp2 on AWS, standard on + ## GKE, AWS & OpenStack) + ## + # storageClass: "manual" + ## Persistent Volume Claim annotations + ## + annotations: {} + ## Persistent Volume Access Mode + ## + accessModes: + - ReadWriteOnce # read and write by a single node. + ## Persistent Volume size + ## + size: 8Gi + ## selector can be used to match an existing PersistentVolume + ## selector: + ## matchLabels: + ## app: my-app + ## + selector: {} + + initContainers: [] + + ## GraphScope interactive Service parameters + ## + service: + ## Service type + ## + #type: ClusterIP + type: LoadBalancer + ## Service port + ## + ports: + - name: query_port + port: 10000 + targetPort: 10000 + protocol: TCP + + queryPort: 10000 + + adminPort: 7777 + + ## Specify the nodePort value for the LoadBalancer and NodePort service types. + ## ref: https://kubernetes.io/docs/concepts/services-networking/service/#type-nodeport + ## + nodePorts: + service: "" + query: "" + admin: "" + ## Service clusterIP + ## + # clusterIP: None + clusterIP: "" + ## Set the LoadBalancer service type to internal only. + ## ref: https://kubernetes.io/docs/concepts/services-networking/service/#internal-load-balancer + ## + loadBalancerIP: "" + ## Enable client source IP preservation + ## ref http://kubernetes.io/docs/tasks/access-application-cluster/create-external-load-balancer/#preserving-the-client-source-ip + ## + externalTrafficPolicy: Cluster + ## Load Balancer sources + ## https://kubernetes.io/docs/tasks/access-application-cluster/configure-cloud-provider-firewall/#restrict-access-for-loadbalancer-service + ## E.g. + ## loadBalancerSourceRanges: + ## - 10.10.10.0/24 + ## + loadBalancerSourceRanges: [] + ## Provide any additional annotations which may be required + ## + annotations: {} + + ## GraphScope Interactive Pod Disruption Budget configuration + ## ref: https://kubernetes.io/docs/tasks/run-application/configure-pdb/ + ## + pdb: + enabled: false + ## Min number of pods that must still be available after the eviction + ## + minAvailable: 1 + ## Max number of pods that can be unavailable after the eviction + ## + # maxUnavailable: 1 + + # ## GraphScope Interactive pod label. If labels are same as commonLabels , this will take precedence. + # ## podLabels: {} ## GraphScope Frontend parameters @@ -307,12 +580,12 @@ frontend: registry: registry.cn-hongkong.aliyuncs.com repository: graphscope/interactive # Overrides the image tag whose default is the chart appVersion. - tag: "v0.0.3" + tag: "debug" ## Specify a imagePullPolicy ## Defaults to 'Always' if image tag is 'latest', else set to 'IfNotPresent' ## ref: http://kubernetes.io/docs/user-guide/images/#pre-pulling-images ## - pullPolicy: IfNotPresent + pullPolicy: Always ## Optionally specify an array of imagePullSecrets (secrets must be manually created in the namespace) ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ ## Example: @@ -349,7 +622,7 @@ frontend: ## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity ## Note: podAffinityPreset, podAntiAffinityPreset, and nodeAffinityPreset will be ignored when it's set ## - ## affinity: {} + affinity: {} # affinity: # nodeAffinity: # requiredDuringSchedulingIgnoredDuringExecution: @@ -365,15 +638,25 @@ frontend: ## nodeSelector: {} + hostIPC: false + + hostAliases: {} + ## Tolerations for GraphScope Interactive pods assignment ## ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/ ## - tolerations: [] + # tolerations: [] ## GraphScope Interactive container's resource requests and limits ## ref: http://kubernetes.io/docs/user-guide/compute-resources/ ## - resources: {} + resources: + limits: + cpu: 1000m + memory: 1Gi + requests: + cpu: 1000m + memory: 1Gi # We usually recommend not to specify default resources and to leave this as a conscious # choice for the user. This also increases chances charts run on environments with little # resources, such as Minikube. If you do want to specify resources, uncomment the following @@ -410,10 +693,7 @@ frontend: service: ## Service type ## - type: LoadBalancer - ## Service port - ## - servicePort: 55556 + type: ClusterIP ## Gremlin console port ## @@ -471,5 +751,3 @@ frontend: ## podLabels: {} -global: - storageClass: "" diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index 3ec503a64c47..aeeb3907de33 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -15,6 +15,7 @@ option(BUILD_TEST "Whether to build test" ON) option(BUILD_DOC "Whether to build doc" OFF) option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" OFF) option(USE_PTHASH "Whether to use pthash" OFF) +option(BUILD_PROXY "Whether to build proxy" ON) #print options message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}") diff --git a/flex/bin/CMakeLists.txt b/flex/bin/CMakeLists.txt index cb3b1a271188..e3ab4b8ee009 100644 --- a/flex/bin/CMakeLists.txt +++ b/flex/bin/CMakeLists.txt @@ -32,6 +32,12 @@ if(BUILD_HQPS) install(PROGRAMS load_plan_and_gen.sh DESTINATION bin) endif() +if (BUILD_PROXY) + add_executable(proxy_server proxy_server.cc) + target_link_libraries(proxy_server flex_utils flex_server ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES}) + install_without_export_flex_target(proxy_server) +endif() + include_directories(${Boost_INCLUDE_DIRS}) add_executable(bulk_loader bulk_loader.cc) target_link_libraries(bulk_loader flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES}) diff --git a/flex/bin/bulk_loader.cc b/flex/bin/bulk_loader.cc index 12e385a80fcd..5892ccbb5a20 100644 --- a/flex/bin/bulk_loader.cc +++ b/flex/bin/bulk_loader.cc @@ -37,10 +37,10 @@ void signal_handler(int signal) { << ",Clearing directory: " << work_dir << ", exiting..."; // remove all files in work_dir std::filesystem::remove_all(work_dir); - exit(0); + exit(signal); } else { LOG(ERROR) << "Received unexpected signal " << signal << ", exiting..."; - exit(1); + exit(signal); } } diff --git a/flex/bin/proxy_server.cc b/flex/bin/proxy_server.cc new file mode 100644 index 000000000000..1bc3c1e5919a --- /dev/null +++ b/flex/bin/proxy_server.cc @@ -0,0 +1,130 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "stdlib.h" + +#include "flex/engines/hqps_db/core/utils/hqps_utils.h" +#include "flex/engines/http_server/service/proxy_service.h" +#include "flex/utils/service_utils.h" + +#include + +#include + +namespace bpo = boost::program_options; + +namespace gs { +// Function to parse endpoints from a string +bool parse_endpoints(const std::string& input_string, + std::vector>& endpoints) { + std::istringstream iss(input_string); + std::string endpoint; + + while (std::getline(iss, endpoint, ',')) { + // Split the endpoint into host and port using ':' + size_t delimiter_pos = endpoint.find(':'); + if (delimiter_pos == std::string::npos) { + std::cerr << "Invalid endpoint: " << endpoint << ", missing delimiter ':'" + << std::endl; + continue; + } + + std::string host = endpoint.substr(0, delimiter_pos); + std::string port_str = endpoint.substr(delimiter_pos + 1); + uint16_t port; + try { + port = std::stoull(port_str); + } catch (const std::invalid_argument& e) { + LOG(ERROR) << "Invalid port: " << port_str << ", must be a number" + << std::endl; + return false; + } + + // Check for valid port range + if (port < 1 || port > 65535) { + LOG(ERROR) << "Invalid port: " << port << ", must be between 1 and 65535" + << std::endl; + return false; + } + endpoints.push_back({host, port}); + } + return true; +} +} // namespace gs + +/** + * The main entrance for ProxyServer. + * The ProxyServer will block if one request is not executed by the server. + */ +int main(int argc, char** argv) { + bpo::options_description desc("Usage:"); + desc.add_options()("help,h", "Display help messages")( + "endpoints,e", bpo::value()->required(), + "The endpoints of the proxy server, e.g., {ip}:{port},{ip}:{port},...")( + "heartbeat-interval,i", bpo::value()->default_value(1), + "The interval of heartbeat check in seconds")( + "enable-heartbeat-check", bpo::value()->default_value(false), + "Enable heartbeat check or not")( + "port,p", bpo::value()->default_value(9999), + "The port of the proxy server")( + "hang-until-success", bpo::value()->default_value(true), + "Hang until the request is successfully forwarded"); + + setenv("TZ", "Asia/Shanghai", 1); + tzset(); + + bpo::variables_map vm; + bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm); + bpo::notify(vm); + + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + + if (!vm.count("endpoints")) { + LOG(FATAL) << "endpoints is not specified"; + return 0; + } + std::vector> endpoints; + if (!gs::parse_endpoints(vm["endpoints"].as(), endpoints)) { + LOG(FATAL) << "Failed to parse endpoints"; + return 0; + } + + LOG(INFO) << "got endpoints of size: " << endpoints.size() + << ", :" << gs::to_string(endpoints); + + uint32_t shard_num = 1; + uint16_t http_port = 9999; + if (vm.count("port")) { + http_port = vm["port"].as(); + } + + if (!server::ProxyService::get() + .init(shard_num, http_port, endpoints, + vm["enable-heartbeat-check"].as(), + vm["heartbeat-interval"].as(), + vm["hang-until-success"].as()) + .ok()) { + LOG(FATAL) << "Failed to init ProxyService"; + return 0; + } + server::ProxyService::get().run_and_wait_for_exit(); + + return 0; +} diff --git a/flex/engines/hqps_db/core/utils/hqps_utils.h b/flex/engines/hqps_db/core/utils/hqps_utils.h index 5cae552776e0..781693e74e49 100644 --- a/flex/engines/hqps_db/core/utils/hqps_utils.h +++ b/flex/engines/hqps_db/core/utils/hqps_utils.h @@ -803,6 +803,13 @@ struct to_string_impl { } }; +template <> +struct to_string_impl { + static inline std::string to_string(const uint16_t& empty) { + return std::to_string((int32_t) empty); + } +}; + template <> struct to_string_impl { static inline std::string to_string(const int64_t& empty) { diff --git a/flex/engines/http_server/CMakeLists.txt b/flex/engines/http_server/CMakeLists.txt index 07cd7034d813..a7c80d3d1419 100644 --- a/flex/engines/http_server/CMakeLists.txt +++ b/flex/engines/http_server/CMakeLists.txt @@ -11,6 +11,10 @@ if (Hiactor_FOUND) list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*codegen.*") endif () + if (NOT BUILD_PROXY) + list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*proxy.*") + endif () + # get all .cc files in current directory, except for generated/ file(GLOB_RECURSE SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") list(FILTER SERVER_FILES EXCLUDE REGEX ".*generated.*") @@ -22,6 +26,14 @@ if (Hiactor_FOUND) list(FILTER SERVER_FILES EXCLUDE REGEX ".*workdir_manipulator*") endif () + if (NOT BUILD_PROXY) + list(FILTER SERVER_FILES EXCLUDE REGEX ".*proxy_actor*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*proxy_http*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*proxy_service*") + endif () + + message(STATUS "SERVER_FILES: ${SERVER_FILES}") + add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files}) add_dependencies(flex_server server_actor_autogen) target_compile_options (flex_server diff --git a/flex/engines/http_server/actor/proxy_actor.act.cc b/flex/engines/http_server/actor/proxy_actor.act.cc new file mode 100644 index 000000000000..3c87fad5790a --- /dev/null +++ b/flex/engines/http_server/actor/proxy_actor.act.cc @@ -0,0 +1,109 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/engines/http_server/actor/proxy_actor.act.h" +#include "flex/engines/http_server/service/proxy_service.h" + +#include "nlohmann/json.hpp" + +#include + +namespace server { + +proxy_actor::~proxy_actor() { + // finalization + // ... +} + +proxy_actor::proxy_actor(hiactor::actor_base* exec_ctx, + const hiactor::byte_t* addr) + : hiactor::actor(exec_ctx, addr) { + set_max_concurrency(1); // set max concurrency for task reentrancy (stateful) + // initialization + // ... +} + +seastar::future proxy_actor::do_query( + proxy_request&& request_payload) { + auto& request = request_payload.content; + VLOG(10) << "proxy_actor::forward_request, method: " << request->_method + << ", path: " << request->_url << ", query: " << request->content; + + // recover the old url with paramters in request + auto& proxy_service = ProxyService::get(); + auto& client = proxy_service.get_client(); + return client + .forward_request(request->_url, request->_method, request->content, + request->_headers) + .then([&proxy_service](gs::Result&& result) { + if (!result.ok()) { + return seastar::make_ready_future( + proxy_query_result{(result.status())}); + } + auto& content = result.value(); + if (content.size() == 0) { + return seastar::make_exception_future( + std::runtime_error("Got no responses when forwarding request " + "to interactive servers.")); + } + // Check all responses are ok, if not ok, return error + seastar::sstring res_string; + size_t error_count = 0; + for (size_t i = 0; i < content.size(); ++i) { + auto& response = content[i]; + if (response.first != 200) { + error_count++; + } + } + if (error_count == 0) { + res_string = content[0].second; + return seastar::make_ready_future( + proxy_query_result{std::move(res_string)}); + } else { + res_string = + "Got error response when forwarding request " + "to interactive servers, error count: " + + std::to_string(error_count) + "\n"; + for (size_t i = 0; i < content.size(); ++i) { + auto& response = content[i]; + if (response.first != 200) { + LOG(ERROR) << "Got error response when forwarding request " + "to interactive servers at index: " + << std::to_string(i) << ", endpoint: " + << proxy_service.get_endpoints()[i].first + ":" + << std::to_string( + proxy_service.get_endpoints()[i].second) + << std::to_string(response.first) + ", msg:" + << response.second; + std::string tmp = + "Got error response when forwarding request " + "to interactive servers at index: " + + std::to_string(i) + + ", endpoint: " + proxy_service.get_endpoints()[i].first + + ":" + + std::to_string(proxy_service.get_endpoints()[i].second) + + ", code: " + std::to_string(response.first) + + ", msg: " + response.second + "\n"; + res_string += tmp; + } + } + return seastar::make_ready_future( + proxy_query_result{gs::Result(gs::Status( + gs::StatusCode::QueryFailed, std::move(res_string)))}); + } + }); +} + +} // namespace server diff --git a/flex/engines/http_server/actor/proxy_actor.act.h b/flex/engines/http_server/actor/proxy_actor.act.h new file mode 100644 index 000000000000..05f606d16d6f --- /dev/null +++ b/flex/engines/http_server/actor/proxy_actor.act.h @@ -0,0 +1,44 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ENGINES_HTTP_SERVER_ACTOR_PROXY_ACTOR_H_ +#define ENGINES_HTTP_SERVER_ACTOR_PROXY_ACTOR_H_ + + +#include "flex/engines/http_server/types.h" + +#include +#include +#include + +namespace server { + +class ANNOTATION(actor:impl) proxy_actor : public hiactor::actor { + public: + proxy_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr); + ~proxy_actor() override; + + seastar::future ANNOTATION(actor:method) do_query(proxy_request&& param); + + // DECLARE_RUN_QUERIES; + /// Declare `do_work` func here, no need to implement. + ACTOR_DO_WORK() + + private: + int32_t your_private_members_ = 0; +}; +} + +#endif // ENGINES_HTTP_SERVER_ACTOR_PROXY_ACTOR_H_ \ No newline at end of file diff --git a/flex/engines/http_server/handler/admin_http_handler.cc b/flex/engines/http_server/handler/admin_http_handler.cc index f7c7ad3707c2..096cf48de06d 100644 --- a/flex/engines/http_server/handler/admin_http_handler.cc +++ b/flex/engines/http_server/handler/admin_http_handler.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include "flex/engines/http_server/generated/actor/admin_actor_ref.act.autogen.h" #include "flex/engines/http_server/types.h" diff --git a/flex/engines/http_server/handler/admin_http_handler.h b/flex/engines/http_server/handler/admin_http_handler.h index 14465e459a95..c7774b56934f 100644 --- a/flex/engines/http_server/handler/admin_http_handler.h +++ b/flex/engines/http_server/handler/admin_http_handler.h @@ -16,13 +16,15 @@ #ifndef ENGINES_HTTP_SERVER_HANDLER_ADMIN_HTTP_HANDLER_H_ #define ENGINES_HTTP_SERVER_HANDLER_ADMIN_HTTP_HANDLER_H_ -#include -#include #include #include "flex/engines/http_server/handler/http_utils.h" #include "flex/engines/http_server/types.h" #include "flex/utils/service_utils.h" +#include +#include +#include + namespace server { class InteractiveAdminService; diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index 2ddd83c984ab..cdec4aaa462f 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -20,6 +20,7 @@ #include "opentelemetry/trace/span_startoptions.h" #endif // HAVE_OPENTELEMETRY_CPP +#include #include "flex/engines/graph_db/database/graph_db_session.h" #include "flex/engines/http_server/executor_group.actg.h" #include "flex/engines/http_server/options.h" @@ -72,6 +73,33 @@ class optional_param_matcher : public matcher { namespace server { +hqps_heartbeat_handler::hqps_heartbeat_handler() {} + +hqps_heartbeat_handler::~hqps_heartbeat_handler() = default; + +// TODO: return snapshot_id. +seastar::future> +hqps_heartbeat_handler::handle(const seastar::sstring& path, + std::unique_ptr req, + std::unique_ptr rep) { + if (path.find("sampleQuery") != seastar::sstring::npos) { + using namespace std::chrono_literals; + LOG(INFO) << "Before sampleQuery"; + return seastar::sleep(10s).then([rep = std::move(rep)]() mutable { + rep->write_body("bin", seastar::sstring{"OK"}); + rep->done(); + LOG(INFO) << "Finish sampleQuery"; + return seastar::make_ready_future>( + std::move(rep)); + }); + } else { + rep->write_body("bin", seastar::sstring{"Heartbeat OK"}); + rep->done(); + return seastar::make_ready_future>( + std::move(rep)); + } +} + hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id, uint32_t group_inc_step, uint32_t shard_concurrency) @@ -537,6 +565,7 @@ hqps_http_handler::hqps_http_handler(uint16_t http_port, int32_t shard_num) : http_port_(http_port), actors_running_(true) { ic_handlers_.resize(shard_num); adhoc_query_handlers_.resize(shard_num); + heart_beat_handler_ = new hqps_heartbeat_handler(); } hqps_http_handler::~hqps_http_handler() { @@ -627,6 +656,10 @@ seastar::future<> hqps_http_handler::set_routes() { ic_handlers_[hiactor::local_shard_id()] = ic_handler; adhoc_query_handlers_[hiactor::local_shard_id()] = adhoc_query_handler; + r.add(seastar::httpd::operation_type::GET, + seastar::httpd::url("/heartbeat"), heart_beat_handler_); + r.add(seastar::httpd::operation_type::GET, + seastar::httpd::url("/sampleQuery"), heart_beat_handler_); return seastar::make_ready_future<>(); }); diff --git a/flex/engines/http_server/handler/hqps_http_handler.h b/flex/engines/http_server/handler/hqps_http_handler.h index a89e97dfe6e5..30ac4448f725 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.h +++ b/flex/engines/http_server/handler/hqps_http_handler.h @@ -29,6 +29,19 @@ namespace server { +class hqps_heartbeat_handler : public seastar::httpd::handler_base { + public: + hqps_heartbeat_handler(); + ~hqps_heartbeat_handler() override; + + seastar::future> handle( + const seastar::sstring& path, + std::unique_ptr req, + std::unique_ptr rep) override; + + private: +}; + class hqps_ic_handler : public seastar::httpd::handler_base { public: // extra headers @@ -133,6 +146,7 @@ class hqps_http_handler { std::vector ic_handlers_; std::vector adhoc_query_handlers_; + hqps_heartbeat_handler* heart_beat_handler_; }; } // namespace server diff --git a/flex/engines/http_server/handler/http_proxy.cc b/flex/engines/http_server/handler/http_proxy.cc new file mode 100644 index 000000000000..56db91b63c15 --- /dev/null +++ b/flex/engines/http_server/handler/http_proxy.cc @@ -0,0 +1,271 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/engines/http_server/handler/http_proxy.h" +#include "flex/engines/hqps_db/core/utils/hqps_utils.h" + +#include + +namespace server { + +HeartBeatChecker::HeartBeatChecker( + std::vector& clients, + const std::vector>& endpoints, + int32_t heart_beat_interval) + : running_(false), + heart_beat_interval_(DEFAULT_HEART_BEAT_INTERVAL), + clients_(clients), + endpoints_(endpoints) { + endpoint_status_.resize(clients.size(), true); +} + +HeartBeatChecker::~HeartBeatChecker() { + if (running_) { + stop(); + } +} + +gs::Status HeartBeatChecker::start() { + running_ = true; + heartbeat_thread_ = std::thread(&HeartBeatChecker::check_heartbeat, this); + VLOG(10) << "HeartBeatChecker started"; + return gs::Status::OK(); +} + +gs::Status HeartBeatChecker::stop() { + running_ = false; + VLOG(10) << "Stopping HeartBeatChecker"; + while (!heartbeat_thread_.joinable()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + heartbeat_thread_.join(); + VLOG(10) << "HeartBeatChecker stopped"; + return gs::Status::OK(); +} + +void HeartBeatChecker::check_heartbeat() { + while (running_) { + for (size_t i = 0; i < clients_.size(); ++i) { + httplib::Client client(endpoints_[i].first, endpoints_[i].second); + auto res = client.Get("/"); + if (!res) { + LOG(ERROR) << "Failed to connect to endpoint at index: " << i; + endpoint_status_[i] = false; + } else { + VLOG(10) << "Heartbeat check to " << i << " is OK"; + endpoint_status_[i] = true; + } + } + std::this_thread::sleep_for(std::chrono::seconds(heart_beat_interval_)); + } +} + +const std::vector& HeartBeatChecker::get_endpoint_status() const { + return endpoint_status_; +} + +// Utils functions + +HttpForwardingResponse to_response(const httplib::Result& res) { + if (res.error() != httplib::Error::Success) { + LOG(ERROR) << "Failed to send request: " << res.error(); + return std::make_pair(-1, httplib::to_string(res.error())); + } + return std::make_pair(res->status, res->body); +} + +// std::multimap; +httplib::Headers to_httplib_headers(const seastar_http_headers_t& headers) { + httplib::Headers httplib_headers; + for (auto& header : headers) { + // Those headers should not be forwarded, otherwise will cause error. + if (header.first == "Host" || header.first == "User-Agent" || + header.first == "Content-Length") { + continue; + } + httplib_headers.emplace(std::string(header.first.c_str()), + std::string(header.second.c_str())); + } + return httplib_headers; +} + +HttpProxy::HttpProxy() : initialized_(false), enable_heart_beat_check_(false) {} + +HttpProxy::~HttpProxy() { close(); } + +void HttpProxy::close() { + if (initialized_) { + if (heartbeat_checker_) { + heartbeat_checker_->stop(); + } + for (auto& client : clients_) { + client.stop(); + } + initialized_ = false; + } +} + +gs::Status HttpProxy::init( + const std::vector>& endpoints, + bool enable_heart_beat_check, int32_t heart_beat_interval, + bool hang_until_success) { + enable_heart_beat_check_ = enable_heart_beat_check; + hang_until_success_ = hang_until_success; + endpoints_ = endpoints; + if (endpoints_.empty()) { + return gs::Status(gs::StatusCode::InValidArgument, "No endpoint provided"); + } + // TODO: check connection to endpoint, if not connected, return error + clients_.reserve(endpoints_.size()); + for (auto& endpoint : endpoints_) { + httplib::Client client(endpoint.first, endpoint.second); + client.set_connection_timeout(CONNECTION_TIMEOUT, 0); // 5s + client.set_read_timeout(READ_TIMEOUT, 0); // 10s + client.set_write_timeout(WRITE_TIMEOUT, 0); // 10s + clients_.emplace_back(std::move(client)); + } + // test connection + for (auto& client : clients_) { + auto res = client.Get("/heartbeat"); + if (!res) { + return gs::Status(gs::StatusCode::InternalError, + "Failed to connect to endpoint"); + } + } + // start heart beat check + if (enable_heart_beat_check_) { + heartbeat_checker_ = + std::make_unique(clients_, endpoints_); + RETURN_IF_NOT_OK(heartbeat_checker_->start()); + } + initialized_ = true; + return gs::Status::OK(); +} + +seastar::future> HttpProxy::forward_request( + const std::string& path, const std::string& method, const std::string& body, + const seastar_http_headers_t& headers) { + LOG(INFO) << "Forwarding request to " << path << ", method: " << method + << ", body: " << body << ", headers: " << headers.size(); + if (!initialized_) { + return seastar::make_ready_future>( + HttpForwardingResponses{}); + } + // std::vector> reply_futs; + // Get the status of the endpoints from last heartbeat check + { + bool all_endpoints_ready = true; + if (heartbeat_checker_) { + const auto& endpoint_status = heartbeat_checker_->get_endpoint_status(); + // First check if all the endpoints + for (size_t i = 0; i < clients_.size(); ++i) { + if (!endpoint_status[i]) { + LOG(WARNING) << "Endpoint at index " << i << " is not available"; + all_endpoints_ready = false; + } + } + } + if (!all_endpoints_ready) { + // TODO: add results to indicate the endpoint is not available + return seastar::make_ready_future>( + HttpForwardingResponses{}); + } + } + // HttpForwardingResponses replies; + // First send to client 0 and then send to client 1 + return do_send_requests(path, method, body, headers, clients_) + .then_wrapped([](seastar::future&& fut) { + try { + auto responses = fut.get(); + return gs::Result(std::move(responses)); + } catch (const std::exception& e) { + return gs::Result( + gs::Status(gs::StatusCode::InternalError, e.what())); + } + }); +} + +seastar::future HttpProxy::do_send_request( + const std::string& path, const std::string& method, const std::string& body, + const seastar_http_headers_t& headers, + std::vector& clients, size_t ind, + HttpForwardingResponses&& responses) { + if (ind >= clients.size()) { + return seastar::make_ready_future( + std::move(responses)); + } + + if (method != "GET" && method != "POST" && method != "DELETE" && + method != "PUT") { + LOG(ERROR) << "Unsupported method: " << method; + return seastar::make_exception_future( + std::runtime_error("Unsupported method: " + method)); + } + + HttpForwardingResponse response; + + auto lambda = [this, &path, &method, &body, &headers, &clients, ind, + &responses]() { + if (method == "GET") { + VLOG(10) << "Forwarding GET request to " << path; + return to_response( + clients[ind].Get(path.c_str(), to_httplib_headers(headers))); + } else if (method == "POST") { + return to_response(clients[ind].Post( + path.c_str(), to_httplib_headers(headers), body, "application/json")); + } else if (method == "DELETE") { + return to_response( + clients[ind].Delete(path.c_str(), to_httplib_headers(headers))); + } else { // must be put + return to_response(clients[ind].Put( + path.c_str(), to_httplib_headers(headers), body, "application/json")); + } + }; + + if (hang_until_success_) { + while (true) { + response = lambda(); + if (response.first == 200) { + responses.emplace_back(std::move(response)); + break; + } else { + LOG(ERROR) << "Failed to send request to endpoint at index " << ind + << ", status: " << response.first + << ", msg: " << response.second; + if (response.first == 404) { + LOG(ERROR) << "Endpoint not found, skip it"; + responses.emplace_back(std::move(response)); + break; + } + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + } + } else { + response = lambda(); + responses.emplace_back(std::move(response)); + } + return do_send_request(path, method, body, headers, clients, ind + 1, + std::move(responses)); +} +seastar::future HttpProxy::do_send_requests( + const std::string& path, const std::string& method, const std::string& body, + const seastar_http_headers_t& headers, + std::vector& clients) { + HttpForwardingResponses responses; + return do_send_request(path, method, body, headers, clients, 0, + std::move(responses)); +} + +} // namespace server \ No newline at end of file diff --git a/flex/engines/http_server/handler/http_proxy.h b/flex/engines/http_server/handler/http_proxy.h new file mode 100644 index 000000000000..5abe25d03276 --- /dev/null +++ b/flex/engines/http_server/handler/http_proxy.h @@ -0,0 +1,112 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef ENGINES_HTTP_SERVER_HANDLER_FORWARD_HTTP_CLIENT_H_ +#define ENGINES_HTTP_SERVER_HANDLER_FORWARD_HTTP_CLIENT_H_ + +#include + +#include "flex/third_party/httplib.h" +#include "flex/utils/result.h" + +#include +#include + +namespace server { + +class HeartBeatChecker { + public: + static constexpr int32_t DEFAULT_HEART_BEAT_INTERVAL = 2; // 2s + HeartBeatChecker( + std::vector& clients, + const std::vector>& endpoints, + int32_t heart_beat_interval = DEFAULT_HEART_BEAT_INTERVAL); + ~HeartBeatChecker(); + + gs::Status start(); + + gs::Status stop(); + + const std::vector& get_endpoint_status() const; + + private: + void check_heartbeat(); + + std::atomic running_; + int32_t heart_beat_interval_; + std::vector& clients_; + const std::vector>& endpoints_; + std::vector endpoint_status_; // to mark whether the endpoint is alive + std::thread heartbeat_thread_; +}; + +using HttpForwardingResponse = std::pair; +using HttpForwardingResponses = std::vector; +using seastar_http_headers_t = + std::unordered_map; + +// A wrapped http client which will send request to multiple endpoints and +// return the summary of the responses. +// It will do heartbeat check to the endpoints to make sure the endpoints are +// available. +// Currently, we don't distinguish the read/write requests, we just +// send the request to all the endpoints. +class HttpProxy { + public: + static constexpr int32_t CONNECTION_TIMEOUT = 5; // 5s + static constexpr int32_t READ_TIMEOUT = 30; // 5s + static constexpr int32_t WRITE_TIMEOUT = 30; // 10s + HttpProxy(); + ~HttpProxy(); + + gs::Status init( + const std::vector>& endpoints, + bool enable_heart_beat_check = false, + int32_t heart_beat_interval = + HeartBeatChecker::DEFAULT_HEART_BEAT_INTERVAL, + bool hang_until_success = true); + + void close(); + + seastar::future> forward_request( + const std::string& path, const std::string& method, + const std::string& body, const seastar_http_headers_t& headers); + + private: + seastar::future do_send_request( + const std::string& path, const std::string& method, + const std::string& body, const seastar_http_headers_t& headers, + std::vector& clients, size_t ind, + HttpForwardingResponses&& responses); + + seastar::future do_send_requests( + const std::string& path, const std::string& method, + const std::string& body, const seastar_http_headers_t& headers, + std::vector& clients); + + std::atomic initialized_; + bool enable_heart_beat_check_; + bool hang_until_success_; + std::vector> endpoints_; // ip and ports + + std::vector clients_; + + std::unique_ptr heartbeat_checker_; +}; + +} // namespace server + +#endif // ENGINES_HTTP_SERVER_HANDLER_FORWARD_HTTP_CLIENT_H_ diff --git a/flex/engines/http_server/handler/http_utils.h b/flex/engines/http_server/handler/http_utils.h index e303beb68d35..94c874ab0450 100644 --- a/flex/engines/http_server/handler/http_utils.h +++ b/flex/engines/http_server/handler/http_utils.h @@ -12,9 +12,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "flex/engines/http_server/types.h" #include "flex/utils/result.h" -#include "seastar/http/reply.hh" + +#include #ifndef ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_ #define ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_ diff --git a/flex/engines/http_server/handler/proxy_http_handler.cc b/flex/engines/http_server/handler/proxy_http_handler.cc new file mode 100644 index 000000000000..9f9e51359198 --- /dev/null +++ b/flex/engines/http_server/handler/proxy_http_handler.cc @@ -0,0 +1,95 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/engines/http_server/handler/proxy_http_handler.h" + +#include "flex/engines/http_server/executor_group.actg.h" +#include "flex/engines/http_server/handler/http_utils.h" +#include "flex/engines/http_server/options.h" + +#include "flex/engines/http_server/types.h" + +namespace server { + +proxy_http_forward_handler::proxy_http_forward_handler( + uint32_t group_id, uint32_t shard_concurrency) + : executor_idx_(0), shard_concurrency_(shard_concurrency) { + executor_refs_.reserve(shard_concurrency_); + hiactor::scope_builder builder; + builder.set_shard(hiactor::local_shard_id()) + .enter_sub_scope(hiactor::scope(0)) + .enter_sub_scope(hiactor::scope(group_id)); + for (unsigned i = 0; i < shard_concurrency_; ++i) { + executor_refs_.emplace_back(builder.build_ref(i)); + } +} + +seastar::future> +proxy_http_forward_handler::handle(const seastar::sstring& path, + std::unique_ptr req, + std::unique_ptr rep) { + auto dst_executor = executor_idx_; + executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; + + return executor_refs_[dst_executor] + .do_query(proxy_request{std::move(req)}) + .then_wrapped([rep = std::move(rep)]( + seastar::future&& fut) mutable { + return return_reply_with_result(std::move(rep), std::move(fut)); + // if (__builtin_expect(fut.failed(), false)) { + // return seastar::make_exception_future< + // std::unique_ptr>(fut.get_exception()); + // } + // auto result = fut.get0(); + // rep->write_body("bin", std::move(result.content)); + // rep->done(); + // return seastar::make_ready_future< + // std::unique_ptr>(std::move(rep)); + }); +} + +proxy_http_handler::proxy_http_handler(uint16_t http_port) + : http_port_(http_port) {} + +void proxy_http_handler::start() { + auto fut = seastar::alien::submit_to( + *seastar::alien::internal::default_instance, 0, [this] { + return server_.start() + .then([this] { return set_routes(); }) + .then([this] { return server_.listen(http_port_); }) + .then([this] { + fmt::print("Http handler is listening on port {} ...\n", + http_port_); + }); + }); + fut.wait(); +} + +void proxy_http_handler::stop() { + auto fut = + seastar::alien::submit_to(*seastar::alien::internal::default_instance, 0, + [this] { return server_.stop(); }); + fut.wait(); +} + +seastar::future<> proxy_http_handler::set_routes() { + return server_.set_routes([](seastar::httpd::routes& r) { + r.add_default_handler(new proxy_http_forward_handler( + proxy_group_id, shard_proxy_concurrency)); + return seastar::make_ready_future<>(); + }); +} + +} // namespace server \ No newline at end of file diff --git a/flex/engines/http_server/handler/proxy_http_handler.h b/flex/engines/http_server/handler/proxy_http_handler.h new file mode 100644 index 000000000000..b360f9390608 --- /dev/null +++ b/flex/engines/http_server/handler/proxy_http_handler.h @@ -0,0 +1,63 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ENGINES_HTTP_SERVER_HANDLER_PROXY_HTTP_HANDLER_H_ +#define ENGINES_HTTP_SERVER_HANDLER_PROXY_HTTP_HANDLER_H_ + +#include +#include +#include +#include + +#include "flex/engines/http_server/generated/actor/proxy_actor_ref.act.autogen.h" + +namespace server { + +class proxy_http_forward_handler : public seastar::httpd::handler_base { + public: + proxy_http_forward_handler(uint32_t group_id, uint32_t shard_concurrency); + + ~proxy_http_forward_handler() = default; + + seastar::future> handle( + const seastar::sstring& path, + std::unique_ptr req, + std::unique_ptr rep) override; + + private: + uint32_t executor_idx_; + const uint32_t shard_concurrency_; + std::vector executor_refs_; +}; + +// TODO: How to distinguish between read requests and write requests? +class proxy_http_handler { + public: + proxy_http_handler(uint16_t http_port); + + void start(); + void stop(); + + private: + seastar::future<> set_routes(); + + private: + const uint16_t http_port_; + seastar::httpd::http_server_control server_; +}; + +} // namespace server + +#endif // ENGINES_HTTP_SERVER_HANDLER_PROXY_HTTP_HANDLER_H_ \ No newline at end of file diff --git a/flex/engines/http_server/options.cc b/flex/engines/http_server/options.cc index 2f7c0441acf3..372a1465743b 100644 --- a/flex/engines/http_server/options.cc +++ b/flex/engines/http_server/options.cc @@ -25,5 +25,6 @@ uint32_t shard_admin_procedure_concurrency = 1; uint32_t shard_admin_node_concurrency = 1; uint32_t shard_admin_job_concurrency = 1; uint32_t shard_admin_service_concurrency = 1; +uint32_t shard_proxy_concurrency = 16; // same as shard_query_concurrency } // namespace server diff --git a/flex/engines/http_server/options.h b/flex/engines/http_server/options.h index b7110a9d6620..62256b32bf36 100644 --- a/flex/engines/http_server/options.h +++ b/flex/engines/http_server/options.h @@ -28,6 +28,7 @@ const uint32_t ic_update_group_id = 3; const uint32_t ic_adhoc_group_id = 4; const uint32_t codegen_group_id = 5; const uint32_t proc_query_group_id = 6; +const uint32_t proxy_group_id = 7; const uint32_t max_group_id = std::numeric_limits::max(); const uint32_t group_inc_step = @@ -42,6 +43,7 @@ extern uint32_t shard_admin_node_concurrency; extern uint32_t shard_admin_service_concurrency; extern uint32_t shard_admin_job_concurrency; extern uint32_t shard_admin_procedure_concurrency; +extern uint32_t shard_proxy_concurrency; } // namespace server diff --git a/flex/engines/http_server/service/proxy_service.cc b/flex/engines/http_server/service/proxy_service.cc new file mode 100644 index 000000000000..aa9685d7f10e --- /dev/null +++ b/flex/engines/http_server/service/proxy_service.cc @@ -0,0 +1,63 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/engines/http_server/service/proxy_service.h" + +namespace server { + +gs::Status ProxyService::init( + uint32_t num_shards, uint16_t http_port, + const std::vector>& endpoints, + bool enable_heartbeat, int32_t heart_beat_interval, + bool hang_until_success) { + proxy_port_ = http_port; + endpoints_ = endpoints; + actor_sys_ = std::make_unique(num_shards, false); + http_hdl_ = std::make_unique(http_port); + auto init_res = client.init(endpoints, enable_heartbeat, heart_beat_interval, + hang_until_success); + if (!init_res.ok()) { + LOG(ERROR) << "Failed to init HttpProxy"; + return gs::Status(gs::StatusCode::InternalError, + "Failed to init HttpProxy" + init_res.error_message()); + } + return gs::Status::OK(); +} + +void ProxyService::run_and_wait_for_exit() { + if (!actor_sys_ || !http_hdl_) { + std::cerr << "GraphDB service has not been inited!" << std::endl; + return; + } + actor_sys_->launch(); + http_hdl_->start(); + running_.store(true); + while (running_.load(std::memory_order_relaxed)) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + http_hdl_->stop(); + actor_sys_->terminate(); +} + +const std::vector>& +ProxyService::get_endpoints() const { + return endpoints_; +} + +HttpProxy& ProxyService::get_client() { return client; } + +void ProxyService::set_exit_state() { running_.store(false); } + +} // namespace server diff --git a/flex/engines/http_server/service/proxy_service.h b/flex/engines/http_server/service/proxy_service.h new file mode 100644 index 000000000000..9632818cdafe --- /dev/null +++ b/flex/engines/http_server/service/proxy_service.h @@ -0,0 +1,66 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ENGINES_HTTP_SERVER_SERVICE_PROXY_SERVICE_H_ +#define ENGINES_HTTP_SERVER_SERVICE_PROXY_SERVICE_H_ + +#include +#include +#include +#include + +#include "flex/engines/http_server/actor_system.h" +#include "flex/engines/http_server/handler/http_proxy.h" +#include "flex/engines/http_server/handler/proxy_http_handler.h" +#include "flex/utils/result.h" +#include "flex/utils/service_utils.h" + +namespace server { +class ProxyService { + public: + static ProxyService& get() { + static ProxyService instance; + return instance; + } + + ~ProxyService() = default; + + gs::Status init( + uint32_t num_shards, uint16_t http_port, + const std::vector>& endpoints, + bool enable_heartbeat = false, + int32_t heart_beat_interval = + HeartBeatChecker::DEFAULT_HEART_BEAT_INTERVAL, + bool hang_until_success = true); + void run_and_wait_for_exit(); + const std::vector>& get_endpoints() const; + void set_exit_state(); + + HttpProxy& get_client(); + + private: + ProxyService() = default; + + private: + uint32_t proxy_port_; + std::vector> endpoints_; + std::unique_ptr actor_sys_; + std::unique_ptr http_hdl_; + std::atomic running_{false}; + HttpProxy client; +}; +} // namespace server + +#endif // ENGINES_HTTP_SERVER_SERVICE_PROXY_SERVICE_H_ diff --git a/flex/engines/http_server/types.h b/flex/engines/http_server/types.h index f43cd6d4fe08..44a39b0dd83f 100644 --- a/flex/engines/http_server/types.h +++ b/flex/engines/http_server/types.h @@ -20,8 +20,10 @@ #include #include #include +#include #include "flex/utils/service_utils.h" +#include #include namespace server { @@ -54,7 +56,9 @@ struct payload { }; using query_param = payload; +using proxy_request = payload>; using query_result = payload; +using proxy_query_result = payload>; using admin_query_result = payload>; // url_path, query_param using graph_management_param = diff --git a/flex/interactive/docker/entrypoint.sh b/flex/interactive/docker/entrypoint.sh index 6616f8ce15d1..217bfab021a9 100644 --- a/flex/interactive/docker/entrypoint.sh +++ b/flex/interactive/docker/entrypoint.sh @@ -18,10 +18,11 @@ set -e DEFAULT_GRAPH_NAME=gs_interactive_default_graph BULK_LOADER_BINARY_PATH=/opt/flex/bin/bulk_loader INTERACTIVE_SERVER_BIN=/opt/flex/bin/interactive_server +PROXY_SERVER_BIN=/opt/flex/bin/proxy_server function usage() { cat << EOF - Usage: $0 -w[--workspace] + Usage: $0 -w[--workspace] -t[--type] -e[--endpoints] This is the entrypoint script for the interactive container. Options: -h, --help: show this help message and exit @@ -32,6 +33,11 @@ function usage() { -c, --enable-coordinator: Launch the Interactive service along with Coordinator. Must enable this option if you want to use `gsctl` command-line tool. + -t, --type: Specify the type of the service to start. + Default is "engine", which means start the engine service. + Other options are "proxy", which means start the proxy service. + -e, --endpoints: Specify the endpoints of the engine service. i.e. + the address of the engine service. For example, "localhost:9190,localhost:9191" EOF } @@ -42,16 +48,18 @@ function prepare_workspace() { workspace="/tmp/interactive_workspace" fi #if workspace is not exist, create it - if [ ! -d "${workspace}" ]; then - mkdir -p ${workspace} - mkdir -p ${workspace}/conf/ - else - echo "Workspace ${workspace} already exists" + mkdir -p ${workspace} + mkdir -p ${workspace}/conf/ + # prepare engine_config.yaml + builtin_graph_directory="${workspace}/data/${DEFAULT_GRAPH_NAME}" + if [ -d "${builtin_graph_directory}" ]; then + echo "The builtin graph: ${DEFAULT_GRAPH_NAME} already exists, skip preparing the workspace" return 0 fi - # prepare engine_config.yaml engine_config_path="${workspace}/conf/engine_config.yaml" - cp /opt/flex/share/engine_config.yaml $engine_config_path + if [ ! -f "${engine_config_path}" ]; then + cp /opt/flex/share/engine_config.yaml $engine_config_path + fi #make sure the line which start with default_graph is changed to default_graph: ${DEFAULT_GRAPH_NAME} sed -i "s/default_graph:.*/default_graph: ${DEFAULT_GRAPH_NAME}/" $engine_config_path echo "Using default graph: ${DEFAULT_GRAPH_NAME} to start the service" @@ -106,39 +114,85 @@ EOF fi } +function launch_proxy_service() { + #expect 1 arg + if [ $# -ne 3 ]; then + echo "Usage: launch_proxy_service " + echo " number of args: $#" + exit 1 + fi + local endpoints=$1 + local port=$2 + local hang_until_success=$3 + start_cmd="${PROXY_SERVER_BIN} -e '${endpoints}' -p ${port} --hang-until-success ${hang_until_success}" + echo "Starting the proxy service with command: $start_cmd" + eval $start_cmd +} + #################### Entry #################### ENABLE_COORDINATOR=false WORKSPACE=/tmp/interactive_workspace +SERVICE_TYPE="engine" +PROXY_PORT=10000 +HANG_UNTIL_SUCCESS=false while [[ $# -gt 0 ]]; do - case $1 in - -w | --workspace) - shift - if [[ $# -eq 0 || $1 == -* ]]; then - echo "Option -w requires an argument." >&2 - exit 1 - fi - WORKSPACE=$1 - shift - ;; - -c | --enable-coordinator) - ENABLE_COORDINATOR=true - shift - ;; - -h | --help) - usage - exit 0 - ;; - *) - echo "Invalid option: $1" >&2 - usage - exit 1 - ;; + key="$1" + + case $key in + -h | --help) + usage + exit + ;; + -w | --workspace) + shift + WORKSPACE="$1" + shift + ;; + -c | --enable-coordinator) + ENABLE_COORDINATOR=true + shift + ;; + -t | --type) + shift + SERVICE_TYPE="$1" + shift + ;; + -e | --endpoints) + shift + ENDPOINTS="$1" + shift + ;; + -p | --port) + shift + PROXY_PORT="$1" + shift + ;; + --hang-until-success) + shift + HANG_UNTIL_SUCCESS="$1" + shift + ;; + *) # unknown option + echo "unknown option $1" + usage + exit 1 + ;; esac done - -prepare_workspace $WORKSPACE -launch_service $WORKSPACE -launch_coordinator +if [ "${SERVICE_TYPE}" != "engine" ] && [ "${SERVICE_TYPE}" != "proxy" ]; then + echo "Invalid service type: ${SERVICE_TYPE}" + usage + exit 1 +fi +if [ "${SERVICE_TYPE}" == "proxy" ]; then + echo "Start the proxy service" + launch_proxy_service $ENDPOINTS $PROXY_PORT $HANG_UNTIL_SUCCESS +else + echo "Start the engine service" + prepare_workspace $WORKSPACE + launch_service $WORKSPACE + launch_coordinator +fi diff --git a/flex/interactive/docker/interactive-runtime.Dockerfile b/flex/interactive/docker/interactive-runtime.Dockerfile index b15811f4b47c..8b87f7db03cb 100644 --- a/flex/interactive/docker/interactive-runtime.Dockerfile +++ b/flex/interactive/docker/interactive-runtime.Dockerfile @@ -4,8 +4,6 @@ FROM registry.cn-hongkong.aliyuncs.com/graphscope/interactive-base:v0.0.4 AS bui ARG ARCH ARG ENABLE_COORDINATOR="false" -COPY --chown=graphscope:graphscope . /home/graphscope/GraphScope - # change bash as default SHELL ["/bin/bash", "-c"] @@ -31,8 +29,10 @@ cmake . -DCMAKE_INSTALL_PREFIX=/opt/flex -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_ -DBUILD_TESTING=OFF -DWITH_EXAMPLES=OFF && make -j && make install && rm -rf /tmp/opentelemetry-cpp # install flex +COPY --chown=graphscope:graphscope . /home/graphscope/GraphScope + RUN . ${HOME}/.cargo/env && cd ${HOME}/GraphScope/flex && \ - git submodule update --init && mkdir build && cd build && cmake .. -DCMAKE_INSTALL_PREFIX=/opt/flex -DBUILD_DOC=OFF -DBUILD_TEST=OFF && make -j && make install && \ + git submodule update --init && mkdir build && cd build && cmake .. -DCMAKE_INSTALL_PREFIX=/opt/flex -DBUILD_DOC=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON && make -j && make install && \ cd ~/GraphScope/interactive_engine/ && mvn clean package -Pexperimental -DskipTests && \ cd ~/GraphScope/interactive_engine/compiler && cp target/compiler-0.0.1-SNAPSHOT.jar /opt/flex/lib/ && \ cp target/libs/*.jar /opt/flex/lib/ && \ diff --git a/flex/interactive/sdk/examples/python/get_service_status.py b/flex/interactive/sdk/examples/python/get_service_status.py new file mode 100644 index 000000000000..b6dc8f7fef19 --- /dev/null +++ b/flex/interactive/sdk/examples/python/get_service_status.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import sys + +sys.path.append("../../python/") +import time +import argparse +import os +from interactive_sdk.client.driver import Driver +from interactive_sdk.client.session import Session +from interactive_sdk.openapi.models.query_request import QueryRequest +from interactive_sdk.openapi.models.gs_data_type import GSDataType +from interactive_sdk.openapi.models.typed_value import TypedValue +from interactive_sdk.openapi.models.primitive_type import PrimitiveType + + +def get_service_status(sess: Session): + print("Get service status") + status = sess.get_service_status() + print(status) + + +def get_procedures(sess: Session): + print("Get procedures") + procedures = sess.list_procedures("1") + print(procedures) + + +def call_procedure(sess: Session): + print("Call procedure") + req = QueryRequest( + query_name="QueryName", + arguments=[ + TypedValue( + type=GSDataType(PrimitiveType(primitive_type="DT_SIGNED_INT32")), + value=1, + ) + ], + ) + resp = sess.call_procedure("1", req) + print(resp) + + +if __name__ == "__main__": + # expect one argument: interactive_endpoint + parser = argparse.ArgumentParser(description="Example Python3 script") + + # Add arguments + parser.add_argument( + "--endpoint", + type=str, + help="The interactive endpoint to connect", + required=True, + default="https://virtserver.swaggerhub.com/GRAPHSCOPE/interactive/1.0.0/", + ) + + # Parse the arguments + args = parser.parse_args() + + driver = Driver(endpoint=args.endpoint) + with driver.session() as sess: + get_service_status(sess) + get_procedures(sess) + call_procedure(sess) diff --git a/flex/interactive/sdk/python/interactive_sdk/client/driver.py b/flex/interactive/sdk/python/interactive_sdk/client/driver.py index 8629decab195..1790e7a934f2 100644 --- a/flex/interactive/sdk/python/interactive_sdk/client/driver.py +++ b/flex/interactive/sdk/python/interactive_sdk/client/driver.py @@ -18,10 +18,8 @@ import sys -from gremlin_python import statics from gremlin_python.driver.client import Client -from gremlin_python.driver.driver_remote_connection import \ - DriverRemoteConnection +from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.structure.graph import Graph diff --git a/flex/storages/metadata/local_file_metadata_store.cc b/flex/storages/metadata/local_file_metadata_store.cc index 0ef323e18ef5..2a4b941b9139 100644 --- a/flex/storages/metadata/local_file_metadata_store.cc +++ b/flex/storages/metadata/local_file_metadata_store.cc @@ -38,7 +38,7 @@ Result LocalFileMetadataStore::CreateMeta( std::unique_lock lock(meta_mutex_); meta_key_t meta_key; ASSIGN_AND_RETURN_IF_RESULT_NOT_OK(meta_key, get_next_meta_key(meta_kind)); - VLOG(10) << "got next meta key: " << meta_key; + LOG(INFO) << "got next meta key: " << meta_key << ", for " << meta_kind; if (is_key_exist(meta_kind, meta_key)) { return Status(StatusCode::InternalError, "When creating meta, got an existing key"); diff --git a/flex/tests/hqps/engine_config_test_2.yaml b/flex/tests/hqps/engine_config_test_2.yaml new file mode 100644 index 000000000000..3e4973392edd --- /dev/null +++ b/flex/tests/hqps/engine_config_test_2.yaml @@ -0,0 +1,39 @@ +directories: + workspace: /tmp/interactive_workspace + subdirs: + data: data + logs: logs + conf: conf +log_level: INFO +default_graph: ldbc +compute_engine: + type: hiactor + workers: + - localhost:10000 + thread_num_per_worker: 1 + store: + type: cpp-mcsr + metadata_store: + type: file # file/sqlite/etcd +compiler: + planner: + is_on: true + opt: RBO + rules: + - FilterIntoJoinRule + - FilterMatchRule + - NotMatchToAntiJoinRule + endpoint: + default_listen_address: localhost + bolt_connector: + disabled: false + port: 7687 + gremlin_connector: + disabled: false + port: 8182 + query_timeout: 40000 + gremlin_script_language_name: antlr_gremlin_calcite +http_service: + default_listen_address: localhost + admin_port: 7777 + query_port: 10001 diff --git a/flex/tests/hqps/hqps_proxy_server_test.sh b/flex/tests/hqps/hqps_proxy_server_test.sh new file mode 100644 index 000000000000..89d2c82df762 --- /dev/null +++ b/flex/tests/hqps/hqps_proxy_server_test.sh @@ -0,0 +1,161 @@ +#!/bin/bash +# Copyright 2020 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +set -e + +RED='\033[0;31m' +GREEN='\033[0;32m' +NC='\033[0m' # No Color +err() { + echo -e "${RED}[$(date +'%Y-%m-%d %H:%M:%S')] -ERROR- $* ${NC}" >&2 +} + +info() { + echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')] -INFO- $* ${NC}" +} + +SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd) +FLEX_HOME=${SCRIPT_DIR}/../../ +INTERACITIVE_SERVER_BIN=${FLEX_HOME}/build/bin/interactive_server +PROXY_SERVER_BIN=${FLEX_HOME}/build/bin/proxy_server +GIE_HOME=${FLEX_HOME}/../interactive_engine/ +ENGINE_CONFIG_PATH_WORKER1=/tmp/interactive_engine_config_worker1.yaml +ENGINE_CONFIG_PATH_WORKER2=/tmp/interactive_engine_config_worker2.yaml + +if [ $# -lt 2 ] || [ $# -ge 3 ]; then + echo "Receives: $# args, need 2 args" + echo "Usage: $0 " + exit 1 +fi + +INTERACTIVE_WORKSPACE=$1 +ENGINE_CONFIG_PATH=$2 +info "INTERACTIVE_WORKSPACE: ${INTERACTIVE_WORKSPACE}" +info "ENGINE_CONFIG_PATH: ${ENGINE_CONFIG_PATH}" + +kill_service(){ + info "Kill Service first" + ps -ef | grep "com.alibaba.graphscope.GraphServer" | awk '{print $2}' | xargs kill -9 || true + ps -ef | grep "interactive_server" | awk '{print $2}' | xargs kill -9 || true + ps -ef | grep "/bin/proxy_server" | awk '{print $2}' | xargs kill -9 || true + sleep 3 + # check if service is killed + info "Kill Service success" +} + +# kill service when exit +trap kill_service EXIT + +# create two copy of engine config, for two workers. +prepare_engine_config() { + if [ -f ${ENGINE_CONFIG_PATH_WORKER1} ]; then + rm -f ${ENGINE_CONFIG_PATH_WORKER1} + fi + if [ -f ${ENGINE_CONFIG_PATH_WORKER2} ]; then + rm -f ${ENGINE_CONFIG_PATH_WORKER2} + fi + cp ${ENGINE_CONFIG_PATH} ${ENGINE_CONFIG_PATH_WORKER1} + cp ${ENGINE_CONFIG_PATH} ${ENGINE_CONFIG_PATH_WORKER2} + # modify the engine config + sed -i "s/localhost:10000/localhost:10001/g" ${ENGINE_CONFIG_PATH_WORKER2} + sed -i "s/port: 7687/port: 7688/g" ${ENGINE_CONFIG_PATH_WORKER2} + sed -i "s/port: 8182/port: 8183/g" ${ENGINE_CONFIG_PATH_WORKER2} + sed -i "s/admin_port: 7777/admin_port: 7778/g" ${ENGINE_CONFIG_PATH_WORKER2} + sed -i "s/query_port: 10000/query_port: 10001/g" ${ENGINE_CONFIG_PATH_WORKER2} +} + +start_worker() { + info "start worker1" + graph_yaml=${INTERACTIVE_WORKSPACE}/data/modern_graph/graph.yaml + indices_path=${INTERACTIVE_WORKSPACE}/data/modern_graph/indices + base_cmd="${INTERACITIVE_SERVER_BIN} -g ${graph_yaml}" + base_cmd=" ${base_cmd} --data-path ${indices_path}" + cmd1=" ${base_cmd} -c ${ENGINE_CONFIG_PATH_WORKER1}" + cmd2=" ${base_cmd} -c ${ENGINE_CONFIG_PATH_WORKER2}" + info "Start worker1 with command: ${cmd1}" + ${cmd1} & + sleep 2 + info "Start worker2 with command: ${cmd2}" + ${cmd2} & + sleep 2 + # check whether interactive_server has two process running + cnt=$(ps -ef | grep "bin/interactive_server" | grep -v grep | wc -l) + if [ ${cnt} -ne 2 ]; then + err "Start worker failed, expect 2 interactive_server process, but got ${cnt}" + exit 1 + fi + info "Start worker success" +} + +start_proxy() { + info "start proxy server" + cmd="${PROXY_SERVER_BIN} -e localhost:10000,localhost:10001" + info "Start proxy server with command: ${cmd}" + ${cmd} & + sleep 2 + # check whether proxy_server is running + cnt=$(ps -ef | grep "bin/proxy_server" | grep -v grep | wc -l) + if [ ${cnt} -ne 1 ]; then + err "Start proxy server failed, expect 1 proxy_server process, but got ${cnt}" + exit 1 + fi + info "Start proxy server success" +} + + +test_proxy() { + # First check whether proxy server is running, if not, exit + cnt=$(ps -ef | grep "bin/proxy_server" | grep -v grep | wc -l) + if [ ${cnt} -ne 1 ]; then + err "Proxy server is not running, got cnt ${cnt}, expect 1" + exit 1 + fi + # test proxy server + info "Test proxy server" + # check heart beat + res=$(curl -X GET http://localhost:9999/heartbeat) + if [ "${res}" != "OK" ]; then + err "Test proxy server failed, expect OK, but got ${res}" + exit 1 + fi + # now kill worker2, and check whether proxy server can still work + ps -ef | grep "bin/interactive_server" | grep -v grep | grep ${ENGINE_CONFIG_PATH_WORKER2} | awk '{print $2}' | xargs kill -9 + sleep 2 + res=$(curl -X GET http://localhost:9999/heartbeat) + # shold still be ok + if [ "${res}" != "OK" ]; then + err "Test proxy server failed, expect OK, but got ${res}" + exit 1 + fi + # now kill worker1, and check whether proxy server can still work + ps -ef | grep "bin/interactive_server" | grep -v grep | grep ${ENGINE_CONFIG_PATH_WORKER1} | awk '{print $2}' | xargs kill -9 + sleep 2 + res=$(curl -X GET http://localhost:9999/heartbeat) + # shold not contains OK + if [ "${res}" == "OK" ]; then + err "Test proxy server failed, expect not OK, but got ${res}" + exit 1 + fi + info "Test proxy server success" +} + + +kill_service + +prepare_engine_config +start_worker # start interactive worker first +start_proxy # start the proxy server +test_proxy + +kill_service \ No newline at end of file diff --git a/flex/utils/result.h b/flex/utils/result.h index 6d553b4c3b23..865fbadde1b4 100644 --- a/flex/utils/result.h +++ b/flex/utils/result.h @@ -86,6 +86,9 @@ class Result { Result(const Status& status, ValueType&& value) : status_(status), value_(std::move(value)) {} + Result(Status&& status, ValueType&& value) + : status_(std::move(status)), value_(std::move(value)) {} + Result(const Status& status) : status_(status) {} Result(const Status& status, const ValueType& value)