From 14a79166d295249e37f8855f946bce033b19a25b Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK <131133853+Jeyaprakash-NK@users.noreply.github.com> Date: Mon, 15 Apr 2024 15:32:46 +0530 Subject: [PATCH 01/29] list and get cluster temporary changes --- .../contollers/clusterController.py | 16 ++++ dataproc_jupyter_plugin/handlers.py | 3 +- .../services/clusterListService.py | 47 +++++++++- src/cluster/clusterServices.tsx | 90 +++++++------------ 4 files changed, 96 insertions(+), 60 deletions(-) diff --git a/dataproc_jupyter_plugin/contollers/clusterController.py b/dataproc_jupyter_plugin/contollers/clusterController.py index 27d7d583..4fae603b 100644 --- a/dataproc_jupyter_plugin/contollers/clusterController.py +++ b/dataproc_jupyter_plugin/contollers/clusterController.py @@ -31,6 +31,22 @@ def get(self): cluster_list = cluster.list_clusters( credentials, page_size, page_token, self.log ) + print("33333", cluster_list) + self.finish(json.dumps(cluster_list)) + except Exception as e: + self.log.exception(f"Error fetching cluster list") + self.finish({"error": str(e)}) + +class ClusterDetailController(APIHandler): + @tornado.web.authenticated + def get(self): + try: + cluster_selected = self.get_argument("clusterSelected") + cluster = ClusterListService() + credentials = handlers.get_cached_credentials(self.log) + cluster_list = cluster.get_cluster_detail( + credentials, cluster_selected, self.log + ) self.finish(json.dumps(cluster_list)) except Exception as e: self.log.exception(f"Error fetching cluster list") diff --git a/dataproc_jupyter_plugin/handlers.py b/dataproc_jupyter_plugin/handlers.py index a6ff8914..3df7ee40 100644 --- a/dataproc_jupyter_plugin/handlers.py +++ b/dataproc_jupyter_plugin/handlers.py @@ -33,7 +33,7 @@ from google.cloud.jupyter_config.config import gcp_kernel_gateway_url, get_gcloud_config -from dataproc_jupyter_plugin.contollers.clusterController import ClusterListController +from dataproc_jupyter_plugin.contollers.clusterController import ClusterDetailController, ClusterListController from dataproc_jupyter_plugin.contollers.composerController import ComposerListController from dataproc_jupyter_plugin.contollers.dagController import ( DagDeleteController, @@ -356,6 +356,7 @@ def full_path(name): "dagRunTask": DagRunTaskController, "dagRunTaskLogs": DagRunTaskLogsController, "clusterList": ClusterListController, + "clusterDetail": ClusterDetailController, "runtimeList": RuntimeController, "createJobScheduler": ExecutorController, "dagList": DagListController, diff --git a/dataproc_jupyter_plugin/services/clusterListService.py b/dataproc_jupyter_plugin/services/clusterListService.py index 39ad2bc3..eb706ac5 100644 --- a/dataproc_jupyter_plugin/services/clusterListService.py +++ b/dataproc_jupyter_plugin/services/clusterListService.py @@ -16,9 +16,54 @@ import requests from dataproc_jupyter_plugin.utils.constants import CONTENT_TYPE, dataproc_url +from google.cloud import dataproc_v1 as dataproc +import google.oauth2.credentials + class ClusterListService: def list_clusters(self, credentials, page_size, page_token, log): + try: + if ( + ("access_token" in credentials) + and ("project_id" in credentials) + and ("region_id" in credentials) + ): + access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) + + # Create a client + client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) + + # Initialize request argument(s) + request = dataproc.ListClustersRequest( + project_id=credentials["project_id"], + region=credentials["region_id"], + page_size=int(page_size), + page_token=page_token + ) + + clusters_list = [] + + # Make the request + page_result = client.list_clusters(request=request) + print("1111",page_result) + for cluster in page_result: + print(cluster) + cluster_dict = { + "cluster_name": cluster.cluster_name, + "status": cluster.status.state, + } + clusters_list.append(cluster_dict) + print("2222",clusters_list) + return clusters_list + else: + log.exception(f"Missing required credentials") + raise ValueError("Missing required credentials") + except Exception as e: + log.exception(f"Error fetching cluster list") + return {"error": str(e)} + + + def get_cluster_detail(self, credentials, cluster_selected, log): try: if ( ("access_token" in credentials) @@ -28,7 +73,7 @@ def list_clusters(self, credentials, page_size, page_token, log): access_token = credentials["access_token"] project_id = credentials["project_id"] region_id = credentials["region_id"] - api_endpoint = f"{dataproc_url}/v1/projects/{project_id}/regions/{region_id}/clusters?pageSize={page_size}&pageToken={page_token}" + api_endpoint = f"{dataproc_url}/v1/projects/{project_id}/regions/{region_id}/clusters/{cluster_selected}" headers = { "Content-Type": CONTENT_TYPE, "Authorization": f"Bearer {access_token}", diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index 06ebf6f3..d4394179 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -34,6 +34,7 @@ import { statusValue } from '../utils/utils'; import { DataprocLoggingService, LOG_LEVEL } from '../utils/loggingService'; +import { requestAPI } from '../handler/handler'; interface IClusterRenderData { status: { state: ClusterStatus }; @@ -80,17 +81,9 @@ export class ClusterService { const projectId = await getProjectId(); setProjectId(projectId); - const queryParams = new URLSearchParams(); - queryParams.append('pageSize', '50'); - queryParams.append('pageToken', pageToken); + const serviceURL = `clusterList?pageSize=500&pageToken=${pageToken}`; - const response = await authenticatedFetch({ - uri: 'clusters', - regionIdentifier: 'regions', - method: HTTP_METHOD.GET, - queryParams: queryParams - }); - const formattedResponse = await response.json(); + const formattedResponse: any = await requestAPI(serviceURL); let transformClusterListData = []; if (formattedResponse && formattedResponse.clusters) { transformClusterListData = formattedResponse.clusters.map( @@ -158,52 +151,35 @@ export class ClusterService { setClusterInfo: (value: IClusterDetailsResponse) => void ) => { const credentials = await authApi(); - const { DATAPROC } = await gcpServiceUrls; if (credentials) { setProjectName(credentials.project_id || ''); - loggedFetch( - `${DATAPROC}/projects/${credentials.project_id}/regions/${credentials.region_id}/clusters/${clusterSelected}`, - { - method: 'GET', - headers: { - 'Content-Type': API_HEADER_CONTENT_TYPE, - Authorization: API_HEADER_BEARER + credentials.access_token + + try { + const serviceURL = `clusterDetail?clusterSelected=${clusterSelected}`; + + const responseResult: any = await requestAPI(serviceURL); + if (responseResult) { + if (responseResult.error && responseResult.error.code === 404) { + setErrorView(true); } - } - ) - .then((response: Response) => { - response - .json() - .then((responseResult: IClusterDetailsResponse) => { - if (responseResult.error && responseResult.error.code === 404) { - setErrorView(true); - } - if (responseResult?.error?.code) { - toast.error( - responseResult?.error?.message, - toastifyCustomStyle - ); - } - setClusterInfo(responseResult); - setIsLoading(false); - }) - .catch((e: Error) => { - console.log(e); - setIsLoading(false); - }); - }) - .catch((err: Error) => { + if (responseResult?.error?.code) { + toast.error(responseResult?.error?.message, toastifyCustomStyle); + } + setClusterInfo(responseResult); setIsLoading(false); - console.error('Error listing clusters Details', err); - DataprocLoggingService.log( - 'Error listing clusters Details', - LOG_LEVEL.ERROR - ); - toast.error( - `Failed to fetch cluster details ${clusterSelected}`, - toastifyCustomStyle - ); - }); + } + } catch (err) { + setIsLoading(false); + console.error('Error listing clusters Details', err); + DataprocLoggingService.log( + 'Error listing clusters Details', + LOG_LEVEL.ERROR + ); + toast.error( + `Failed to fetch cluster details ${clusterSelected}`, + toastifyCustomStyle + ); + } } }; @@ -213,12 +189,9 @@ export class ClusterService { timer: any ) => { try { - const response = await authenticatedFetch({ - uri: `clusters/${selectedCluster}`, - method: HTTP_METHOD.GET, - regionIdentifier: 'regions' - }); - const formattedResponse = await response.json(); + const serviceURL = `clusterDetail?clusterSelected=${selectedCluster}`; + + const formattedResponse: any = await requestAPI(serviceURL); if (formattedResponse.status.state === ClusterStatus.STATUS_STOPPED) { ClusterService.startClusterApi(selectedCluster); @@ -255,6 +228,7 @@ export class ClusterService { }); const formattedResponse = await response.json(); console.log(formattedResponse); + listClustersAPI(); timer.current = setInterval(() => { statusApi(selectedCluster); From 654b621b03ff764624e5f3f7a9543462acc9f495 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK <131133853+Jeyaprakash-NK@users.noreply.github.com> Date: Tue, 16 Apr 2024 11:48:43 +0530 Subject: [PATCH 02/29] cluster service BE code change --- .../contollers/clusterController.py | 6 +- .../services/clusterListService.py | 91 ------------- .../services/clusterService.py | 121 ++++++++++++++++++ 3 files changed, 124 insertions(+), 94 deletions(-) delete mode 100644 dataproc_jupyter_plugin/services/clusterListService.py create mode 100644 dataproc_jupyter_plugin/services/clusterService.py diff --git a/dataproc_jupyter_plugin/contollers/clusterController.py b/dataproc_jupyter_plugin/contollers/clusterController.py index 4fae603b..4e1ff794 100644 --- a/dataproc_jupyter_plugin/contollers/clusterController.py +++ b/dataproc_jupyter_plugin/contollers/clusterController.py @@ -17,7 +17,7 @@ from jupyter_server.base.handlers import APIHandler import tornado from dataproc_jupyter_plugin import handlers -from dataproc_jupyter_plugin.services.clusterListService import ClusterListService +from dataproc_jupyter_plugin.services.clusterService import ClusterService class ClusterListController(APIHandler): @@ -26,7 +26,7 @@ def get(self): try: page_token = self.get_argument("pageToken") page_size = self.get_argument("pageSize") - cluster = ClusterListService() + cluster = ClusterService() credentials = handlers.get_cached_credentials(self.log) cluster_list = cluster.list_clusters( credentials, page_size, page_token, self.log @@ -42,7 +42,7 @@ class ClusterDetailController(APIHandler): def get(self): try: cluster_selected = self.get_argument("clusterSelected") - cluster = ClusterListService() + cluster = ClusterService() credentials = handlers.get_cached_credentials(self.log) cluster_list = cluster.get_cluster_detail( credentials, cluster_selected, self.log diff --git a/dataproc_jupyter_plugin/services/clusterListService.py b/dataproc_jupyter_plugin/services/clusterListService.py deleted file mode 100644 index eb706ac5..00000000 --- a/dataproc_jupyter_plugin/services/clusterListService.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import requests -from dataproc_jupyter_plugin.utils.constants import CONTENT_TYPE, dataproc_url - -from google.cloud import dataproc_v1 as dataproc -import google.oauth2.credentials - - -class ClusterListService: - def list_clusters(self, credentials, page_size, page_token, log): - try: - if ( - ("access_token" in credentials) - and ("project_id" in credentials) - and ("region_id" in credentials) - ): - access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) - - # Create a client - client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) - - # Initialize request argument(s) - request = dataproc.ListClustersRequest( - project_id=credentials["project_id"], - region=credentials["region_id"], - page_size=int(page_size), - page_token=page_token - ) - - clusters_list = [] - - # Make the request - page_result = client.list_clusters(request=request) - print("1111",page_result) - for cluster in page_result: - print(cluster) - cluster_dict = { - "cluster_name": cluster.cluster_name, - "status": cluster.status.state, - } - clusters_list.append(cluster_dict) - print("2222",clusters_list) - return clusters_list - else: - log.exception(f"Missing required credentials") - raise ValueError("Missing required credentials") - except Exception as e: - log.exception(f"Error fetching cluster list") - return {"error": str(e)} - - - def get_cluster_detail(self, credentials, cluster_selected, log): - try: - if ( - ("access_token" in credentials) - and ("project_id" in credentials) - and ("region_id" in credentials) - ): - access_token = credentials["access_token"] - project_id = credentials["project_id"] - region_id = credentials["region_id"] - api_endpoint = f"{dataproc_url}/v1/projects/{project_id}/regions/{region_id}/clusters/{cluster_selected}" - headers = { - "Content-Type": CONTENT_TYPE, - "Authorization": f"Bearer {access_token}", - } - response = requests.get(api_endpoint, headers=headers) - if response.status_code == 200: - resp = response.json() - - return resp - else: - log.exception(f"Missing required credentials") - raise ValueError("Missing required credentials") - except Exception as e: - log.exception(f"Error fetching cluster list") - return {"error": str(e)} diff --git a/dataproc_jupyter_plugin/services/clusterService.py b/dataproc_jupyter_plugin/services/clusterService.py new file mode 100644 index 00000000..52f79726 --- /dev/null +++ b/dataproc_jupyter_plugin/services/clusterService.py @@ -0,0 +1,121 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import requests +from dataproc_jupyter_plugin.utils.constants import CONTENT_TYPE, dataproc_url + +from google.cloud import dataproc_v1 as dataproc +import google.oauth2.credentials + + +class ClusterService: + # def list_clusters(self, credentials, page_size, page_token, log): + # try: + # if ( + # ("access_token" in credentials) + # and ("project_id" in credentials) + # and ("region_id" in credentials) + # ): + # access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) + + # # Create a client + # client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) + + # # Initialize request argument(s) + # request = dataproc.ListClustersRequest( + # project_id=credentials["project_id"], + # region=credentials["region_id"], + # page_size=int(page_size), + # page_token=page_token + # ) + + # # Make the request + # page_result = client.list_clusters(request=request) + # print("1111",page_result) + # # return page_result + + # clusters_list = [] + + # # Make the request + # for cluster in page_result: + # print(cluster) + # # cluster_dict = { + # # "cluster_name": cluster.cluster_name, + # # "status": cluster.status.state, + # # } + # clusters_list.append({cluster}) + # print("2222",clusters_list) + # return clusters_list + # else: + # log.exception(f"Missing required credentials") + # raise ValueError("Missing required credentials") + # except Exception as e: + # log.exception(f"Error fetching cluster list") + # return {"error": str(e)} + + def list_clusters(self, credentials, page_size, page_token, log): + try: + if ( + ("access_token" in credentials) + and ("project_id" in credentials) + and ("region_id" in credentials) + ): + access_token = credentials["access_token"] + project_id = credentials["project_id"] + region_id = credentials["region_id"] + api_endpoint = f"{dataproc_url}/v1/projects/{project_id}/regions/{region_id}/clusters?pageSize={page_size}&pageToken={page_token}" + headers = { + "Content-Type": CONTENT_TYPE, + "Authorization": f"Bearer {access_token}", + } + response = requests.get(api_endpoint, headers=headers) + if response.status_code == 200: + resp = response.json() + + return resp + else: + log.exception(f"Missing required credentials") + raise ValueError("Missing required credentials") + except Exception as e: + log.exception(f"Error fetching cluster list") + return {"error": str(e)} + + + def get_cluster_detail(self, credentials, cluster_selected, log): + try: + if ( + ("access_token" in credentials) + and ("project_id" in credentials) + and ("region_id" in credentials) + ): + access_token = credentials["access_token"] + project_id = credentials["project_id"] + region_id = credentials["region_id"] + api_endpoint = f"{dataproc_url}/v1/projects/{project_id}/regions/{region_id}/clusters/{cluster_selected}" + headers = { + "Content-Type": CONTENT_TYPE, + "Authorization": f"Bearer {access_token}", + } + response = requests.get(api_endpoint, headers=headers) + if response.status_code == 200: + resp = response.json() + + return resp + else: + log.exception(f"Missing required credentials") + raise ValueError("Missing required credentials") + except Exception as e: + log.exception(f"Error fetching cluster list") + return {"error": str(e)} From 9659c0253ead9dd803e186021f5bfa97f94540ab Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK <131133853+Jeyaprakash-NK@users.noreply.github.com> Date: Wed, 17 Apr 2024 11:11:31 +0530 Subject: [PATCH 03/29] list cluster client library temp changes --- .../contollers/clusterController.py | 1 - .../services/clusterService.py | 147 +++++++++--------- src/cluster/clusterServices.tsx | 6 +- 3 files changed, 77 insertions(+), 77 deletions(-) diff --git a/dataproc_jupyter_plugin/contollers/clusterController.py b/dataproc_jupyter_plugin/contollers/clusterController.py index 4e1ff794..436d2edd 100644 --- a/dataproc_jupyter_plugin/contollers/clusterController.py +++ b/dataproc_jupyter_plugin/contollers/clusterController.py @@ -31,7 +31,6 @@ def get(self): cluster_list = cluster.list_clusters( credentials, page_size, page_token, self.log ) - print("33333", cluster_list) self.finish(json.dumps(cluster_list)) except Exception as e: self.log.exception(f"Error fetching cluster list") diff --git a/dataproc_jupyter_plugin/services/clusterService.py b/dataproc_jupyter_plugin/services/clusterService.py index 52f79726..e9124d35 100644 --- a/dataproc_jupyter_plugin/services/clusterService.py +++ b/dataproc_jupyter_plugin/services/clusterService.py @@ -18,79 +18,75 @@ from google.cloud import dataproc_v1 as dataproc import google.oauth2.credentials +import proto +import json class ClusterService: - # def list_clusters(self, credentials, page_size, page_token, log): - # try: - # if ( - # ("access_token" in credentials) - # and ("project_id" in credentials) - # and ("region_id" in credentials) - # ): - # access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) - - # # Create a client - # client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) - - # # Initialize request argument(s) - # request = dataproc.ListClustersRequest( - # project_id=credentials["project_id"], - # region=credentials["region_id"], - # page_size=int(page_size), - # page_token=page_token - # ) - - # # Make the request - # page_result = client.list_clusters(request=request) - # print("1111",page_result) - # # return page_result - - # clusters_list = [] - - # # Make the request - # for cluster in page_result: - # print(cluster) - # # cluster_dict = { - # # "cluster_name": cluster.cluster_name, - # # "status": cluster.status.state, - # # } - # clusters_list.append({cluster}) - # print("2222",clusters_list) - # return clusters_list - # else: - # log.exception(f"Missing required credentials") - # raise ValueError("Missing required credentials") - # except Exception as e: - # log.exception(f"Error fetching cluster list") - # return {"error": str(e)} - def list_clusters(self, credentials, page_size, page_token, log): try: if ( ("access_token" in credentials) and ("project_id" in credentials) and ("region_id" in credentials) - ): - access_token = credentials["access_token"] - project_id = credentials["project_id"] - region_id = credentials["region_id"] - api_endpoint = f"{dataproc_url}/v1/projects/{project_id}/regions/{region_id}/clusters?pageSize={page_size}&pageToken={page_token}" - headers = { - "Content-Type": CONTENT_TYPE, - "Authorization": f"Bearer {access_token}", - } - response = requests.get(api_endpoint, headers=headers) - if response.status_code == 200: - resp = response.json() - - return resp + ): + access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) + + # Create a client + client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) + + # Initialize request argument(s) + print("call params",int(page_size), page_token) + request = dataproc.ListClustersRequest( + project_id=credentials["project_id"], + page_size=int(page_size), + page_token=page_token, + region=credentials["region_id"], + ) + + # Make the request + page_result = client.list_clusters(request=request) + + clusters_list = [] + + # Handle the response + for response in page_result: + clusters_list.append(json.loads(proto.Message.to_json(response))) + + return clusters_list else: log.exception(f"Missing required credentials") raise ValueError("Missing required credentials") except Exception as e: log.exception(f"Error fetching cluster list") return {"error": str(e)} + + # def list_clusters(self, credentials, page_size, page_token, log): + # try: + # if ( + # ("access_token" in credentials) + # and ("project_id" in credentials) + # and ("region_id" in credentials) + # ): + # access_token = credentials["access_token"] + # project_id = credentials["project_id"] + # region_id = credentials["region_id"] + # api_endpoint = f"{dataproc_url}/v1/projects/{project_id}/regions/{region_id}/clusters?pageSize={page_size}&pageToken={page_token}" + # headers = { + # "Content-Type": CONTENT_TYPE, + # "Authorization": f"Bearer {access_token}", + # } + # response = requests.get(api_endpoint, headers=headers) + # if response.status_code == 200: + # resp = response.json() + + # return resp + # else: + # log.exception(f"Missing required credentials") + # raise ValueError("Missing required credentials") + # except Exception as e: + # log.exception(f"Error fetching cluster list") + # return {"error": str(e)} def get_cluster_detail(self, credentials, cluster_selected, log): @@ -99,23 +95,28 @@ def get_cluster_detail(self, credentials, cluster_selected, log): ("access_token" in credentials) and ("project_id" in credentials) and ("region_id" in credentials) - ): - access_token = credentials["access_token"] - project_id = credentials["project_id"] - region_id = credentials["region_id"] - api_endpoint = f"{dataproc_url}/v1/projects/{project_id}/regions/{region_id}/clusters/{cluster_selected}" - headers = { - "Content-Type": CONTENT_TYPE, - "Authorization": f"Bearer {access_token}", - } - response = requests.get(api_endpoint, headers=headers) - if response.status_code == 200: - resp = response.json() - - return resp + ): + access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) + + # Create a client + client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) + + # Initialize request argument(s) + request = dataproc.GetClusterRequest( + project_id=credentials["project_id"], + region=credentials["region_id"], + cluster_name=cluster_selected + ) + + # Make the request + response = client.get_cluster(request=request) + + # Handle the response + print(response) + return response else: log.exception(f"Missing required credentials") raise ValueError("Missing required credentials") except Exception as e: - log.exception(f"Error fetching cluster list") + log.exception(f"Error fetching cluster detail") return {"error": str(e)} diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index d4394179..ab749382 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -81,12 +81,12 @@ export class ClusterService { const projectId = await getProjectId(); setProjectId(projectId); - const serviceURL = `clusterList?pageSize=500&pageToken=${pageToken}`; + const serviceURL = `clusterList?pageSize=50&pageToken=${pageToken}`; const formattedResponse: any = await requestAPI(serviceURL); let transformClusterListData = []; - if (formattedResponse && formattedResponse.clusters) { - transformClusterListData = formattedResponse.clusters.map( + if (formattedResponse) { + transformClusterListData = formattedResponse.map( (data: any) => { const statusVal = statusValue(data); // Extracting zone from zoneUri From 37087949140c6b31efa5a44f52562d5d2d2fdd11 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Thu, 8 Aug 2024 12:13:09 +0530 Subject: [PATCH 04/29] list and get cluster api status changes --- .../services/clusterService.py | 33 ++---------- src/cluster/clusterServices.tsx | 51 ++++++++++--------- src/utils/const.ts | 13 +++++ src/utils/utils.ts | 9 ++-- 4 files changed, 48 insertions(+), 58 deletions(-) diff --git a/dataproc_jupyter_plugin/services/clusterService.py b/dataproc_jupyter_plugin/services/clusterService.py index e9124d35..d27e034f 100644 --- a/dataproc_jupyter_plugin/services/clusterService.py +++ b/dataproc_jupyter_plugin/services/clusterService.py @@ -51,6 +51,8 @@ def list_clusters(self, credentials, page_size, page_token, log): # Handle the response for response in page_result: + # response.status.state = 'STOPPED' + # print("aaaaaaa", response.status.state) clusters_list.append(json.loads(proto.Message.to_json(response))) return clusters_list @@ -60,34 +62,6 @@ def list_clusters(self, credentials, page_size, page_token, log): except Exception as e: log.exception(f"Error fetching cluster list") return {"error": str(e)} - - # def list_clusters(self, credentials, page_size, page_token, log): - # try: - # if ( - # ("access_token" in credentials) - # and ("project_id" in credentials) - # and ("region_id" in credentials) - # ): - # access_token = credentials["access_token"] - # project_id = credentials["project_id"] - # region_id = credentials["region_id"] - # api_endpoint = f"{dataproc_url}/v1/projects/{project_id}/regions/{region_id}/clusters?pageSize={page_size}&pageToken={page_token}" - # headers = { - # "Content-Type": CONTENT_TYPE, - # "Authorization": f"Bearer {access_token}", - # } - # response = requests.get(api_endpoint, headers=headers) - # if response.status_code == 200: - # resp = response.json() - - # return resp - # else: - # log.exception(f"Missing required credentials") - # raise ValueError("Missing required credentials") - # except Exception as e: - # log.exception(f"Error fetching cluster list") - # return {"error": str(e)} - def get_cluster_detail(self, credentials, cluster_selected, log): try: @@ -112,8 +86,7 @@ def get_cluster_detail(self, credentials, cluster_selected, log): response = client.get_cluster(request=request) # Handle the response - print(response) - return response + return json.loads(proto.Message.to_json(response)) else: log.exception(f"Missing required credentials") raise ValueError("Missing required credentials") diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index ab749382..26f77d3c 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -21,6 +21,7 @@ import { API_HEADER_BEARER, API_HEADER_CONTENT_TYPE, ClusterStatus, + ClusterStatusState, HTTP_METHOD, POLLING_TIME_LIMIT, gcpServiceUrls @@ -86,29 +87,27 @@ export class ClusterService { const formattedResponse: any = await requestAPI(serviceURL); let transformClusterListData = []; if (formattedResponse) { - transformClusterListData = formattedResponse.map( - (data: any) => { - const statusVal = statusValue(data); - // Extracting zone from zoneUri - // Example: "projects/{project}/zones/{zone}" + transformClusterListData = formattedResponse.map((data: any) => { + const statusVal = statusValue(data); + // Extracting zone from zoneUri + // Example: "projects/{project}/zones/{zone}" - const zoneUri = data.config.gceClusterConfig.zoneUri.split('/'); + const zoneUri = data.config.gceClusterConfig.zoneUri.split('/'); - return { - clusterUuid: data.clusterUuid, - status: statusVal, - clusterName: data.clusterName, - clusterImage: data.config.softwareConfig.imageVersion, - region: data.labels['goog-dataproc-location'], - zone: zoneUri[zoneUri.length - 1], - totalWorkersNode: data.config.workerConfig - ? data.config.workerConfig.numInstances - : 0, - schedulesDeletion: data.config.lifecycleConfig ? 'On' : 'Off', - actions: renderActions(data) - }; - } - ); + return { + clusterUuid: data.clusterUuid, + status: statusVal, + clusterName: data.clusterName, + clusterImage: data.config.softwareConfig.imageVersion, + region: data.labels['goog-dataproc-location'], + zone: zoneUri[zoneUri.length - 1], + totalWorkersNode: data.config.workerConfig + ? data.config.workerConfig.numInstances + : 0, + schedulesDeletion: data.config.lifecycleConfig ? 'On' : 'Off', + actions: renderActions(data) + }; + }); } const existingClusterData = previousClustersList ?? []; //setStateAction never type issue @@ -157,7 +156,9 @@ export class ClusterService { try { const serviceURL = `clusterDetail?clusterSelected=${clusterSelected}`; - const responseResult: any = await requestAPI(serviceURL); + let responseResult: any = await requestAPI(serviceURL); + responseResult.status.state = + ClusterStatusState[responseResult.status.state.toString()]; if (responseResult) { if (responseResult.error && responseResult.error.code === 404) { setErrorView(true); @@ -191,7 +192,9 @@ export class ClusterService { try { const serviceURL = `clusterDetail?clusterSelected=${selectedCluster}`; - const formattedResponse: any = await requestAPI(serviceURL); + let formattedResponse: any = await requestAPI(serviceURL); + formattedResponse.status.state = + ClusterStatusState[formattedResponse.status.state.toString()]; if (formattedResponse.status.state === ClusterStatus.STATUS_STOPPED) { ClusterService.startClusterApi(selectedCluster); @@ -228,7 +231,7 @@ export class ClusterService { }); const formattedResponse = await response.json(); console.log(formattedResponse); - + listClustersAPI(); timer.current = setInterval(() => { statusApi(selectedCluster); diff --git a/src/utils/const.ts b/src/utils/const.ts index a4a0449f..01653f5d 100644 --- a/src/utils/const.ts +++ b/src/utils/const.ts @@ -68,6 +68,19 @@ export enum ClusterStatus { STATUS_STOPPED = 'STOPPED', STATUS_ACTIVE = 'ACTIVE' } +export const ClusterStatusState: any = { + '0': 'UNKNOWN', + '1': 'CREATING', + '2': 'RUNNING', + '3': 'ERROR', + '4': 'DELETING', + '5': 'UPDATING', + '6': 'STOPPING', + '7': 'STOPPED', + '8': 'STARTING', + '9': 'ERROR_DUE_TO_UPDATE', + '10': 'REPAIRING' +}; export enum BatchStatus { STATUS_PENDING = 'PENDING' } diff --git a/src/utils/utils.ts b/src/utils/utils.ts index ce35133d..22bc2c51 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -24,6 +24,7 @@ import { requestAPI } from '../handler/handler'; import { API_HEADER_BEARER, API_HEADER_CONTENT_TYPE, + ClusterStatusState, DCU_HOURS, GB_MONTHS, HTTP_METHOD, @@ -31,7 +32,7 @@ import { SPARK, SPARKR, SPARKSQL, - STATUS_CREATING, + // STATUS_CREATING, STATUS_DONE, STATUS_ERROR, STATUS_FAIL, @@ -262,11 +263,11 @@ export const statusMessage = (data: { status: { state: string } }) => { } }; -export const statusValue = (data: { status: { state: string } }) => { - if (data.status.state === STATUS_CREATING) { +export const statusValue = (data: { status: { state: number } }) => { + if (data.status.state === 1) { return STATUS_PROVISIONING; } else { - return data.status.state; + return ClusterStatusState[data.status.state.toString()]; } }; From 25e25c1f0d41ab957d786d9ecdbdf54bd2a648d7 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 12 Aug 2024 10:40:46 +0530 Subject: [PATCH 05/29] stop cluster BE and FE temp changes --- .../contollers/clusterController.py | 21 +++++++++-- dataproc_jupyter_plugin/handlers.py | 3 +- .../services/clusterService.py | 35 ++++++++++++++++-- src/cluster/clusterServices.tsx | 36 +++++++++++++------ src/cluster/listCluster.tsx | 29 ++++++++------- 5 files changed, 94 insertions(+), 30 deletions(-) diff --git a/dataproc_jupyter_plugin/contollers/clusterController.py b/dataproc_jupyter_plugin/contollers/clusterController.py index 436d2edd..b8393877 100644 --- a/dataproc_jupyter_plugin/contollers/clusterController.py +++ b/dataproc_jupyter_plugin/contollers/clusterController.py @@ -43,10 +43,25 @@ def get(self): cluster_selected = self.get_argument("clusterSelected") cluster = ClusterService() credentials = handlers.get_cached_credentials(self.log) - cluster_list = cluster.get_cluster_detail( + get_cluster = cluster.get_cluster_detail( credentials, cluster_selected, self.log ) - self.finish(json.dumps(cluster_list)) + self.finish(json.dumps(get_cluster)) except Exception as e: - self.log.exception(f"Error fetching cluster list") + self.log.exception(f"Error fetching get cluster") + self.finish({"error": str(e)}) + +class StopClusterController(APIHandler): + @tornado.web.authenticated + def post(self): + try: + cluster_selected = self.get_argument("clusterSelected") + cluster = ClusterService() + credentials = handlers.get_cached_credentials(self.log) + stop_cluster = cluster.post_stop_cluster( + credentials, cluster_selected, self.log + ) + self.finish(json.dumps(stop_cluster)) + except Exception as e: + self.log.exception(f"Error fetching stop cluster") self.finish({"error": str(e)}) diff --git a/dataproc_jupyter_plugin/handlers.py b/dataproc_jupyter_plugin/handlers.py index 8e908dd7..5e007f35 100644 --- a/dataproc_jupyter_plugin/handlers.py +++ b/dataproc_jupyter_plugin/handlers.py @@ -33,7 +33,7 @@ from google.cloud.jupyter_config.config import gcp_kernel_gateway_url, get_gcloud_config -from dataproc_jupyter_plugin.contollers.clusterController import ClusterDetailController, ClusterListController +from dataproc_jupyter_plugin.contollers.clusterController import ClusterDetailController, StopClusterController, ClusterListController from dataproc_jupyter_plugin.contollers.composerController import ComposerListController from dataproc_jupyter_plugin.contollers.dagController import ( DagDeleteController, @@ -357,6 +357,7 @@ def full_path(name): "dagRunTaskLogs": DagRunTaskLogsController, "clusterList": ClusterListController, "clusterDetail": ClusterDetailController, + "stopCluster": StopClusterController, "runtimeList": RuntimeController, "createJobScheduler": ExecutorController, "dagList": DagListController, diff --git a/dataproc_jupyter_plugin/services/clusterService.py b/dataproc_jupyter_plugin/services/clusterService.py index d27e034f..a72d961b 100644 --- a/dataproc_jupyter_plugin/services/clusterService.py +++ b/dataproc_jupyter_plugin/services/clusterService.py @@ -36,7 +36,6 @@ def list_clusters(self, credentials, page_size, page_token, log): client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) # Initialize request argument(s) - print("call params",int(page_size), page_token) request = dataproc.ListClustersRequest( project_id=credentials["project_id"], page_size=int(page_size), @@ -51,8 +50,6 @@ def list_clusters(self, credentials, page_size, page_token, log): # Handle the response for response in page_result: - # response.status.state = 'STOPPED' - # print("aaaaaaa", response.status.state) clusters_list.append(json.loads(proto.Message.to_json(response))) return clusters_list @@ -93,3 +90,35 @@ def get_cluster_detail(self, credentials, cluster_selected, log): except Exception as e: log.exception(f"Error fetching cluster detail") return {"error": str(e)} + + def post_stop_cluster(self, credentials, cluster_selected, log): + try: + if ( + ("access_token" in credentials) + and ("project_id" in credentials) + and ("region_id" in credentials) + ): + access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) + + # Create a client + client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) + + # Initialize request argument(s) + request = dataproc.StopClusterRequest( + project_id=credentials["project_id"], + region=credentials["region_id"], + cluster_name=cluster_selected + ) + + operation = client.stop_cluster(request=request) + + response = operation.result() + + # Handle the response + return json.loads(proto.Message.to_json(response)) + else: + log.exception(f"Missing required credentials") + raise ValueError("Missing required credentials") + except Exception as e: + log.exception(f"Error fetching stop cluster") + return {"error": str(e)} \ No newline at end of file diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index 26f77d3c..cf292f3a 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -22,7 +22,7 @@ import { API_HEADER_CONTENT_TYPE, ClusterStatus, ClusterStatusState, - HTTP_METHOD, + // HTTP_METHOD, POLLING_TIME_LIMIT, gcpServiceUrls } from '../utils/const'; @@ -31,7 +31,7 @@ import { toastifyCustomStyle, loggedFetch, getProjectId, - authenticatedFetch, + // authenticatedFetch, statusValue } from '../utils/utils'; import { DataprocLoggingService, LOG_LEVEL } from '../utils/loggingService'; @@ -219,25 +219,41 @@ export class ClusterService { setRestartEnabled: (value: boolean) => void, listClustersAPI: () => void, timer: any, - statusApi: (value: string) => void + statusApi: (value: string) => void, + clustersList: ICluster[], + setClustersList: (value: ICluster[]) => void ) => { setRestartEnabled(true); + let tempClustersList = [...clustersList]; + console.log(tempClustersList) + tempClustersList.forEach(clusterData => { + if (clusterData.clusterName === selectedCluster) { + clusterData.status = 'STOPPING'; + } + }); + setClustersList(tempClustersList); + try { - const response = await authenticatedFetch({ - uri: `clusters/${selectedCluster}:stop`, - method: HTTP_METHOD.POST, - regionIdentifier: 'regions' + // const response = await authenticatedFetch({ + // uri: `clusters/${selectedCluster}:stop`, + // method: HTTP_METHOD.POST, + // regionIdentifier: 'regions' + // }); + const serviceURL = `stopCluster?clusterSelected=${selectedCluster}`; + + let formattedResponse: any = await requestAPI(serviceURL, { + method: 'POST' }); - const formattedResponse = await response.json(); + // const formattedResponse = await response.json(); console.log(formattedResponse); listClustersAPI(); timer.current = setInterval(() => { statusApi(selectedCluster); }, POLLING_TIME_LIMIT); - if (formattedResponse?.error?.code) { - toast.error(formattedResponse?.error?.message, toastifyCustomStyle); + if (formattedResponse?.error) { + toast.error(formattedResponse?.error, toastifyCustomStyle); } // This is an artifact of the refactoring listClustersAPI(); diff --git a/src/cluster/listCluster.tsx b/src/cluster/listCluster.tsx index 5d7de3c9..8eb30992 100644 --- a/src/cluster/listCluster.tsx +++ b/src/cluster/listCluster.tsx @@ -27,6 +27,7 @@ import { ClipLoader } from 'react-spinners'; import { CREATE_CLUSTER_URL, ClusterStatus, + ClusterStatusState, STATUS_CREATING, STATUS_DELETING, STATUS_ERROR, @@ -202,7 +203,9 @@ function ListCluster({ setRestartEnabled, listClustersAPI, timer, - statusApi + statusApi, + clustersList, + setClustersList ); }; @@ -214,23 +217,23 @@ function ListCluster({
ClusterService.startClusterApi(data.clusterName) : undefined } > - {data.status.state === ClusterStatus.STATUS_STOPPED && + {ClusterStatusState[data.status.state.toString()] === ClusterStatus.STATUS_STOPPED && !restartEnabled ? ( ClusterService.stopClusterApi(data.clusterName) : undefined } > - {data.status.state === ClusterStatus.STATUS_RUNNING ? ( + {ClusterStatusState[data.status.state.toString()] === ClusterStatus.STATUS_RUNNING ? ( restartClusterApi(data.clusterName) : undefined } > - {data.status.state === ClusterStatus.STATUS_RUNNING ? ( + {ClusterStatusState[data.status.state.toString()] === ClusterStatus.STATUS_RUNNING ? ( Date: Mon, 12 Aug 2024 10:44:06 +0530 Subject: [PATCH 06/29] controller rename changes --- .../{contollers => controllers}/clusterController.py | 0 dataproc_jupyter_plugin/handlers.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename dataproc_jupyter_plugin/{contollers => controllers}/clusterController.py (100%) diff --git a/dataproc_jupyter_plugin/contollers/clusterController.py b/dataproc_jupyter_plugin/controllers/clusterController.py similarity index 100% rename from dataproc_jupyter_plugin/contollers/clusterController.py rename to dataproc_jupyter_plugin/controllers/clusterController.py diff --git a/dataproc_jupyter_plugin/handlers.py b/dataproc_jupyter_plugin/handlers.py index 5e007f35..9cbc8d31 100644 --- a/dataproc_jupyter_plugin/handlers.py +++ b/dataproc_jupyter_plugin/handlers.py @@ -33,7 +33,7 @@ from google.cloud.jupyter_config.config import gcp_kernel_gateway_url, get_gcloud_config -from dataproc_jupyter_plugin.contollers.clusterController import ClusterDetailController, StopClusterController, ClusterListController +from dataproc_jupyter_plugin.controllers.clusterController import ClusterDetailController, StopClusterController, ClusterListController from dataproc_jupyter_plugin.contollers.composerController import ComposerListController from dataproc_jupyter_plugin.contollers.dagController import ( DagDeleteController, From d539d90b475fbfc2522ec67f3ab7d3c2c47fbb97 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 12 Aug 2024 14:48:48 +0530 Subject: [PATCH 07/29] start cluster and BE temp changes --- .../controllers/cluster.py | 59 +++-- dataproc_jupyter_plugin/handlers.py | 1 + .../services/clusterService.py | 223 ++++++++++-------- src/cluster/clusterServices.tsx | 105 +++++---- 4 files changed, 233 insertions(+), 155 deletions(-) diff --git a/dataproc_jupyter_plugin/controllers/cluster.py b/dataproc_jupyter_plugin/controllers/cluster.py index 642da848..017acf0a 100644 --- a/dataproc_jupyter_plugin/controllers/cluster.py +++ b/dataproc_jupyter_plugin/controllers/cluster.py @@ -14,54 +14,73 @@ import json +import aiohttp from jupyter_server.base.handlers import APIHandler import tornado -from dataproc_jupyter_plugin import handlers -from dataproc_jupyter_plugin.services.clusterService import ClusterService +from dataproc_jupyter_plugin import credentials +from dataproc_jupyter_plugin.services import clusterService class ClusterListPageController(APIHandler): @tornado.web.authenticated - def get(self): + async def get(self): try: page_token = self.get_argument("pageToken") page_size = self.get_argument("pageSize") - cluster = ClusterService() - credentials = handlers.get_cached_credentials(self.log) - cluster_list = cluster.list_clusters( - credentials, page_size, page_token, self.log - ) + async with aiohttp.ClientSession() as client_session: + client = clusterService.Client( + await credentials.get_cached(), self.log, client_session + ) + cluster_list = await client.list_clusters(page_size, page_token) self.finish(json.dumps(cluster_list)) except Exception as e: self.log.exception(f"Error fetching cluster list") self.finish({"error": str(e)}) + class ClusterDetailController(APIHandler): @tornado.web.authenticated - def get(self): + async def get(self): try: cluster_selected = self.get_argument("clusterSelected") - cluster = ClusterService() - credentials = handlers.get_cached_credentials(self.log) - get_cluster = cluster.get_cluster_detail( - credentials, cluster_selected, self.log - ) + async with aiohttp.ClientSession() as client_session: + client = clusterService.Client( + await credentials.get_cached(), self.log, client_session + ) + get_cluster = await client.get_cluster_detail(cluster_selected) self.finish(json.dumps(get_cluster)) except Exception as e: self.log.exception(f"Error fetching get cluster") self.finish({"error": str(e)}) + class StopClusterController(APIHandler): @tornado.web.authenticated - def post(self): + async def post(self): try: cluster_selected = self.get_argument("clusterSelected") - cluster = ClusterService() - credentials = handlers.get_cached_credentials(self.log) - stop_cluster = cluster.post_stop_cluster( - credentials, cluster_selected, self.log - ) + async with aiohttp.ClientSession() as client_session: + client = clusterService.Client( + await credentials.get_cached(), self.log, client_session + ) + stop_cluster = await client.post_stop_cluster(cluster_selected) self.finish(json.dumps(stop_cluster)) except Exception as e: self.log.exception(f"Error fetching stop cluster") self.finish({"error": str(e)}) + + +class StartClusterController(APIHandler): + @tornado.web.authenticated + async def post(self): + try: + cluster_selected = self.get_argument("clusterSelected") + async with aiohttp.ClientSession() as client_session: + client = clusterService.Client( + await credentials.get_cached(), self.log, client_session + ) + start_cluster = await client.post_start_cluster(cluster_selected) + self.finish(json.dumps(start_cluster)) + except Exception as e: + self.log.exception(f"Error fetching start cluster") + self.finish({"error": str(e)}) diff --git a/dataproc_jupyter_plugin/handlers.py b/dataproc_jupyter_plugin/handlers.py index 6dd46c40..568bc38f 100644 --- a/dataproc_jupyter_plugin/handlers.py +++ b/dataproc_jupyter_plugin/handlers.py @@ -197,6 +197,7 @@ def full_path(name): "clusterListPage": cluster.ClusterListPageController, "clusterDetail": cluster.ClusterDetailController, "stopCluster": cluster.StopClusterController, + "startCluster": cluster.StartClusterController, "runtimeList": dataproc.RuntimeController, "createJobScheduler": executor.ExecutorController, "dagList": airflow.DagListController, diff --git a/dataproc_jupyter_plugin/services/clusterService.py b/dataproc_jupyter_plugin/services/clusterService.py index a72d961b..fef5e48b 100644 --- a/dataproc_jupyter_plugin/services/clusterService.py +++ b/dataproc_jupyter_plugin/services/clusterService.py @@ -12,113 +12,146 @@ # See the License for the specific language governing permissions and # limitations under the License. - -import requests -from dataproc_jupyter_plugin.utils.constants import CONTENT_TYPE, dataproc_url - +from dataproc_jupyter_plugin.commons.constants import CONTENT_TYPE from google.cloud import dataproc_v1 as dataproc import google.oauth2.credentials import proto import json -class ClusterService: - def list_clusters(self, credentials, page_size, page_token, log): +class Client: + def __init__(self, credentials, log, client_session): + self.log = log + if not ( + ("access_token" in credentials) + and ("project_id" in credentials) + and ("region_id" in credentials) + ): + self.log.exception("Missing required credentials") + raise ValueError("Missing required credentials") + self._access_token = credentials["access_token"] + self.project_id = credentials["project_id"] + self.region_id = credentials["region_id"] + self.client_session = client_session + + def create_headers(self): + return { + "Content-Type": CONTENT_TYPE, + "Authorization": f"Bearer {self._access_token}", + } + + async def list_clusters(self, page_size, page_token): + print("List cluster service") + try: + # Create a client + client = dataproc.ClusterControllerAsyncClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=self._access_token, + ) + + # Initialize request argument(s) + request = dataproc.ListClustersRequest( + project_id=self.project_id, + page_size=int(page_size), + page_token=page_token, + region=self.region_id, + ) + + print("requesttttt", request) + # Make the request + page_result = await client.list_clusters(request=request) + + print("page_resultttt", page_result) + clusters_list = [] + + print("responseeee", page_result) + # Handle the response + async for response in page_result: + clusters_list.append(json.loads(proto.Message.to_json(response))) + + return clusters_list + except Exception as e: + self.log.exception(f"Error fetching cluster list") + return {"error": str(e)} + + def get_cluster_detail(self, cluster_selected): try: - if ( - ("access_token" in credentials) - and ("project_id" in credentials) - and ("region_id" in credentials) - ): - access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) - - # Create a client - client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) - - # Initialize request argument(s) - request = dataproc.ListClustersRequest( - project_id=credentials["project_id"], - page_size=int(page_size), - page_token=page_token, - region=credentials["region_id"], - ) - - # Make the request - page_result = client.list_clusters(request=request) - - clusters_list = [] - - # Handle the response - for response in page_result: - clusters_list.append(json.loads(proto.Message.to_json(response))) - - return clusters_list - else: - log.exception(f"Missing required credentials") - raise ValueError("Missing required credentials") + # Create a client + client = dataproc.ClusterControllerClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=self._access_token, + ) + + # Initialize request argument(s) + request = dataproc.GetClusterRequest( + project_id=self.project_id, + region=self.region_id, + cluster_name=cluster_selected, + ) + + # Make the request + response = client.get_cluster(request=request) + + # Handle the response + return json.loads(proto.Message.to_json(response)) except Exception as e: - log.exception(f"Error fetching cluster list") + self.log.exception(f"Error fetching cluster detail") return {"error": str(e)} - - def get_cluster_detail(self, credentials, cluster_selected, log): + + def post_stop_cluster(self, cluster_selected): try: - if ( - ("access_token" in credentials) - and ("project_id" in credentials) - and ("region_id" in credentials) - ): - access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) - - # Create a client - client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) - - # Initialize request argument(s) - request = dataproc.GetClusterRequest( - project_id=credentials["project_id"], - region=credentials["region_id"], - cluster_name=cluster_selected - ) - - # Make the request - response = client.get_cluster(request=request) - - # Handle the response - return json.loads(proto.Message.to_json(response)) - else: - log.exception(f"Missing required credentials") - raise ValueError("Missing required credentials") + # Create a client + client = dataproc.ClusterControllerClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=self._access_token, + ) + + # Initialize request argument(s) + request = dataproc.StopClusterRequest( + project_id=self.project_id, + region=self.region_id, + cluster_name=cluster_selected, + ) + + operation = client.stop_cluster(request=request) + + response = operation.result() + + # Handle the response + return json.loads(proto.Message.to_json(response)) except Exception as e: - log.exception(f"Error fetching cluster detail") + self.log.exception(f"Error fetching stop cluster") return {"error": str(e)} - def post_stop_cluster(self, credentials, cluster_selected, log): + def post_start_cluster(self, cluster_selected): try: - if ( - ("access_token" in credentials) - and ("project_id" in credentials) - and ("region_id" in credentials) - ): - access_credentials = google.oauth2.credentials.Credentials(credentials["access_token"]) - - # Create a client - client = dataproc.ClusterControllerClient(client_options={"api_endpoint": f"us-central1-dataproc.googleapis.com:443"},credentials = access_credentials) - - # Initialize request argument(s) - request = dataproc.StopClusterRequest( - project_id=credentials["project_id"], - region=credentials["region_id"], - cluster_name=cluster_selected - ) - - operation = client.stop_cluster(request=request) - - response = operation.result() - - # Handle the response - return json.loads(proto.Message.to_json(response)) - else: - log.exception(f"Missing required credentials") - raise ValueError("Missing required credentials") + # Create a client + client = dataproc.ClusterControllerClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=self._access_token, + ) + + # Initialize request argument(s) + request = dataproc.StartClusterRequest( + project_id=self.project_id, + region=self.region_id, + cluster_name=cluster_selected, + ) + + operation = client.start_cluster(request=request) + + response = operation.result() + + # Handle the response + return json.loads(proto.Message.to_json(response)) except Exception as e: - log.exception(f"Error fetching stop cluster") - return {"error": str(e)} \ No newline at end of file + self.log.exception(f"Error fetching start cluster") + return {"error": str(e)} diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index 5352f472..64afa523 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -82,9 +82,11 @@ export class ClusterService { const projectId = await getProjectId(); setProjectId(projectId); - const serviceURL = `clusterList?pageSize=50&pageToken=${pageToken}`; + const serviceURL = `clusterListPage?pageSize=50&pageToken=${pageToken}`; const formattedResponse: any = await requestAPI(serviceURL); + + console.log(formattedResponse); let transformClusterListData = []; if (formattedResponse) { transformClusterListData = formattedResponse.map((data: any) => { @@ -234,7 +236,7 @@ export class ClusterService { setRestartEnabled(true); let tempClustersList = [...clustersList]; - console.log(tempClustersList) + console.log(tempClustersList); tempClustersList.forEach(clusterData => { if (clusterData.clusterName === selectedCluster) { clusterData.status = 'STOPPING'; @@ -321,45 +323,68 @@ export class ClusterService { selectedcluster: string, operation: 'start' | 'stop' ) => { - const credentials = await authApi(); - const { DATAPROC } = await gcpServiceUrls; - if (credentials) { - loggedFetch( - `${DATAPROC}/projects/${credentials.project_id}/regions/${credentials.region_id}/clusters/${selectedcluster}:${operation}`, - { - method: 'POST', - headers: { - 'Content-Type': API_HEADER_CONTENT_TYPE, - Authorization: API_HEADER_BEARER + credentials.access_token - } - } - ) - .then((response: Response) => { - response - .json() - .then(async (responseResult: Response) => { - console.log(responseResult); - const formattedResponse = await responseResult.json(); - if (formattedResponse?.error?.code) { - toast.error( - formattedResponse?.error?.message, - toastifyCustomStyle - ); - } - }) - .catch((e: Error) => console.log(e)); - }) - .catch((err: Error) => { - DataprocLoggingService.log( - `Error ${operation} cluster`, - LOG_LEVEL.ERROR - ); - toast.error( - `Failed to ${operation} the cluster ${selectedcluster} : ${err}`, - toastifyCustomStyle - ); - }); + // const credentials = await authApi(); + // const { DATAPROC } = await gcpServiceUrls; + + try { + const serviceURL = + operation === 'stop' + ? `stopCluster?clusterSelected=${selectedcluster}` + : `startCluster?clusterSelected=${selectedcluster}`; + + let responseResult: any = await requestAPI(serviceURL, { + method: 'POST' + }); + + const formattedResponse = await responseResult.json(); + if (formattedResponse?.error?.code) { + toast.error(formattedResponse?.error?.message, toastifyCustomStyle); + } + } catch (err) { + DataprocLoggingService.log(`Error ${operation} cluster`, LOG_LEVEL.ERROR); + toast.error( + `Failed to ${operation} the cluster ${selectedcluster} : ${err}`, + toastifyCustomStyle + ); } + + // if (credentials) { + // loggedFetch( + // `${DATAPROC}/projects/${credentials.project_id}/regions/${credentials.region_id}/clusters/${selectedcluster}:${operation}`, + // { + // method: 'POST', + // headers: { + // 'Content-Type': API_HEADER_CONTENT_TYPE, + // Authorization: API_HEADER_BEARER + credentials.access_token + // } + // } + // ) + // .then((response: Response) => { + // response + // .json() + // .then(async (responseResult: Response) => { + // console.log(responseResult); + // const formattedResponse = await responseResult.json(); + // if (formattedResponse?.error?.code) { + // toast.error( + // formattedResponse?.error?.message, + // toastifyCustomStyle + // ); + // } + // }) + // .catch((e: Error) => console.log(e)); + // }) + // .catch((err: Error) => { + // DataprocLoggingService.log( + // `Error ${operation} cluster`, + // LOG_LEVEL.ERROR + // ); + // toast.error( + // `Failed to ${operation} the cluster ${selectedcluster} : ${err}`, + // toastifyCustomStyle + // ); + // }); + // } }; static startClusterApi = async (selectedcluster: string) => { From 1d600384ce6bc8e7b874eefa8d50315dbd957a50 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 12 Aug 2024 17:57:08 +0530 Subject: [PATCH 08/29] Service file rename changes --- .../controllers/cluster.py | 10 +++---- .../{clusterService.py => cluster.py} | 28 +++++++------------ 2 files changed, 15 insertions(+), 23 deletions(-) rename dataproc_jupyter_plugin/services/{clusterService.py => cluster.py} (85%) diff --git a/dataproc_jupyter_plugin/controllers/cluster.py b/dataproc_jupyter_plugin/controllers/cluster.py index 017acf0a..013d9a3e 100644 --- a/dataproc_jupyter_plugin/controllers/cluster.py +++ b/dataproc_jupyter_plugin/controllers/cluster.py @@ -18,7 +18,7 @@ from jupyter_server.base.handlers import APIHandler import tornado from dataproc_jupyter_plugin import credentials -from dataproc_jupyter_plugin.services import clusterService +from dataproc_jupyter_plugin.services import cluster class ClusterListPageController(APIHandler): @@ -28,7 +28,7 @@ async def get(self): page_token = self.get_argument("pageToken") page_size = self.get_argument("pageSize") async with aiohttp.ClientSession() as client_session: - client = clusterService.Client( + client = cluster.Client( await credentials.get_cached(), self.log, client_session ) cluster_list = await client.list_clusters(page_size, page_token) @@ -44,7 +44,7 @@ async def get(self): try: cluster_selected = self.get_argument("clusterSelected") async with aiohttp.ClientSession() as client_session: - client = clusterService.Client( + client = cluster.Client( await credentials.get_cached(), self.log, client_session ) get_cluster = await client.get_cluster_detail(cluster_selected) @@ -60,7 +60,7 @@ async def post(self): try: cluster_selected = self.get_argument("clusterSelected") async with aiohttp.ClientSession() as client_session: - client = clusterService.Client( + client = cluster.Client( await credentials.get_cached(), self.log, client_session ) stop_cluster = await client.post_stop_cluster(cluster_selected) @@ -76,7 +76,7 @@ async def post(self): try: cluster_selected = self.get_argument("clusterSelected") async with aiohttp.ClientSession() as client_session: - client = clusterService.Client( + client = cluster.Client( await credentials.get_cached(), self.log, client_session ) start_cluster = await client.post_start_cluster(cluster_selected) diff --git a/dataproc_jupyter_plugin/services/clusterService.py b/dataproc_jupyter_plugin/services/cluster.py similarity index 85% rename from dataproc_jupyter_plugin/services/clusterService.py rename to dataproc_jupyter_plugin/services/cluster.py index fef5e48b..a020165b 100644 --- a/dataproc_jupyter_plugin/services/clusterService.py +++ b/dataproc_jupyter_plugin/services/cluster.py @@ -12,9 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dataproc_jupyter_plugin.commons.constants import CONTENT_TYPE from google.cloud import dataproc_v1 as dataproc -import google.oauth2.credentials import proto import json @@ -34,12 +32,6 @@ def __init__(self, credentials, log, client_session): self.region_id = credentials["region_id"] self.client_session = client_session - def create_headers(self): - return { - "Content-Type": CONTENT_TYPE, - "Authorization": f"Bearer {self._access_token}", - } - async def list_clusters(self, page_size, page_token): print("List cluster service") try: @@ -69,17 +61,17 @@ async def list_clusters(self, page_size, page_token): print("responseeee", page_result) # Handle the response async for response in page_result: - clusters_list.append(json.loads(proto.Message.to_json(response))) + await clusters_list.append(json.loads(proto.Message.to_json(response))) return clusters_list except Exception as e: self.log.exception(f"Error fetching cluster list") return {"error": str(e)} - def get_cluster_detail(self, cluster_selected): + async def get_cluster_detail(self, cluster_selected): try: # Create a client - client = dataproc.ClusterControllerClient( + client = dataproc.ClusterControllerAsyncClient( client_options={ "api_endpoint": f"us-central1-dataproc.googleapis.com:443" }, @@ -94,7 +86,7 @@ def get_cluster_detail(self, cluster_selected): ) # Make the request - response = client.get_cluster(request=request) + response = await client.get_cluster(request=request) # Handle the response return json.loads(proto.Message.to_json(response)) @@ -102,10 +94,10 @@ def get_cluster_detail(self, cluster_selected): self.log.exception(f"Error fetching cluster detail") return {"error": str(e)} - def post_stop_cluster(self, cluster_selected): + async def post_stop_cluster(self, cluster_selected): try: # Create a client - client = dataproc.ClusterControllerClient( + client = dataproc.ClusterControllerAsyncClient( client_options={ "api_endpoint": f"us-central1-dataproc.googleapis.com:443" }, @@ -121,7 +113,7 @@ def post_stop_cluster(self, cluster_selected): operation = client.stop_cluster(request=request) - response = operation.result() + response = (await operation).result() # Handle the response return json.loads(proto.Message.to_json(response)) @@ -129,10 +121,10 @@ def post_stop_cluster(self, cluster_selected): self.log.exception(f"Error fetching stop cluster") return {"error": str(e)} - def post_start_cluster(self, cluster_selected): + async def post_start_cluster(self, cluster_selected): try: # Create a client - client = dataproc.ClusterControllerClient( + client = dataproc.ClusterControllerAsyncClient( client_options={ "api_endpoint": f"us-central1-dataproc.googleapis.com:443" }, @@ -148,7 +140,7 @@ def post_start_cluster(self, cluster_selected): operation = client.start_cluster(request=request) - response = operation.result() + response = (await operation).result() # Handle the response return json.loads(proto.Message.to_json(response)) From cd8467729e82fc3ada75e1632ec5d2a591de864f Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 19 Aug 2024 15:22:50 +0530 Subject: [PATCH 09/29] delete cluster and auth access token fix --- .../controllers/cluster.py | 51 +++++--- dataproc_jupyter_plugin/handlers.py | 3 +- dataproc_jupyter_plugin/services/cluster.py | 59 ++++++--- src/cluster/clusterServices.tsx | 123 ++++-------------- 4 files changed, 97 insertions(+), 139 deletions(-) diff --git a/dataproc_jupyter_plugin/controllers/cluster.py b/dataproc_jupyter_plugin/controllers/cluster.py index 013d9a3e..e3725bf7 100644 --- a/dataproc_jupyter_plugin/controllers/cluster.py +++ b/dataproc_jupyter_plugin/controllers/cluster.py @@ -14,7 +14,6 @@ import json -import aiohttp from jupyter_server.base.handlers import APIHandler import tornado from dataproc_jupyter_plugin import credentials @@ -27,11 +26,10 @@ async def get(self): try: page_token = self.get_argument("pageToken") page_size = self.get_argument("pageSize") - async with aiohttp.ClientSession() as client_session: - client = cluster.Client( - await credentials.get_cached(), self.log, client_session - ) - cluster_list = await client.list_clusters(page_size, page_token) + client = cluster.Client( + await credentials.get_cached(), self.log + ) + cluster_list = await client.list_clusters(page_size, page_token) self.finish(json.dumps(cluster_list)) except Exception as e: self.log.exception(f"Error fetching cluster list") @@ -43,11 +41,10 @@ class ClusterDetailController(APIHandler): async def get(self): try: cluster_selected = self.get_argument("clusterSelected") - async with aiohttp.ClientSession() as client_session: - client = cluster.Client( - await credentials.get_cached(), self.log, client_session - ) - get_cluster = await client.get_cluster_detail(cluster_selected) + client = cluster.Client( + await credentials.get_cached(), self.log + ) + get_cluster = await client.get_cluster_detail(cluster_selected) self.finish(json.dumps(get_cluster)) except Exception as e: self.log.exception(f"Error fetching get cluster") @@ -59,11 +56,10 @@ class StopClusterController(APIHandler): async def post(self): try: cluster_selected = self.get_argument("clusterSelected") - async with aiohttp.ClientSession() as client_session: - client = cluster.Client( - await credentials.get_cached(), self.log, client_session - ) - stop_cluster = await client.post_stop_cluster(cluster_selected) + client = cluster.Client( + await credentials.get_cached(), self.log + ) + stop_cluster = await client.stop_cluster(cluster_selected) self.finish(json.dumps(stop_cluster)) except Exception as e: self.log.exception(f"Error fetching stop cluster") @@ -75,12 +71,25 @@ class StartClusterController(APIHandler): async def post(self): try: cluster_selected = self.get_argument("clusterSelected") - async with aiohttp.ClientSession() as client_session: - client = cluster.Client( - await credentials.get_cached(), self.log, client_session - ) - start_cluster = await client.post_start_cluster(cluster_selected) + client = cluster.Client( + await credentials.get_cached(), self.log + ) + start_cluster = await client.start_cluster(cluster_selected) self.finish(json.dumps(start_cluster)) except Exception as e: self.log.exception(f"Error fetching start cluster") self.finish({"error": str(e)}) + +class DeleteClusterController(APIHandler): + @tornado.web.authenticated + async def delete(self): + try: + cluster_selected = self.get_argument("clusterSelected") + client = cluster.Client( + await credentials.get_cached(), self.log + ) + delete_cluster = await client.delete_cluster(cluster_selected) + self.finish(json.dumps(delete_cluster)) + except Exception as e: + self.log.exception(f"Error deleting cluster") + self.finish({"error": str(e)}) diff --git a/dataproc_jupyter_plugin/handlers.py b/dataproc_jupyter_plugin/handlers.py index 568bc38f..08a7b718 100644 --- a/dataproc_jupyter_plugin/handlers.py +++ b/dataproc_jupyter_plugin/handlers.py @@ -34,9 +34,9 @@ from dataproc_jupyter_plugin.controllers import ( airflow, bigquery, + cluster, composer, dataproc, - cluster, executor, ) @@ -198,6 +198,7 @@ def full_path(name): "clusterDetail": cluster.ClusterDetailController, "stopCluster": cluster.StopClusterController, "startCluster": cluster.StartClusterController, + "deleteCluster": cluster.DeleteClusterController, "runtimeList": dataproc.RuntimeController, "createJobScheduler": executor.ExecutorController, "dagList": airflow.DagListController, diff --git a/dataproc_jupyter_plugin/services/cluster.py b/dataproc_jupyter_plugin/services/cluster.py index a020165b..4cea2b5c 100644 --- a/dataproc_jupyter_plugin/services/cluster.py +++ b/dataproc_jupyter_plugin/services/cluster.py @@ -15,10 +15,10 @@ from google.cloud import dataproc_v1 as dataproc import proto import json - +import google.oauth2.credentials as oauth2 class Client: - def __init__(self, credentials, log, client_session): + def __init__(self, credentials, log): self.log = log if not ( ("access_token" in credentials) @@ -30,17 +30,15 @@ def __init__(self, credentials, log, client_session): self._access_token = credentials["access_token"] self.project_id = credentials["project_id"] self.region_id = credentials["region_id"] - self.client_session = client_session async def list_clusters(self, page_size, page_token): - print("List cluster service") try: # Create a client client = dataproc.ClusterControllerAsyncClient( client_options={ "api_endpoint": f"us-central1-dataproc.googleapis.com:443" }, - credentials=self._access_token, + credentials=oauth2.Credentials(self._access_token), ) # Initialize request argument(s) @@ -51,17 +49,13 @@ async def list_clusters(self, page_size, page_token): region=self.region_id, ) - print("requesttttt", request) # Make the request page_result = await client.list_clusters(request=request) - - print("page_resultttt", page_result) clusters_list = [] - print("responseeee", page_result) # Handle the response async for response in page_result: - await clusters_list.append(json.loads(proto.Message.to_json(response))) + clusters_list.append(json.loads(proto.Message.to_json(response))) return clusters_list except Exception as e: @@ -75,7 +69,7 @@ async def get_cluster_detail(self, cluster_selected): client_options={ "api_endpoint": f"us-central1-dataproc.googleapis.com:443" }, - credentials=self._access_token, + credentials=oauth2.Credentials(self._access_token), ) # Initialize request argument(s) @@ -94,14 +88,14 @@ async def get_cluster_detail(self, cluster_selected): self.log.exception(f"Error fetching cluster detail") return {"error": str(e)} - async def post_stop_cluster(self, cluster_selected): + async def stop_cluster(self, cluster_selected): try: # Create a client client = dataproc.ClusterControllerAsyncClient( client_options={ "api_endpoint": f"us-central1-dataproc.googleapis.com:443" }, - credentials=self._access_token, + credentials=oauth2.Credentials(self._access_token), ) # Initialize request argument(s) @@ -113,22 +107,23 @@ async def post_stop_cluster(self, cluster_selected): operation = client.stop_cluster(request=request) + print("stoppppppp111",operation) response = (await operation).result() - + print("stoppppppp222",response) # Handle the response return json.loads(proto.Message.to_json(response)) except Exception as e: self.log.exception(f"Error fetching stop cluster") return {"error": str(e)} - async def post_start_cluster(self, cluster_selected): + async def start_cluster(self, cluster_selected): try: # Create a client client = dataproc.ClusterControllerAsyncClient( client_options={ "api_endpoint": f"us-central1-dataproc.googleapis.com:443" }, - credentials=self._access_token, + credentials=oauth2.Credentials(self._access_token), ) # Initialize request argument(s) @@ -140,10 +135,40 @@ async def post_start_cluster(self, cluster_selected): operation = client.start_cluster(request=request) + print("startttttt111",operation) response = (await operation).result() - + print("startttttt222",response) # Handle the response return json.loads(proto.Message.to_json(response)) except Exception as e: self.log.exception(f"Error fetching start cluster") return {"error": str(e)} + + async def delete_cluster(self, cluster_selected): + try: + print("delete cluster") + # Create a client + client = dataproc.ClusterControllerAsyncClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=oauth2.Credentials(self._access_token), + ) + + # Initialize request argument(s) + request = dataproc.DeleteClusterRequest( + project_id=self.project_id, + region=self.region_id, + cluster_name=cluster_selected, + ) + + operation = client.delete_cluster(request=request) + + print("delete111",operation) + response = (await operation).result() + print("delete222",response) + # Handle the response + return json.loads(proto.Message.to_json(response)) + except Exception as e: + self.log.exception(f"Error deleting cluster") + return {"error": str(e)} diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index 64afa523..3c0e5301 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -18,20 +18,14 @@ import { toast } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; import { - API_HEADER_BEARER, - API_HEADER_CONTENT_TYPE, ClusterStatus, ClusterStatusState, - // HTTP_METHOD, - POLLING_TIME_LIMIT, - gcpServiceUrls + POLLING_TIME_LIMIT } from '../utils/const'; import { authApi, toastifyCustomStyle, - loggedFetch, getProjectId, - // authenticatedFetch, statusValue } from '../utils/utils'; import { DataprocLoggingService, LOG_LEVEL } from '../utils/loggingService'; @@ -234,22 +228,7 @@ export class ClusterService { setClustersList: (value: ICluster[]) => void ) => { setRestartEnabled(true); - - let tempClustersList = [...clustersList]; - console.log(tempClustersList); - tempClustersList.forEach(clusterData => { - if (clusterData.clusterName === selectedCluster) { - clusterData.status = 'STOPPING'; - } - }); - setClustersList(tempClustersList); - try { - // const response = await authenticatedFetch({ - // uri: `clusters/${selectedCluster}:stop`, - // method: HTTP_METHOD.POST, - // regionIdentifier: 'regions' - // }); const serviceURL = `stopCluster?clusterSelected=${selectedCluster}`; let formattedResponse: any = await requestAPI(serviceURL, { @@ -279,43 +258,28 @@ export class ClusterService { }; static deleteClusterApi = async (selectedcluster: string) => { - const credentials = await authApi(); - const { DATAPROC } = await gcpServiceUrls; - if (credentials) { - loggedFetch( - `${DATAPROC}/projects/${credentials.project_id}/regions/${credentials.region_id}/clusters/${selectedcluster}`, - { - method: 'DELETE', - headers: { - 'Content-Type': API_HEADER_CONTENT_TYPE, - Authorization: API_HEADER_BEARER + credentials.access_token - } - } - ) - .then((response: Response) => { - response - .json() - .then(async (responseResult: Response) => { - console.log(responseResult); - const formattedResponse = await responseResult.json(); - if (formattedResponse?.error?.code) { - toast.error( - formattedResponse?.error?.message, - toastifyCustomStyle - ); - } else { - toast.success( - `Cluster ${selectedcluster} deleted successfully`, - toastifyCustomStyle - ); - } - }) - .catch((e: Error) => console.log(e)); - }) - .catch((err: Error) => { - DataprocLoggingService.log('Error deleting cluster', LOG_LEVEL.ERROR); - toast.error(`Error deleting cluster : ${err}`, toastifyCustomStyle); - }); + try { + const serviceURL = `deleteCluster?clusterSelected=${selectedcluster}`; + + let formattedResponse: any = await requestAPI(serviceURL, { + method: 'DELETE' + }); + + if (formattedResponse?.error) { + toast.error(formattedResponse?.error?.message, toastifyCustomStyle); + } else { + toast.success( + `Cluster ${selectedcluster} deleted successfully`, + toastifyCustomStyle + ); + } + + } catch (error) { + DataprocLoggingService.log('Error deleting cluster', LOG_LEVEL.ERROR); + toast.error( + `Error deleting cluster ${selectedcluster} : ${error}`, + toastifyCustomStyle + ); } }; @@ -323,9 +287,6 @@ export class ClusterService { selectedcluster: string, operation: 'start' | 'stop' ) => { - // const credentials = await authApi(); - // const { DATAPROC } = await gcpServiceUrls; - try { const serviceURL = operation === 'stop' @@ -347,44 +308,6 @@ export class ClusterService { toastifyCustomStyle ); } - - // if (credentials) { - // loggedFetch( - // `${DATAPROC}/projects/${credentials.project_id}/regions/${credentials.region_id}/clusters/${selectedcluster}:${operation}`, - // { - // method: 'POST', - // headers: { - // 'Content-Type': API_HEADER_CONTENT_TYPE, - // Authorization: API_HEADER_BEARER + credentials.access_token - // } - // } - // ) - // .then((response: Response) => { - // response - // .json() - // .then(async (responseResult: Response) => { - // console.log(responseResult); - // const formattedResponse = await responseResult.json(); - // if (formattedResponse?.error?.code) { - // toast.error( - // formattedResponse?.error?.message, - // toastifyCustomStyle - // ); - // } - // }) - // .catch((e: Error) => console.log(e)); - // }) - // .catch((err: Error) => { - // DataprocLoggingService.log( - // `Error ${operation} cluster`, - // LOG_LEVEL.ERROR - // ); - // toast.error( - // `Failed to ${operation} the cluster ${selectedcluster} : ${err}`, - // toastifyCustomStyle - // ); - // }); - // } }; static startClusterApi = async (selectedcluster: string) => { From 2b7b95b5d8777ccaa0d4a880dca128cac16f56aa Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 19 Aug 2024 16:10:23 +0530 Subject: [PATCH 10/29] await changes in start, stop, delete --- dataproc_jupyter_plugin/services/cluster.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dataproc_jupyter_plugin/services/cluster.py b/dataproc_jupyter_plugin/services/cluster.py index 4cea2b5c..847e77f5 100644 --- a/dataproc_jupyter_plugin/services/cluster.py +++ b/dataproc_jupyter_plugin/services/cluster.py @@ -105,10 +105,10 @@ async def stop_cluster(self, cluster_selected): cluster_name=cluster_selected, ) - operation = client.stop_cluster(request=request) + operation = await client.stop_cluster(request=request) print("stoppppppp111",operation) - response = (await operation).result() + response = await operation.result() print("stoppppppp222",response) # Handle the response return json.loads(proto.Message.to_json(response)) @@ -133,10 +133,10 @@ async def start_cluster(self, cluster_selected): cluster_name=cluster_selected, ) - operation = client.start_cluster(request=request) + operation = await client.start_cluster(request=request) print("startttttt111",operation) - response = (await operation).result() + response = await operation.result() print("startttttt222",response) # Handle the response return json.loads(proto.Message.to_json(response)) @@ -162,10 +162,10 @@ async def delete_cluster(self, cluster_selected): cluster_name=cluster_selected, ) - operation = client.delete_cluster(request=request) + operation = await client.delete_cluster(request=request) print("delete111",operation) - response = (await operation).result() + response = await operation.result() print("delete222",response) # Handle the response return json.loads(proto.Message.to_json(response)) From 9b714926742c4bafeb53adba5099a4749349022b Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 19 Aug 2024 17:59:35 +0530 Subject: [PATCH 11/29] delete cluster empty handled --- dataproc_jupyter_plugin/services/cluster.py | 13 +++++------ src/cluster/clusterServices.tsx | 24 ++++++++------------- src/cluster/listCluster.tsx | 4 +--- 3 files changed, 15 insertions(+), 26 deletions(-) diff --git a/dataproc_jupyter_plugin/services/cluster.py b/dataproc_jupyter_plugin/services/cluster.py index 847e77f5..2457d2f4 100644 --- a/dataproc_jupyter_plugin/services/cluster.py +++ b/dataproc_jupyter_plugin/services/cluster.py @@ -16,6 +16,7 @@ import proto import json import google.oauth2.credentials as oauth2 +from google.protobuf.empty_pb2 import Empty class Client: def __init__(self, credentials, log): @@ -107,9 +108,7 @@ async def stop_cluster(self, cluster_selected): operation = await client.stop_cluster(request=request) - print("stoppppppp111",operation) response = await operation.result() - print("stoppppppp222",response) # Handle the response return json.loads(proto.Message.to_json(response)) except Exception as e: @@ -135,9 +134,7 @@ async def start_cluster(self, cluster_selected): operation = await client.start_cluster(request=request) - print("startttttt111",operation) response = await operation.result() - print("startttttt222",response) # Handle the response return json.loads(proto.Message.to_json(response)) except Exception as e: @@ -146,7 +143,6 @@ async def start_cluster(self, cluster_selected): async def delete_cluster(self, cluster_selected): try: - print("delete cluster") # Create a client client = dataproc.ClusterControllerAsyncClient( client_options={ @@ -164,11 +160,12 @@ async def delete_cluster(self, cluster_selected): operation = await client.delete_cluster(request=request) - print("delete111",operation) response = await operation.result() - print("delete222",response) # Handle the response - return json.loads(proto.Message.to_json(response)) + if isinstance(response, Empty): + return "Deleted successfully" + else: + return json.loads(proto.Message.to_json(response)) except Exception as e: self.log.exception(f"Error deleting cluster") return {"error": str(e)} diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index 3c0e5301..97067893 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -80,7 +80,6 @@ export class ClusterService { const formattedResponse: any = await requestAPI(serviceURL); - console.log(formattedResponse); let transformClusterListData = []; if (formattedResponse) { transformClusterListData = formattedResponse.map((data: any) => { @@ -127,9 +126,9 @@ export class ClusterService { setIsLoading(false); setLoggedIn(true); } - if (formattedResponse?.error?.code) { + if (formattedResponse?.error) { if (!toast.isActive('clusterListingError')) { - toast.error(formattedResponse?.error?.message, { + toast.error(formattedResponse?.error, { ...toastifyCustomStyle, toastId: 'clusterListingError' }); @@ -205,8 +204,8 @@ export class ClusterService { ClusterService.startClusterApi(selectedCluster); clearInterval(timer.current); } - if (formattedResponse?.error?.code) { - toast.error(formattedResponse?.error?.message, toastifyCustomStyle); + if (formattedResponse?.error) { + toast.error(formattedResponse?.error, toastifyCustomStyle); } listClustersAPI(); } catch (error) { @@ -223,9 +222,7 @@ export class ClusterService { setRestartEnabled: (value: boolean) => void, listClustersAPI: () => void, timer: any, - statusApi: (value: string) => void, - clustersList: ICluster[], - setClustersList: (value: ICluster[]) => void + statusApi: (value: string) => void ) => { setRestartEnabled(true); try { @@ -234,8 +231,6 @@ export class ClusterService { let formattedResponse: any = await requestAPI(serviceURL, { method: 'POST' }); - // const formattedResponse = await response.json(); - console.log(formattedResponse); listClustersAPI(); timer.current = setInterval(() => { @@ -266,7 +261,7 @@ export class ClusterService { }); if (formattedResponse?.error) { - toast.error(formattedResponse?.error?.message, toastifyCustomStyle); + toast.error(formattedResponse?.error, toastifyCustomStyle); } else { toast.success( `Cluster ${selectedcluster} deleted successfully`, @@ -293,13 +288,12 @@ export class ClusterService { ? `stopCluster?clusterSelected=${selectedcluster}` : `startCluster?clusterSelected=${selectedcluster}`; - let responseResult: any = await requestAPI(serviceURL, { + let formattedResponse: any = await requestAPI(serviceURL, { method: 'POST' }); - const formattedResponse = await responseResult.json(); - if (formattedResponse?.error?.code) { - toast.error(formattedResponse?.error?.message, toastifyCustomStyle); + if (formattedResponse?.error) { + toast.error(formattedResponse?.error, toastifyCustomStyle); } } catch (err) { DataprocLoggingService.log(`Error ${operation} cluster`, LOG_LEVEL.ERROR); diff --git a/src/cluster/listCluster.tsx b/src/cluster/listCluster.tsx index 42342223..f9a6998b 100644 --- a/src/cluster/listCluster.tsx +++ b/src/cluster/listCluster.tsx @@ -203,9 +203,7 @@ function ListCluster({ setRestartEnabled, listClustersAPI, timer, - statusApi, - clustersList, - setClustersList + statusApi ); }; From 312902acc5d8edf700a1ca1ea4c6008720de0c05 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Tue, 20 Aug 2024 15:57:37 +0530 Subject: [PATCH 12/29] added new dependency "google-cloud-dataproc" --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index dbf97a6a..0c911601 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,8 @@ dependencies = [ "pendulum>=3.0.0", "pydantic~=1.10.0", "bigframes~=0.22.0", - "aiohttp~=3.9.5" + "aiohttp~=3.9.5", + "google-cloud-dataproc~=5.10.2" ] dynamic = ["version", "description", "authors", "urls", "keywords"] From 68c21db2e88d49c1a7475e311cdd56e0917dbd3a Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Thu, 22 Aug 2024 12:13:52 +0530 Subject: [PATCH 13/29] code cleanup --- src/utils/utils.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/utils/utils.ts b/src/utils/utils.ts index e873fee8..45f268cb 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -32,7 +32,6 @@ import { SPARK, SPARKR, SPARKSQL, - // STATUS_CREATING, STATUS_DONE, STATUS_ERROR, STATUS_FAIL, From 4d709718239d28fe9018019f974ebb8011327e21 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Thu, 5 Sep 2024 20:10:56 +0530 Subject: [PATCH 14/29] Code review comments fix --- .../controllers/cluster.py | 95 ---------- .../controllers/dataproc.py | 73 ++++++-- dataproc_jupyter_plugin/handlers.py | 10 +- dataproc_jupyter_plugin/services/cluster.py | 171 ------------------ dataproc_jupyter_plugin/services/dataproc.py | 163 +++++++++++++++-- src/cluster/clusterServices.tsx | 14 +- src/cluster/listCluster.tsx | 24 +-- src/scheduler/schedulerServices.tsx | 4 +- src/utils/const.ts | 26 +-- src/utils/utils.ts | 2 +- 10 files changed, 244 insertions(+), 338 deletions(-) delete mode 100644 dataproc_jupyter_plugin/controllers/cluster.py delete mode 100644 dataproc_jupyter_plugin/services/cluster.py diff --git a/dataproc_jupyter_plugin/controllers/cluster.py b/dataproc_jupyter_plugin/controllers/cluster.py deleted file mode 100644 index e3725bf7..00000000 --- a/dataproc_jupyter_plugin/controllers/cluster.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import json -from jupyter_server.base.handlers import APIHandler -import tornado -from dataproc_jupyter_plugin import credentials -from dataproc_jupyter_plugin.services import cluster - - -class ClusterListPageController(APIHandler): - @tornado.web.authenticated - async def get(self): - try: - page_token = self.get_argument("pageToken") - page_size = self.get_argument("pageSize") - client = cluster.Client( - await credentials.get_cached(), self.log - ) - cluster_list = await client.list_clusters(page_size, page_token) - self.finish(json.dumps(cluster_list)) - except Exception as e: - self.log.exception(f"Error fetching cluster list") - self.finish({"error": str(e)}) - - -class ClusterDetailController(APIHandler): - @tornado.web.authenticated - async def get(self): - try: - cluster_selected = self.get_argument("clusterSelected") - client = cluster.Client( - await credentials.get_cached(), self.log - ) - get_cluster = await client.get_cluster_detail(cluster_selected) - self.finish(json.dumps(get_cluster)) - except Exception as e: - self.log.exception(f"Error fetching get cluster") - self.finish({"error": str(e)}) - - -class StopClusterController(APIHandler): - @tornado.web.authenticated - async def post(self): - try: - cluster_selected = self.get_argument("clusterSelected") - client = cluster.Client( - await credentials.get_cached(), self.log - ) - stop_cluster = await client.stop_cluster(cluster_selected) - self.finish(json.dumps(stop_cluster)) - except Exception as e: - self.log.exception(f"Error fetching stop cluster") - self.finish({"error": str(e)}) - - -class StartClusterController(APIHandler): - @tornado.web.authenticated - async def post(self): - try: - cluster_selected = self.get_argument("clusterSelected") - client = cluster.Client( - await credentials.get_cached(), self.log - ) - start_cluster = await client.start_cluster(cluster_selected) - self.finish(json.dumps(start_cluster)) - except Exception as e: - self.log.exception(f"Error fetching start cluster") - self.finish({"error": str(e)}) - -class DeleteClusterController(APIHandler): - @tornado.web.authenticated - async def delete(self): - try: - cluster_selected = self.get_argument("clusterSelected") - client = cluster.Client( - await credentials.get_cached(), self.log - ) - delete_cluster = await client.delete_cluster(cluster_selected) - self.finish(json.dumps(delete_cluster)) - except Exception as e: - self.log.exception(f"Error deleting cluster") - self.finish({"error": str(e)}) diff --git a/dataproc_jupyter_plugin/controllers/dataproc.py b/dataproc_jupyter_plugin/controllers/dataproc.py index db822234..925059e1 100644 --- a/dataproc_jupyter_plugin/controllers/dataproc.py +++ b/dataproc_jupyter_plugin/controllers/dataproc.py @@ -22,7 +22,7 @@ from dataproc_jupyter_plugin.services import dataproc -class ClusterListController(APIHandler): +class RuntimeController(APIHandler): @tornado.web.authenticated async def get(self): try: @@ -32,25 +32,74 @@ async def get(self): client = dataproc.Client( await credentials.get_cached(), self.log, client_session ) - cluster_list = await client.list_clusters(page_size, page_token) - self.finish(json.dumps(cluster_list)) + runtime_list = await client.list_runtime(page_size, page_token) + self.finish(json.dumps(runtime_list)) except Exception as e: - self.log.exception("Error fetching cluster list") + self.log.exception(f"Error fetching runtime template list: {str(e)}") self.finish({"error": str(e)}) -class RuntimeController(APIHandler): +class ClusterListController(APIHandler): @tornado.web.authenticated async def get(self): try: page_token = self.get_argument("pageToken") page_size = self.get_argument("pageSize") - async with aiohttp.ClientSession() as client_session: - client = dataproc.Client( - await credentials.get_cached(), self.log, client_session - ) - runtime_list = await client.list_runtime(page_size, page_token) - self.finish(json.dumps(runtime_list)) + client = dataproc.Client(await credentials.get_cached(), self.log) + cluster_list = await client.list_clusters(page_size, page_token) + self.finish(json.dumps(cluster_list)) except Exception as e: - self.log.exception(f"Error fetching runtime template list: {str(e)}") + self.log.exception(f"Error fetching cluster list") + self.finish({"error": str(e)}) + + +class ClusterDetailController(APIHandler): + @tornado.web.authenticated + async def get(self): + try: + cluster = self.get_argument("cluster") + client = dataproc.Client(await credentials.get_cached(), self.log) + get_cluster = await client.get_cluster_detail(cluster) + self.finish(json.dumps(get_cluster)) + except Exception as e: + self.log.exception(f"Error fetching get cluster") + self.finish({"error": str(e)}) + + +class StopClusterController(APIHandler): + @tornado.web.authenticated + async def post(self): + try: + cluster = self.get_argument("cluster") + client = dataproc.Client(await credentials.get_cached(), self.log) + stop_cluster = await client.stop_cluster(cluster) + self.finish(json.dumps(stop_cluster)) + except Exception as e: + self.log.exception(f"Error fetching stop cluster") + self.finish({"error": str(e)}) + + +class StartClusterController(APIHandler): + @tornado.web.authenticated + async def post(self): + try: + cluster = self.get_argument("cluster") + client = dataproc.Client(await credentials.get_cached(), self.log) + start_cluster = await client.start_cluster(cluster) + self.finish(json.dumps(start_cluster)) + except Exception as e: + self.log.exception(f"Error fetching start cluster") + self.finish({"error": str(e)}) + + +class DeleteClusterController(APIHandler): + @tornado.web.authenticated + async def delete(self): + try: + cluster = self.get_argument("cluster") + client = dataproc.Client(await credentials.get_cached(), self.log) + delete_cluster = await client.delete_cluster(cluster) + self.finish(json.dumps(delete_cluster)) + except Exception as e: + self.log.exception(f"Error deleting cluster") self.finish({"error": str(e)}) diff --git a/dataproc_jupyter_plugin/handlers.py b/dataproc_jupyter_plugin/handlers.py index 08a7b718..43cc098e 100644 --- a/dataproc_jupyter_plugin/handlers.py +++ b/dataproc_jupyter_plugin/handlers.py @@ -34,7 +34,6 @@ from dataproc_jupyter_plugin.controllers import ( airflow, bigquery, - cluster, composer, dataproc, executor, @@ -194,11 +193,10 @@ def full_path(name): "dagRunTask": airflow.DagRunTaskController, "dagRunTaskLogs": airflow.DagRunTaskLogsController, "clusterList": dataproc.ClusterListController, - "clusterListPage": cluster.ClusterListPageController, - "clusterDetail": cluster.ClusterDetailController, - "stopCluster": cluster.StopClusterController, - "startCluster": cluster.StartClusterController, - "deleteCluster": cluster.DeleteClusterController, + "clusterDetail": dataproc.ClusterDetailController, + "stopCluster": dataproc.StopClusterController, + "startCluster": dataproc.StartClusterController, + "deleteCluster": dataproc.DeleteClusterController, "runtimeList": dataproc.RuntimeController, "createJobScheduler": executor.ExecutorController, "dagList": airflow.DagListController, diff --git a/dataproc_jupyter_plugin/services/cluster.py b/dataproc_jupyter_plugin/services/cluster.py deleted file mode 100644 index 2457d2f4..00000000 --- a/dataproc_jupyter_plugin/services/cluster.py +++ /dev/null @@ -1,171 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from google.cloud import dataproc_v1 as dataproc -import proto -import json -import google.oauth2.credentials as oauth2 -from google.protobuf.empty_pb2 import Empty - -class Client: - def __init__(self, credentials, log): - self.log = log - if not ( - ("access_token" in credentials) - and ("project_id" in credentials) - and ("region_id" in credentials) - ): - self.log.exception("Missing required credentials") - raise ValueError("Missing required credentials") - self._access_token = credentials["access_token"] - self.project_id = credentials["project_id"] - self.region_id = credentials["region_id"] - - async def list_clusters(self, page_size, page_token): - try: - # Create a client - client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, - credentials=oauth2.Credentials(self._access_token), - ) - - # Initialize request argument(s) - request = dataproc.ListClustersRequest( - project_id=self.project_id, - page_size=int(page_size), - page_token=page_token, - region=self.region_id, - ) - - # Make the request - page_result = await client.list_clusters(request=request) - clusters_list = [] - - # Handle the response - async for response in page_result: - clusters_list.append(json.loads(proto.Message.to_json(response))) - - return clusters_list - except Exception as e: - self.log.exception(f"Error fetching cluster list") - return {"error": str(e)} - - async def get_cluster_detail(self, cluster_selected): - try: - # Create a client - client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, - credentials=oauth2.Credentials(self._access_token), - ) - - # Initialize request argument(s) - request = dataproc.GetClusterRequest( - project_id=self.project_id, - region=self.region_id, - cluster_name=cluster_selected, - ) - - # Make the request - response = await client.get_cluster(request=request) - - # Handle the response - return json.loads(proto.Message.to_json(response)) - except Exception as e: - self.log.exception(f"Error fetching cluster detail") - return {"error": str(e)} - - async def stop_cluster(self, cluster_selected): - try: - # Create a client - client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, - credentials=oauth2.Credentials(self._access_token), - ) - - # Initialize request argument(s) - request = dataproc.StopClusterRequest( - project_id=self.project_id, - region=self.region_id, - cluster_name=cluster_selected, - ) - - operation = await client.stop_cluster(request=request) - - response = await operation.result() - # Handle the response - return json.loads(proto.Message.to_json(response)) - except Exception as e: - self.log.exception(f"Error fetching stop cluster") - return {"error": str(e)} - - async def start_cluster(self, cluster_selected): - try: - # Create a client - client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, - credentials=oauth2.Credentials(self._access_token), - ) - - # Initialize request argument(s) - request = dataproc.StartClusterRequest( - project_id=self.project_id, - region=self.region_id, - cluster_name=cluster_selected, - ) - - operation = await client.start_cluster(request=request) - - response = await operation.result() - # Handle the response - return json.loads(proto.Message.to_json(response)) - except Exception as e: - self.log.exception(f"Error fetching start cluster") - return {"error": str(e)} - - async def delete_cluster(self, cluster_selected): - try: - # Create a client - client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, - credentials=oauth2.Credentials(self._access_token), - ) - - # Initialize request argument(s) - request = dataproc.DeleteClusterRequest( - project_id=self.project_id, - region=self.region_id, - cluster_name=cluster_selected, - ) - - operation = await client.delete_cluster(request=request) - - response = await operation.result() - # Handle the response - if isinstance(response, Empty): - return "Deleted successfully" - else: - return json.loads(proto.Message.to_json(response)) - except Exception as e: - self.log.exception(f"Error deleting cluster") - return {"error": str(e)} diff --git a/dataproc_jupyter_plugin/services/dataproc.py b/dataproc_jupyter_plugin/services/dataproc.py index 249fb12c..f58633be 100644 --- a/dataproc_jupyter_plugin/services/dataproc.py +++ b/dataproc_jupyter_plugin/services/dataproc.py @@ -18,9 +18,15 @@ DATAPROC_SERVICE_NAME, ) +from google.cloud import dataproc_v1 as dataproc +import proto +import json +import google.oauth2.credentials as oauth2 +from google.protobuf.empty_pb2 import Empty + class Client: - def __init__(self, credentials, log, client_session): + def __init__(self, credentials, log, client_session=None): self.log = log if not ( ("access_token" in credentials) @@ -40,10 +46,10 @@ def create_headers(self): "Authorization": f"Bearer {self._access_token}", } - async def list_clusters(self, page_size, page_token): + async def list_runtime(self, page_size, page_token): try: dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) - api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/regions/{self.region_id}/clusters?pageSize={page_size}&pageToken={page_token}" + api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}" async with self.client_session.get( api_endpoint, headers=self.create_headers() ) as response: @@ -52,27 +58,146 @@ async def list_clusters(self, page_size, page_token): return resp else: return { - "error": f"Failed to fetch clusters: {response.status} {await response.text()}" + "error": f"Failed to fetch runtimes: {response.status} {await response.text()}" } + except Exception as e: + self.log.exception(f"Error fetching runtime list: {str(e)}") + return {"error": str(e)} + + async def list_clusters(self, page_size, page_token): + try: + # Create a client + client = dataproc.ClusterControllerAsyncClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=oauth2.Credentials(self._access_token), + ) + + # Initialize request argument(s) + request = dataproc.ListClustersRequest( + project_id=self.project_id, + page_size=int(page_size), + page_token=page_token, + region=self.region_id, + ) + + # Make the request + page_result = await client.list_clusters(request=request) + clusters_list = [] + # Handle the response + async for response in page_result: + clusters_list.append(json.loads(proto.Message.to_json(response))) + + return clusters_list except Exception as e: - self.log.exception("Error fetching cluster list") + self.log.exception(f"Error fetching cluster list") return {"error": str(e)} - async def list_runtime(self, page_size, page_token): + async def get_cluster_detail(self, cluster): try: - dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) - api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}" - async with self.client_session.get( - api_endpoint, headers=self.create_headers() - ) as response: - if response.status == 200: - resp = await response.json() - return resp - else: - return { - "error": f"Failed to fetch runtimes: {response.status} {await response.text()}" - } + # Create a client + client = dataproc.ClusterControllerAsyncClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=oauth2.Credentials(self._access_token), + ) + + # Initialize request argument(s) + request = dataproc.GetClusterRequest( + project_id=self.project_id, + region=self.region_id, + cluster_name=cluster, + ) + + # Make the request + response = await client.get_cluster(request=request) + + # Handle the response + return json.loads(proto.Message.to_json(response)) except Exception as e: - self.log.exception(f"Error fetching runtime list: {str(e)}") + self.log.exception(f"Error fetching cluster detail") + return {"error": str(e)} + + async def stop_cluster(self, cluster): + try: + # Create a client + client = dataproc.ClusterControllerAsyncClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=oauth2.Credentials(self._access_token), + ) + + # Initialize request argument(s) + request = dataproc.StopClusterRequest( + project_id=self.project_id, + region=self.region_id, + cluster_name=cluster, + ) + + operation = await client.stop_cluster(request=request) + + response = await operation.result() + # Handle the response + return json.loads(proto.Message.to_json(response)) + except Exception as e: + self.log.exception(f"Error fetching stop cluster") + return {"error": str(e)} + + async def start_cluster(self, cluster): + try: + # Create a client + client = dataproc.ClusterControllerAsyncClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=oauth2.Credentials(self._access_token), + ) + + # Initialize request argument(s) + request = dataproc.StartClusterRequest( + project_id=self.project_id, + region=self.region_id, + cluster_name=cluster, + ) + + operation = await client.start_cluster(request=request) + + response = await operation.result() + # Handle the response + return json.loads(proto.Message.to_json(response)) + except Exception as e: + self.log.exception(f"Error fetching start cluster") + return {"error": str(e)} + + async def delete_cluster(self, cluster): + try: + # Create a client + client = dataproc.ClusterControllerAsyncClient( + client_options={ + "api_endpoint": f"us-central1-dataproc.googleapis.com:443" + }, + credentials=oauth2.Credentials(self._access_token), + ) + + # Initialize request argument(s) + request = dataproc.DeleteClusterRequest( + project_id=self.project_id, + region=self.region_id, + cluster_name=cluster, + ) + + operation = await client.delete_cluster(request=request) + + response = await operation.result() + # Handle the response + if isinstance(response, Empty): + return "Deleted successfully" + else: + return json.loads(proto.Message.to_json(response)) + except Exception as e: + self.log.exception(f"Error deleting cluster") return {"error": str(e)} diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index 97067893..1e51ac7c 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -76,7 +76,7 @@ export class ClusterService { const projectId = await getProjectId(); setProjectId(projectId); - const serviceURL = `clusterListPage?pageSize=50&pageToken=${pageToken}`; + const serviceURL = `clusterList?pageSize=50&pageToken=${pageToken}`; const formattedResponse: any = await requestAPI(serviceURL); @@ -158,7 +158,7 @@ export class ClusterService { setProjectName(credentials.project_id || ''); try { - const serviceURL = `clusterDetail?clusterSelected=${clusterSelected}`; + const serviceURL = `clusterDetail?cluster=${clusterSelected}`; let responseResult: any = await requestAPI(serviceURL); responseResult.status.state = @@ -194,7 +194,7 @@ export class ClusterService { timer: any ) => { try { - const serviceURL = `clusterDetail?clusterSelected=${selectedCluster}`; + const serviceURL = `clusterDetail?cluster=${selectedCluster}`; let formattedResponse: any = await requestAPI(serviceURL); formattedResponse.status.state = @@ -226,7 +226,7 @@ export class ClusterService { ) => { setRestartEnabled(true); try { - const serviceURL = `stopCluster?clusterSelected=${selectedCluster}`; + const serviceURL = `stopCluster?cluster=${selectedCluster}`; let formattedResponse: any = await requestAPI(serviceURL, { method: 'POST' @@ -254,7 +254,7 @@ export class ClusterService { static deleteClusterApi = async (selectedcluster: string) => { try { - const serviceURL = `deleteCluster?clusterSelected=${selectedcluster}`; + const serviceURL = `deleteCluster?cluster=${selectedcluster}`; let formattedResponse: any = await requestAPI(serviceURL, { method: 'DELETE' @@ -285,8 +285,8 @@ export class ClusterService { try { const serviceURL = operation === 'stop' - ? `stopCluster?clusterSelected=${selectedcluster}` - : `startCluster?clusterSelected=${selectedcluster}`; + ? `stopCluster?cluster=${selectedcluster}` + : `startCluster?cluster=${selectedcluster}`; let formattedResponse: any = await requestAPI(serviceURL, { method: 'POST' diff --git a/src/cluster/listCluster.tsx b/src/cluster/listCluster.tsx index f9a6998b..8983326e 100644 --- a/src/cluster/listCluster.tsx +++ b/src/cluster/listCluster.tsx @@ -215,23 +215,23 @@ function ListCluster({
ClusterService.startClusterApi(data.clusterName) : undefined } > - {ClusterStatusState[data.status.state.toString()] === ClusterStatus.STATUS_STOPPED && + {ClusterStatusState[data.status.state] === ClusterStatus.STATUS_STOPPED && !restartEnabled ? ( ClusterService.stopClusterApi(data.clusterName) : undefined } > - {ClusterStatusState[data.status.state.toString()] === ClusterStatus.STATUS_RUNNING ? ( + {ClusterStatusState[data.status.state] === ClusterStatus.STATUS_RUNNING ? ( restartClusterApi(data.clusterName) : undefined } > - {ClusterStatusState[data.status.state.toString()] === ClusterStatus.STATUS_RUNNING ? ( + {ClusterStatusState[data.status.state] === ClusterStatus.STATUS_RUNNING ? ( { return { clusterName: data.clusterName diff --git a/src/utils/const.ts b/src/utils/const.ts index c12198ab..7c490595 100644 --- a/src/utils/const.ts +++ b/src/utils/const.ts @@ -68,19 +68,6 @@ export enum ClusterStatus { STATUS_STOPPED = 'STOPPED', STATUS_ACTIVE = 'ACTIVE' } -export const ClusterStatusState: any = { - '0': 'UNKNOWN', - '1': 'CREATING', - '2': 'RUNNING', - '3': 'ERROR', - '4': 'DELETING', - '5': 'UPDATING', - '6': 'STOPPING', - '7': 'STOPPED', - '8': 'STARTING', - '9': 'ERROR_DUE_TO_UPDATE', - '10': 'REPAIRING' -}; export enum BatchStatus { STATUS_PENDING = 'PENDING' } @@ -269,3 +256,16 @@ export const TIER_SELECT_OPTIONS = [ { key: 'standard', value: 'standard', text: 'standard' }, { key: 'premium', value: 'premium', text: 'premium' } ]; +export const ClusterStatusState: any = { + 0: 'UNKNOWN', + 1: STATUS_CREATING, + 2: STATUS_RUNNING, + 3: STATUS_ERROR, + 4: STATUS_DELETING, + 5: 'UPDATING', + 6: STATUS_STOPPING, + 7: STATUS_STOPPED, + 8: STATUS_STARTING, + 9: 'ERROR_DUE_TO_UPDATE', + 10: 'REPAIRING' +}; diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 45f268cb..42c16093 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -266,7 +266,7 @@ export const statusValue = (data: { status: { state: number } }) => { if (data.status.state === 1) { return STATUS_PROVISIONING; } else { - return ClusterStatusState[data.status.state.toString()]; + return ClusterStatusState[data.status.state]; } }; From 2eb3d98912cb9c088be8c594194fcbe080d9abf7 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Fri, 6 Sep 2024 11:28:13 +0530 Subject: [PATCH 15/29] package conflicts resolved in pyproject --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 874a34f4..6672605f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ "pydantic~=1.10.0", "bigframes~=0.22.0", "aiohttp~=3.9.5", - "google-cloud-dataproc~=5.10.2" + "google-cloud-dataproc~=5.10.2", "google-cloud-storage~=2.18.2" ] dynamic = ["version", "description", "authors", "urls", "keywords"] From cb818d08c02c707c77968a2066af401f8a6a1cf1 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Fri, 6 Sep 2024 12:09:34 +0530 Subject: [PATCH 16/29] Code review feedback changes BE and FE --- .../controllers/dataproc.py | 10 +++--- dataproc_jupyter_plugin/services/dataproc.py | 32 +++++++++---------- src/cluster/clusterServices.tsx | 6 ---- src/cluster/listCluster.tsx | 25 +++++++-------- src/utils/const.ts | 14 +------- src/utils/utils.ts | 8 ++--- 6 files changed, 37 insertions(+), 58 deletions(-) diff --git a/dataproc_jupyter_plugin/controllers/dataproc.py b/dataproc_jupyter_plugin/controllers/dataproc.py index 925059e1..b6670b75 100644 --- a/dataproc_jupyter_plugin/controllers/dataproc.py +++ b/dataproc_jupyter_plugin/controllers/dataproc.py @@ -49,7 +49,7 @@ async def get(self): cluster_list = await client.list_clusters(page_size, page_token) self.finish(json.dumps(cluster_list)) except Exception as e: - self.log.exception(f"Error fetching cluster list") + self.log.exception(f"Error fetching cluster list: {str(e)}") self.finish({"error": str(e)}) @@ -62,7 +62,7 @@ async def get(self): get_cluster = await client.get_cluster_detail(cluster) self.finish(json.dumps(get_cluster)) except Exception as e: - self.log.exception(f"Error fetching get cluster") + self.log.exception(f"Error fetching a cluster: {str(e)}") self.finish({"error": str(e)}) @@ -75,7 +75,7 @@ async def post(self): stop_cluster = await client.stop_cluster(cluster) self.finish(json.dumps(stop_cluster)) except Exception as e: - self.log.exception(f"Error fetching stop cluster") + self.log.exception(f"Error stopping a cluster: {str(e)}") self.finish({"error": str(e)}) @@ -88,7 +88,7 @@ async def post(self): start_cluster = await client.start_cluster(cluster) self.finish(json.dumps(start_cluster)) except Exception as e: - self.log.exception(f"Error fetching start cluster") + self.log.exception(f"Error starting a cluster: {str(e)}") self.finish({"error": str(e)}) @@ -101,5 +101,5 @@ async def delete(self): delete_cluster = await client.delete_cluster(cluster) self.finish(json.dumps(delete_cluster)) except Exception as e: - self.log.exception(f"Error deleting cluster") + self.log.exception(f"Error deleting a cluster: {str(e)}") self.finish({"error": str(e)}) diff --git a/dataproc_jupyter_plugin/services/dataproc.py b/dataproc_jupyter_plugin/services/dataproc.py index f58633be..5f8f6295 100644 --- a/dataproc_jupyter_plugin/services/dataproc.py +++ b/dataproc_jupyter_plugin/services/dataproc.py @@ -12,18 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import proto +import json +import google.oauth2.credentials as oauth2 +from google.cloud import dataproc_v1 as dataproc +from google.protobuf.empty_pb2 import Empty from dataproc_jupyter_plugin import urls from dataproc_jupyter_plugin.commons.constants import ( CONTENT_TYPE, DATAPROC_SERVICE_NAME, ) -from google.cloud import dataproc_v1 as dataproc -import proto -import json -import google.oauth2.credentials as oauth2 -from google.protobuf.empty_pb2 import Empty - class Client: def __init__(self, credentials, log, client_session=None): @@ -88,11 +87,10 @@ async def list_clusters(self, page_size, page_token): # Handle the response async for response in page_result: - clusters_list.append(json.loads(proto.Message.to_json(response))) - + clusters_list.append(proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False)) return clusters_list except Exception as e: - self.log.exception(f"Error fetching cluster list") + self.log.exception(f"Error fetching cluster list: {str(e)}") return {"error": str(e)} async def get_cluster_detail(self, cluster): @@ -116,9 +114,9 @@ async def get_cluster_detail(self, cluster): response = await client.get_cluster(request=request) # Handle the response - return json.loads(proto.Message.to_json(response)) + return proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False) except Exception as e: - self.log.exception(f"Error fetching cluster detail") + self.log.exception(f"Error fetching cluster detail: {str(e)}") return {"error": str(e)} async def stop_cluster(self, cluster): @@ -142,9 +140,9 @@ async def stop_cluster(self, cluster): response = await operation.result() # Handle the response - return json.loads(proto.Message.to_json(response)) + return proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False) except Exception as e: - self.log.exception(f"Error fetching stop cluster") + self.log.exception(f"Error stopping a cluster: {str(e)}") return {"error": str(e)} async def start_cluster(self, cluster): @@ -168,9 +166,9 @@ async def start_cluster(self, cluster): response = await operation.result() # Handle the response - return json.loads(proto.Message.to_json(response)) + return proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False) except Exception as e: - self.log.exception(f"Error fetching start cluster") + self.log.exception(f"Error starting a cluster: {str(e)}") return {"error": str(e)} async def delete_cluster(self, cluster): @@ -197,7 +195,7 @@ async def delete_cluster(self, cluster): if isinstance(response, Empty): return "Deleted successfully" else: - return json.loads(proto.Message.to_json(response)) + return proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False) except Exception as e: - self.log.exception(f"Error deleting cluster") + self.log.exception(f"Error deleting a cluster: {str(e)}") return {"error": str(e)} diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index 1e51ac7c..7dee0166 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -19,7 +19,6 @@ import { toast } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; import { ClusterStatus, - ClusterStatusState, POLLING_TIME_LIMIT } from '../utils/const'; import { @@ -161,8 +160,6 @@ export class ClusterService { const serviceURL = `clusterDetail?cluster=${clusterSelected}`; let responseResult: any = await requestAPI(serviceURL); - responseResult.status.state = - ClusterStatusState[responseResult.status.state.toString()]; if (responseResult) { if (responseResult.error && responseResult.error.code === 404) { setErrorView(true); @@ -197,9 +194,6 @@ export class ClusterService { const serviceURL = `clusterDetail?cluster=${selectedCluster}`; let formattedResponse: any = await requestAPI(serviceURL); - formattedResponse.status.state = - ClusterStatusState[formattedResponse.status.state.toString()]; - if (formattedResponse.status.state === ClusterStatus.STATUS_STOPPED) { ClusterService.startClusterApi(selectedCluster); clearInterval(timer.current); diff --git a/src/cluster/listCluster.tsx b/src/cluster/listCluster.tsx index 8983326e..009ae163 100644 --- a/src/cluster/listCluster.tsx +++ b/src/cluster/listCluster.tsx @@ -26,7 +26,6 @@ import clusterErrorIcon from '../../style/icons/cluster_error_icon.svg'; import { CREATE_CLUSTER_URL, ClusterStatus, - ClusterStatusState, STATUS_CREATING, STATUS_DELETING, STATUS_ERROR, @@ -215,23 +214,23 @@ function ListCluster({
ClusterService.startClusterApi(data.clusterName) : undefined } > - {ClusterStatusState[data.status.state] === ClusterStatus.STATUS_STOPPED && + {data.status.state === ClusterStatus.STATUS_STOPPED && !restartEnabled ? ( ClusterService.stopClusterApi(data.clusterName) : undefined } > - {ClusterStatusState[data.status.state] === ClusterStatus.STATUS_RUNNING ? ( + {data.status.state === ClusterStatus.STATUS_RUNNING ? ( restartClusterApi(data.clusterName) : undefined } > - {ClusterStatusState[data.status.state] === ClusterStatus.STATUS_RUNNING ? ( + {data.status.state === ClusterStatus.STATUS_RUNNING ? ( { } }; -export const statusValue = (data: { status: { state: number } }) => { - if (data.status.state === 1) { +export const statusValue = (data: { status: { state: string } }) => { + if (data.status.state === STATUS_CREATING) { return STATUS_PROVISIONING; } else { - return ClusterStatusState[data.status.state]; + return data.status.state; } }; From 8a6072abe735b87f1c3b99364bc3d2fdbe838c5b Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Fri, 6 Sep 2024 12:10:50 +0530 Subject: [PATCH 17/29] line space removed --- src/utils/const.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/utils/const.ts b/src/utils/const.ts index 31809c47..48e4c986 100644 --- a/src/utils/const.ts +++ b/src/utils/const.ts @@ -256,4 +256,3 @@ export const TIER_SELECT_OPTIONS = [ { key: 'standard', value: 'standard', text: 'standard' }, { key: 'premium', value: 'premium', text: 'premium' } ]; - From c5ae698d97aad5761eb3f75aab77650fde0b2308 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Fri, 6 Sep 2024 15:15:24 +0530 Subject: [PATCH 18/29] api endpoint changes in client library --- .../controllers/dataproc.py | 30 +++++++--- dataproc_jupyter_plugin/services/dataproc.py | 56 +++++++++++-------- 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/dataproc_jupyter_plugin/controllers/dataproc.py b/dataproc_jupyter_plugin/controllers/dataproc.py index b6670b75..c75e89c1 100644 --- a/dataproc_jupyter_plugin/controllers/dataproc.py +++ b/dataproc_jupyter_plugin/controllers/dataproc.py @@ -13,12 +13,11 @@ # limitations under the License. import json - import aiohttp import tornado from jupyter_server.base.handlers import APIHandler - -from dataproc_jupyter_plugin import credentials +from dataproc_jupyter_plugin.commons.constants import DATAPROC_SERVICE_NAME +from dataproc_jupyter_plugin import credentials, urls from dataproc_jupyter_plugin.services import dataproc @@ -45,7 +44,10 @@ async def get(self): try: page_token = self.get_argument("pageToken") page_size = self.get_argument("pageSize") - client = dataproc.Client(await credentials.get_cached(), self.log) + dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) + client = dataproc.Client( + await credentials.get_cached(), self.log, dataproc_url + ) cluster_list = await client.list_clusters(page_size, page_token) self.finish(json.dumps(cluster_list)) except Exception as e: @@ -58,7 +60,10 @@ class ClusterDetailController(APIHandler): async def get(self): try: cluster = self.get_argument("cluster") - client = dataproc.Client(await credentials.get_cached(), self.log) + dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) + client = dataproc.Client( + await credentials.get_cached(), self.log, dataproc_url + ) get_cluster = await client.get_cluster_detail(cluster) self.finish(json.dumps(get_cluster)) except Exception as e: @@ -71,7 +76,10 @@ class StopClusterController(APIHandler): async def post(self): try: cluster = self.get_argument("cluster") - client = dataproc.Client(await credentials.get_cached(), self.log) + dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) + client = dataproc.Client( + await credentials.get_cached(), self.log, dataproc_url + ) stop_cluster = await client.stop_cluster(cluster) self.finish(json.dumps(stop_cluster)) except Exception as e: @@ -84,7 +92,10 @@ class StartClusterController(APIHandler): async def post(self): try: cluster = self.get_argument("cluster") - client = dataproc.Client(await credentials.get_cached(), self.log) + dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) + client = dataproc.Client( + await credentials.get_cached(), self.log, dataproc_url + ) start_cluster = await client.start_cluster(cluster) self.finish(json.dumps(start_cluster)) except Exception as e: @@ -97,7 +108,10 @@ class DeleteClusterController(APIHandler): async def delete(self): try: cluster = self.get_argument("cluster") - client = dataproc.Client(await credentials.get_cached(), self.log) + dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) + client = dataproc.Client( + await credentials.get_cached(), self.log, dataproc_url + ) delete_cluster = await client.delete_cluster(cluster) self.finish(json.dumps(delete_cluster)) except Exception as e: diff --git a/dataproc_jupyter_plugin/services/dataproc.py b/dataproc_jupyter_plugin/services/dataproc.py index 5f8f6295..2d216fa5 100644 --- a/dataproc_jupyter_plugin/services/dataproc.py +++ b/dataproc_jupyter_plugin/services/dataproc.py @@ -13,7 +13,6 @@ # limitations under the License. import proto -import json import google.oauth2.credentials as oauth2 from google.cloud import dataproc_v1 as dataproc from google.protobuf.empty_pb2 import Empty @@ -25,7 +24,7 @@ class Client: - def __init__(self, credentials, log, client_session=None): + def __init__(self, credentials, log, dataproc_url, client_session=None): self.log = log if not ( ("access_token" in credentials) @@ -38,6 +37,7 @@ def __init__(self, credentials, log, client_session=None): self.project_id = credentials["project_id"] self.region_id = credentials["region_id"] self.client_session = client_session + self.api_endpoint = f"{self.region_id}-{dataproc_url.split('/')[2]}:443" def create_headers(self): return { @@ -67,9 +67,7 @@ async def list_clusters(self, page_size, page_token): try: # Create a client client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, + client_options={"api_endpoint": self.api_endpoint}, credentials=oauth2.Credentials(self._access_token), ) @@ -87,7 +85,13 @@ async def list_clusters(self, page_size, page_token): # Handle the response async for response in page_result: - clusters_list.append(proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False)) + clusters_list.append( + proto.Message.to_dict( + response, + use_integers_for_enums=False, + preserving_proto_field_name=False, + ) + ) return clusters_list except Exception as e: self.log.exception(f"Error fetching cluster list: {str(e)}") @@ -97,9 +101,7 @@ async def get_cluster_detail(self, cluster): try: # Create a client client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, + client_options={"api_endpoint": self.api_endpoint}, credentials=oauth2.Credentials(self._access_token), ) @@ -114,7 +116,11 @@ async def get_cluster_detail(self, cluster): response = await client.get_cluster(request=request) # Handle the response - return proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False) + return proto.Message.to_dict( + response, + use_integers_for_enums=False, + preserving_proto_field_name=False, + ) except Exception as e: self.log.exception(f"Error fetching cluster detail: {str(e)}") return {"error": str(e)} @@ -123,9 +129,7 @@ async def stop_cluster(self, cluster): try: # Create a client client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, + client_options={"api_endpoint": self.api_endpoint}, credentials=oauth2.Credentials(self._access_token), ) @@ -140,7 +144,11 @@ async def stop_cluster(self, cluster): response = await operation.result() # Handle the response - return proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False) + return proto.Message.to_dict( + response, + use_integers_for_enums=False, + preserving_proto_field_name=False, + ) except Exception as e: self.log.exception(f"Error stopping a cluster: {str(e)}") return {"error": str(e)} @@ -149,9 +157,7 @@ async def start_cluster(self, cluster): try: # Create a client client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, + client_options={"api_endpoint": self.api_endpoint}, credentials=oauth2.Credentials(self._access_token), ) @@ -166,7 +172,11 @@ async def start_cluster(self, cluster): response = await operation.result() # Handle the response - return proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False) + return proto.Message.to_dict( + response, + use_integers_for_enums=False, + preserving_proto_field_name=False, + ) except Exception as e: self.log.exception(f"Error starting a cluster: {str(e)}") return {"error": str(e)} @@ -175,9 +185,7 @@ async def delete_cluster(self, cluster): try: # Create a client client = dataproc.ClusterControllerAsyncClient( - client_options={ - "api_endpoint": f"us-central1-dataproc.googleapis.com:443" - }, + client_options={"api_endpoint": self.api_endpoint}, credentials=oauth2.Credentials(self._access_token), ) @@ -195,7 +203,11 @@ async def delete_cluster(self, cluster): if isinstance(response, Empty): return "Deleted successfully" else: - return proto.Message.to_dict(response, use_integers_for_enums=False, preserving_proto_field_name=False) + return proto.Message.to_dict( + response, + use_integers_for_enums=False, + preserving_proto_field_name=False, + ) except Exception as e: self.log.exception(f"Error deleting a cluster: {str(e)}") return {"error": str(e)} From 6daa12406fc6708932b49dd29640026b08b2ef30 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Fri, 6 Sep 2024 16:07:32 +0530 Subject: [PATCH 19/29] prettier changes --- src/cluster/clusterServices.tsx | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/cluster/clusterServices.tsx b/src/cluster/clusterServices.tsx index 7dee0166..e7401ce8 100644 --- a/src/cluster/clusterServices.tsx +++ b/src/cluster/clusterServices.tsx @@ -17,10 +17,7 @@ import { toast } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; -import { - ClusterStatus, - POLLING_TIME_LIMIT -} from '../utils/const'; +import { ClusterStatus, POLLING_TIME_LIMIT } from '../utils/const'; import { authApi, toastifyCustomStyle, @@ -253,7 +250,7 @@ export class ClusterService { let formattedResponse: any = await requestAPI(serviceURL, { method: 'DELETE' }); - + if (formattedResponse?.error) { toast.error(formattedResponse?.error, toastifyCustomStyle); } else { @@ -262,7 +259,6 @@ export class ClusterService { toastifyCustomStyle ); } - } catch (error) { DataprocLoggingService.log('Error deleting cluster', LOG_LEVEL.ERROR); toast.error( From 7f647952751ef1572655299fc6eb0f7bbd3a1bed Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 9 Sep 2024 13:08:23 +0530 Subject: [PATCH 20/29] runtime list code review fix --- dataproc_jupyter_plugin/controllers/dataproc.py | 3 ++- dataproc_jupyter_plugin/services/dataproc.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dataproc_jupyter_plugin/controllers/dataproc.py b/dataproc_jupyter_plugin/controllers/dataproc.py index c75e89c1..d19a5fea 100644 --- a/dataproc_jupyter_plugin/controllers/dataproc.py +++ b/dataproc_jupyter_plugin/controllers/dataproc.py @@ -27,9 +27,10 @@ async def get(self): try: page_token = self.get_argument("pageToken") page_size = self.get_argument("pageSize") + dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) async with aiohttp.ClientSession() as client_session: client = dataproc.Client( - await credentials.get_cached(), self.log, client_session + await credentials.get_cached(), self.log, dataproc_url, client_session ) runtime_list = await client.list_runtime(page_size, page_token) self.finish(json.dumps(runtime_list)) diff --git a/dataproc_jupyter_plugin/services/dataproc.py b/dataproc_jupyter_plugin/services/dataproc.py index 2d216fa5..8c739351 100644 --- a/dataproc_jupyter_plugin/services/dataproc.py +++ b/dataproc_jupyter_plugin/services/dataproc.py @@ -37,6 +37,7 @@ def __init__(self, credentials, log, dataproc_url, client_session=None): self.project_id = credentials["project_id"] self.region_id = credentials["region_id"] self.client_session = client_session + self.dataproc_url = dataproc_url self.api_endpoint = f"{self.region_id}-{dataproc_url.split('/')[2]}:443" def create_headers(self): @@ -47,8 +48,7 @@ def create_headers(self): async def list_runtime(self, page_size, page_token): try: - dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) - api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}" + api_endpoint = f"{self.dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}" async with self.client_session.get( api_endpoint, headers=self.create_headers() ) as response: From a734f1ef873f096428fa9831e82028df3582e8d9 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Thu, 12 Sep 2024 11:45:08 +0530 Subject: [PATCH 21/29] changed all package versions >= instead ~= --- pyproject.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6672605f..91bbe0a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,11 +27,11 @@ dependencies = [ "google-cloud-jupyter-config>=0.0.10", "kernels-mixer>=0.0.13", "pendulum>=3.0.0", - "pydantic~=1.10.0", - "bigframes~=0.22.0", - "aiohttp~=3.9.5", - "google-cloud-dataproc~=5.10.2", - "google-cloud-storage~=2.18.2" + "pydantic>=1.10.0", + "bigframes>=0.22.0", + "aiohttp>=3.9.5", + "google-cloud-dataproc>=5.10.2", + "google-cloud-storage>=2.18.2" ] dynamic = ["version", "description", "authors", "urls", "keywords"] From ff65fda7db10942cdc7b5197723c3a3c707fe86f Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Thu, 12 Sep 2024 21:50:49 +0530 Subject: [PATCH 22/29] code format fix python --- dataproc_jupyter_plugin/controllers/dataproc.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dataproc_jupyter_plugin/controllers/dataproc.py b/dataproc_jupyter_plugin/controllers/dataproc.py index d19a5fea..06ae8448 100644 --- a/dataproc_jupyter_plugin/controllers/dataproc.py +++ b/dataproc_jupyter_plugin/controllers/dataproc.py @@ -30,7 +30,10 @@ async def get(self): dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) async with aiohttp.ClientSession() as client_session: client = dataproc.Client( - await credentials.get_cached(), self.log, dataproc_url, client_session + await credentials.get_cached(), + self.log, + dataproc_url, + client_session, ) runtime_list = await client.list_runtime(page_size, page_token) self.finish(json.dumps(runtime_list)) From 7580728cb281dbfc90a81eabdbe5dc7d75a276e1 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Fri, 13 Sep 2024 13:42:06 +0530 Subject: [PATCH 23/29] pyproject version change --- pyproject.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 91bbe0a6..cbf5ece4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,10 +28,10 @@ dependencies = [ "kernels-mixer>=0.0.13", "pendulum>=3.0.0", "pydantic>=1.10.0", - "bigframes>=0.22.0", - "aiohttp>=3.9.5", + "bigframes~=0.22.0", + "aiohttp~=3.9.5", "google-cloud-dataproc>=5.10.2", - "google-cloud-storage>=2.18.2" + "google-cloud-storage~=2.18.2" ] dynamic = ["version", "description", "authors", "urls", "keywords"] From 36dfd9ebb70820bb651e532342bb79780a4006b0 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Fri, 13 Sep 2024 14:21:22 +0530 Subject: [PATCH 24/29] pyproject change '>=' from '~=' --- pyproject.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index cbf5ece4..91bbe0a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,10 +28,10 @@ dependencies = [ "kernels-mixer>=0.0.13", "pendulum>=3.0.0", "pydantic>=1.10.0", - "bigframes~=0.22.0", - "aiohttp~=3.9.5", + "bigframes>=0.22.0", + "aiohttp>=3.9.5", "google-cloud-dataproc>=5.10.2", - "google-cloud-storage~=2.18.2" + "google-cloud-storage>=2.18.2" ] dynamic = ["version", "description", "authors", "urls", "keywords"] From a3edf3d6eb670995c04fefccbee8b168f030d3a3 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 16 Sep 2024 16:58:20 +0530 Subject: [PATCH 25/29] client library test for list cluster --- .../tests/test_dataproc.py | 58 +++++++++++++------ 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/dataproc_jupyter_plugin/tests/test_dataproc.py b/dataproc_jupyter_plugin/tests/test_dataproc.py index d914c32b..fefe9eab 100644 --- a/dataproc_jupyter_plugin/tests/test_dataproc.py +++ b/dataproc_jupyter_plugin/tests/test_dataproc.py @@ -13,29 +13,53 @@ # limitations under the License. import json +from unittest import mock from dataproc_jupyter_plugin.tests import mocks +from google.cloud.dataproc_v1.services.cluster_controller import ( + ClusterControllerClient, + pagers, +) +from google.auth import credentials as ga_credentials +from google.cloud.dataproc_v1.types import clusters +import pytest -async def test_list_clusters(monkeypatch, jp_fetch): - mocks.patch_mocks(monkeypatch) - mock_project_id = "credentials-project" - mock_page_token = "mock-page-token" - mock_region_id = "mock-region" - mock_page_size = "mock_page_size" - response = await jp_fetch( - "dataproc-plugin", - "clusterList", - params={"pageSize": mock_page_size, "pageToken": mock_page_token}, - ) - assert response.code == 200 - payload = json.loads(response.body) - assert ( - payload["api_endpoint"] - == f"https://dataproc.googleapis.com//v1/projects/credentials-project/regions/{mock_region_id}/clusters?pageSize={mock_page_size}&pageToken={mock_page_token}" +@pytest.mark.parametrize( + "request_type", + [ + clusters.ListClustersRequest, + dict, + ], +) +def test_list_clusters(request_type, transport: str = "grpc"): + client = ClusterControllerClient( + credentials=ga_credentials.AnonymousCredentials(), + transport=transport, ) - assert payload["headers"]["Authorization"] == f"Bearer mock-token" + + # Everything is optional in proto3 as far as the runtime is concerned, + # and we are mocking out the actual API, so just send an empty request. + request = request_type() + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object(type(client.transport.list_clusters), "__call__") as call: + # Designate an appropriate return value for the call. + call.return_value = clusters.ListClustersResponse( + next_page_token="next_page_token_value", + ) + response = client.list_clusters(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) == 1 + _, args, _ = call.mock_calls[0] + request = clusters.ListClustersRequest() + assert args[0] == request + + # Establish that the response is the type that we expect. + assert isinstance(response, pagers.ListClustersPager) + assert response.next_page_token == "next_page_token_value" async def test_list_runtime(monkeypatch, jp_fetch): From d8e35676a6a8eebf2c60df2e2a08fdb0afea3379 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 16 Sep 2024 17:50:13 +0530 Subject: [PATCH 26/29] aioHttp pyproject version change --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 91bbe0a6..2be63789 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ dependencies = [ "pendulum>=3.0.0", "pydantic>=1.10.0", "bigframes>=0.22.0", - "aiohttp>=3.9.5", + "aiohttp~=3.9.5", "google-cloud-dataproc>=5.10.2", "google-cloud-storage>=2.18.2" ] From f87ce9300576b6078407e2f745e952b5795b00c5 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 16 Sep 2024 18:16:51 +0530 Subject: [PATCH 27/29] list cluster revert changes test --- .../tests/test_dataproc.py | 58 ++++++------------- 1 file changed, 17 insertions(+), 41 deletions(-) diff --git a/dataproc_jupyter_plugin/tests/test_dataproc.py b/dataproc_jupyter_plugin/tests/test_dataproc.py index fefe9eab..d914c32b 100644 --- a/dataproc_jupyter_plugin/tests/test_dataproc.py +++ b/dataproc_jupyter_plugin/tests/test_dataproc.py @@ -13,53 +13,29 @@ # limitations under the License. import json -from unittest import mock from dataproc_jupyter_plugin.tests import mocks -from google.cloud.dataproc_v1.services.cluster_controller import ( - ClusterControllerClient, - pagers, -) -from google.auth import credentials as ga_credentials -from google.cloud.dataproc_v1.types import clusters -import pytest +async def test_list_clusters(monkeypatch, jp_fetch): + mocks.patch_mocks(monkeypatch) -@pytest.mark.parametrize( - "request_type", - [ - clusters.ListClustersRequest, - dict, - ], -) -def test_list_clusters(request_type, transport: str = "grpc"): - client = ClusterControllerClient( - credentials=ga_credentials.AnonymousCredentials(), - transport=transport, + mock_project_id = "credentials-project" + mock_page_token = "mock-page-token" + mock_region_id = "mock-region" + mock_page_size = "mock_page_size" + response = await jp_fetch( + "dataproc-plugin", + "clusterList", + params={"pageSize": mock_page_size, "pageToken": mock_page_token}, ) - - # Everything is optional in proto3 as far as the runtime is concerned, - # and we are mocking out the actual API, so just send an empty request. - request = request_type() - - # Mock the actual call within the gRPC stub, and fake the request. - with mock.patch.object(type(client.transport.list_clusters), "__call__") as call: - # Designate an appropriate return value for the call. - call.return_value = clusters.ListClustersResponse( - next_page_token="next_page_token_value", - ) - response = client.list_clusters(request) - - # Establish that the underlying gRPC stub method was called. - assert len(call.mock_calls) == 1 - _, args, _ = call.mock_calls[0] - request = clusters.ListClustersRequest() - assert args[0] == request - - # Establish that the response is the type that we expect. - assert isinstance(response, pagers.ListClustersPager) - assert response.next_page_token == "next_page_token_value" + assert response.code == 200 + payload = json.loads(response.body) + assert ( + payload["api_endpoint"] + == f"https://dataproc.googleapis.com//v1/projects/credentials-project/regions/{mock_region_id}/clusters?pageSize={mock_page_size}&pageToken={mock_page_token}" + ) + assert payload["headers"]["Authorization"] == f"Bearer mock-token" async def test_list_runtime(monkeypatch, jp_fetch): From f656955e118dd42704393455c953828435a75145 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 16 Sep 2024 18:21:21 +0530 Subject: [PATCH 28/29] test changes for list cluster --- .../tests/test_dataproc.py | 59 +++++++++++++------ 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/dataproc_jupyter_plugin/tests/test_dataproc.py b/dataproc_jupyter_plugin/tests/test_dataproc.py index d914c32b..528d655a 100644 --- a/dataproc_jupyter_plugin/tests/test_dataproc.py +++ b/dataproc_jupyter_plugin/tests/test_dataproc.py @@ -13,29 +13,54 @@ # limitations under the License. import json +from unittest import mock from dataproc_jupyter_plugin.tests import mocks +from google.cloud.dataproc_v1.services.cluster_controller import ( + ClusterControllerClient, + pagers +) +from google.auth import credentials as ga_credentials +from google.cloud.dataproc_v1.types import clusters +import pytest -async def test_list_clusters(monkeypatch, jp_fetch): - mocks.patch_mocks(monkeypatch) - mock_project_id = "credentials-project" - mock_page_token = "mock-page-token" - mock_region_id = "mock-region" - mock_page_size = "mock_page_size" - response = await jp_fetch( - "dataproc-plugin", - "clusterList", - params={"pageSize": mock_page_size, "pageToken": mock_page_token}, - ) - assert response.code == 200 - payload = json.loads(response.body) - assert ( - payload["api_endpoint"] - == f"https://dataproc.googleapis.com//v1/projects/credentials-project/regions/{mock_region_id}/clusters?pageSize={mock_page_size}&pageToken={mock_page_token}" +@pytest.mark.parametrize( + "request_type", + [ + clusters.ListClustersRequest, + dict, + ], +) + +def test_list_clusters(request_type, transport: str = "grpc"): + client = ClusterControllerClient( + credentials=ga_credentials.AnonymousCredentials(), + transport=transport, ) - assert payload["headers"]["Authorization"] == f"Bearer mock-token" + + # Everything is optional in proto3 as far as the runtime is concerned, + # and we are mocking out the actual API, so just send an empty request. + request = request_type() + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object(type(client.transport.list_clusters), "__call__") as call: + # Designate an appropriate return value for the call. + call.return_value = clusters.ListClustersResponse( + next_page_token="next_page_token_value", + ) + response = client.list_clusters(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) == 1 + _, args, _ = call.mock_calls[0] + request = clusters.ListClustersRequest() + assert args[0] == request + + # Establish that the response is the type that we expect. + assert isinstance(response, pagers.ListClustersPager) + assert response.next_page_token == "next_page_token_value" async def test_list_runtime(monkeypatch, jp_fetch): From 218ecde9db4984ec900b7b6db6c423d5694fd6b0 Mon Sep 17 00:00:00 2001 From: Jeyaprakash-NK Date: Mon, 23 Sep 2024 15:39:30 +0530 Subject: [PATCH 29/29] list cluster test file changes --- .../tests/test_dataproc.py | 51 ++++--------------- 1 file changed, 9 insertions(+), 42 deletions(-) diff --git a/dataproc_jupyter_plugin/tests/test_dataproc.py b/dataproc_jupyter_plugin/tests/test_dataproc.py index 528d655a..c9e53df1 100644 --- a/dataproc_jupyter_plugin/tests/test_dataproc.py +++ b/dataproc_jupyter_plugin/tests/test_dataproc.py @@ -13,54 +13,21 @@ # limitations under the License. import json -from unittest import mock from dataproc_jupyter_plugin.tests import mocks -from google.cloud.dataproc_v1.services.cluster_controller import ( - ClusterControllerClient, - pagers -) -from google.auth import credentials as ga_credentials -from google.cloud.dataproc_v1.types import clusters -import pytest +async def test_list_clusters(monkeypatch, jp_fetch): + mocks.patch_mocks(monkeypatch) -@pytest.mark.parametrize( - "request_type", - [ - clusters.ListClustersRequest, - dict, - ], -) - -def test_list_clusters(request_type, transport: str = "grpc"): - client = ClusterControllerClient( - credentials=ga_credentials.AnonymousCredentials(), - transport=transport, + mock_page_token = "mock-page-token" + mock_page_size = 1 + response = await jp_fetch( + "dataproc-plugin", + "clusterList", + params={"pageSize": mock_page_size, "pageToken": mock_page_token}, ) - - # Everything is optional in proto3 as far as the runtime is concerned, - # and we are mocking out the actual API, so just send an empty request. - request = request_type() - - # Mock the actual call within the gRPC stub, and fake the request. - with mock.patch.object(type(client.transport.list_clusters), "__call__") as call: - # Designate an appropriate return value for the call. - call.return_value = clusters.ListClustersResponse( - next_page_token="next_page_token_value", - ) - response = client.list_clusters(request) - - # Establish that the underlying gRPC stub method was called. - assert len(call.mock_calls) == 1 - _, args, _ = call.mock_calls[0] - request = clusters.ListClustersRequest() - assert args[0] == request - - # Establish that the response is the type that we expect. - assert isinstance(response, pagers.ListClustersPager) - assert response.next_page_token == "next_page_token_value" + assert response.code == 200 async def test_list_runtime(monkeypatch, jp_fetch):