Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster - Client library API migration changes #177

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
14a7916
list and get cluster temporary changes
Jeyaprakash-NK Apr 15, 2024
75e1576
Merge branch 'main' of https://github.com/Shubha-accenture/dataproc-j…
Jeyaprakash-NK Apr 15, 2024
c8f9643
Merge branch 'main' of https://github.com/Shubha-accenture/dataproc-j…
Jeyaprakash-NK Apr 16, 2024
654b621
cluster service BE code change
Jeyaprakash-NK Apr 16, 2024
9659c02
list cluster client library temp changes
Jeyaprakash-NK Apr 17, 2024
3708794
list and get cluster api status changes
Jeyaprakash-NK Aug 8, 2024
25e25c1
stop cluster BE and FE temp changes
Jeyaprakash-NK Aug 12, 2024
cdec0d7
controller rename changes
Jeyaprakash-NK Aug 12, 2024
2a37f2c
latest pull from main and conflicts resolved
Jeyaprakash-NK Aug 12, 2024
d539d90
start cluster and BE temp changes
Jeyaprakash-NK Aug 12, 2024
1d60038
Service file rename changes
Jeyaprakash-NK Aug 12, 2024
611fbac
Merge branch 'main' of https://github.com/Shubha-accenture/dataproc-j…
Jeyaprakash-NK Aug 14, 2024
cd84677
delete cluster and auth access token fix
Jeyaprakash-NK Aug 19, 2024
2b7b95b
await changes in start, stop, delete
Jeyaprakash-NK Aug 19, 2024
9b71492
delete cluster empty handled
Jeyaprakash-NK Aug 19, 2024
312902a
added new dependency "google-cloud-dataproc"
Jeyaprakash-NK Aug 20, 2024
68c21db
code cleanup
Jeyaprakash-NK Aug 22, 2024
4d70971
Code review comments fix
Jeyaprakash-NK Sep 5, 2024
65d004f
pull from main and conflicts resolved
Jeyaprakash-NK Sep 6, 2024
2eb3d98
package conflicts resolved in pyproject
Jeyaprakash-NK Sep 6, 2024
cb818d0
Code review feedback changes BE and FE
Jeyaprakash-NK Sep 6, 2024
8a6072a
line space removed
Jeyaprakash-NK Sep 6, 2024
c5ae698
api endpoint changes in client library
Jeyaprakash-NK Sep 6, 2024
6daa124
prettier changes
Jeyaprakash-NK Sep 6, 2024
7f64795
runtime list code review fix
Jeyaprakash-NK Sep 9, 2024
50f2d2d
Merge branch 'main' of https://github.com/Shubha-accenture/dataproc-j…
Jeyaprakash-NK Sep 11, 2024
a734f1e
changed all package versions >= instead ~=
Jeyaprakash-NK Sep 12, 2024
ff65fda
code format fix python
Jeyaprakash-NK Sep 12, 2024
7580728
pyproject version change
Jeyaprakash-NK Sep 13, 2024
36dfd9e
pyproject change '>=' from '~='
Jeyaprakash-NK Sep 13, 2024
a3edf3d
client library test for list cluster
Jeyaprakash-NK Sep 16, 2024
d8e3567
aioHttp pyproject version change
Jeyaprakash-NK Sep 16, 2024
f87ce93
list cluster revert changes test
Jeyaprakash-NK Sep 16, 2024
f656955
test changes for list cluster
Jeyaprakash-NK Sep 16, 2024
218ecde
list cluster test file changes
Jeyaprakash-NK Sep 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 80 additions & 16 deletions dataproc_jupyter_plugin/controllers/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,108 @@
# limitations under the License.

import json

import aiohttp
import tornado
from jupyter_server.base.handlers import APIHandler

from dataproc_jupyter_plugin import credentials
from dataproc_jupyter_plugin.commons.constants import DATAPROC_SERVICE_NAME
from dataproc_jupyter_plugin import credentials, urls
from dataproc_jupyter_plugin.services import dataproc


class ClusterListController(APIHandler):
class RuntimeController(APIHandler):
@tornado.web.authenticated
async def get(self):
try:
page_token = self.get_argument("pageToken")
page_size = self.get_argument("pageSize")
dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME)
async with aiohttp.ClientSession() as client_session:
client = dataproc.Client(
await credentials.get_cached(), self.log, client_session
await credentials.get_cached(), self.log, dataproc_url, client_session
)
cluster_list = await client.list_clusters(page_size, page_token)
self.finish(json.dumps(cluster_list))
runtime_list = await client.list_runtime(page_size, page_token)
self.finish(json.dumps(runtime_list))
except Exception as e:
self.log.exception("Error fetching cluster list")
self.log.exception(f"Error fetching runtime template list: {str(e)}")
self.finish({"error": str(e)})


class RuntimeController(APIHandler):
class ClusterListController(APIHandler):
@tornado.web.authenticated
async def get(self):
try:
page_token = self.get_argument("pageToken")
page_size = self.get_argument("pageSize")
async with aiohttp.ClientSession() as client_session:
client = dataproc.Client(
await credentials.get_cached(), self.log, client_session
)
runtime_list = await client.list_runtime(page_size, page_token)
self.finish(json.dumps(runtime_list))
dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME)
client = dataproc.Client(
await credentials.get_cached(), self.log, dataproc_url
)
cluster_list = await client.list_clusters(page_size, page_token)
self.finish(json.dumps(cluster_list))
except Exception as e:
self.log.exception(f"Error fetching runtime template list: {str(e)}")
self.log.exception(f"Error fetching cluster list: {str(e)}")
self.finish({"error": str(e)})


class ClusterDetailController(APIHandler):
    """Handler that looks up a single Dataproc cluster."""

    @tornado.web.authenticated
    async def get(self):
        """Fetch the cluster named by the ``cluster`` request argument.

        On success the value returned by ``get_cluster_detail`` is written
        to the response as a JSON string. Any exception is logged and
        reported to the caller as ``{"error": <message>}``.
        """
        try:
            cluster_name = self.get_argument("cluster")
            service_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME)
            cached_credentials = await credentials.get_cached()
            service = dataproc.Client(cached_credentials, self.log, service_url)
            payload = json.dumps(await service.get_cluster_detail(cluster_name))
            self.finish(payload)
        except Exception as e:
            message = str(e)
            self.log.exception(f"Error fetching a cluster: {message}")
            self.finish({"error": message})


class StopClusterController(APIHandler):
    """Handler that asks the Dataproc service client to stop a cluster."""

    @tornado.web.authenticated
    async def post(self):
        """Stop the cluster named by the ``cluster`` request argument.

        On success the value returned by ``stop_cluster`` is written to
        the response as a JSON string. Any exception is logged and reported
        to the caller as ``{"error": <message>}``.
        """
        try:
            cluster_name = self.get_argument("cluster")
            service_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME)
            cached_credentials = await credentials.get_cached()
            service = dataproc.Client(cached_credentials, self.log, service_url)
            payload = json.dumps(await service.stop_cluster(cluster_name))
            self.finish(payload)
        except Exception as e:
            message = str(e)
            self.log.exception(f"Error stopping a cluster: {message}")
            self.finish({"error": message})


class StartClusterController(APIHandler):
    """Handler that asks the Dataproc service client to start a cluster."""

    @tornado.web.authenticated
    async def post(self):
        """Start the cluster named by the ``cluster`` request argument.

        On success the value returned by ``start_cluster`` is written to
        the response as a JSON string. Any exception is logged and reported
        to the caller as ``{"error": <message>}``.
        """
        try:
            cluster_name = self.get_argument("cluster")
            service_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME)
            cached_credentials = await credentials.get_cached()
            service = dataproc.Client(cached_credentials, self.log, service_url)
            payload = json.dumps(await service.start_cluster(cluster_name))
            self.finish(payload)
        except Exception as e:
            message = str(e)
            self.log.exception(f"Error starting a cluster: {message}")
            self.finish({"error": message})


class DeleteClusterController(APIHandler):
    """Handler that asks the Dataproc service client to delete a cluster."""

    @tornado.web.authenticated
    async def delete(self):
        """Delete the cluster named by the ``cluster`` request argument.

        On success the value returned by ``delete_cluster`` is written to
        the response as a JSON string. Any exception is logged and reported
        to the caller as ``{"error": <message>}``.
        """
        try:
            cluster_name = self.get_argument("cluster")
            service_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME)
            cached_credentials = await credentials.get_cached()
            service = dataproc.Client(cached_credentials, self.log, service_url)
            payload = json.dumps(await service.delete_cluster(cluster_name))
            self.finish(payload)
        except Exception as e:
            message = str(e)
            self.log.exception(f"Error deleting a cluster: {message}")
            self.finish({"error": message})
4 changes: 4 additions & 0 deletions dataproc_jupyter_plugin/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ def full_path(name):
"dagRunTask": airflow.DagRunTaskController,
"dagRunTaskLogs": airflow.DagRunTaskLogsController,
"clusterList": dataproc.ClusterListController,
"clusterDetail": dataproc.ClusterDetailController,
"stopCluster": dataproc.StopClusterController,
"startCluster": dataproc.StartClusterController,
"deleteCluster": dataproc.DeleteClusterController,
"runtimeList": dataproc.RuntimeController,
"createJobScheduler": executor.ExecutorController,
"dagList": airflow.DagListController,
Expand Down
175 changes: 155 additions & 20 deletions dataproc_jupyter_plugin/services/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import proto
import google.oauth2.credentials as oauth2
from google.cloud import dataproc_v1 as dataproc
from google.protobuf.empty_pb2 import Empty
from dataproc_jupyter_plugin import urls
from dataproc_jupyter_plugin.commons.constants import (
CONTENT_TYPE,
Expand All @@ -20,7 +24,7 @@


class Client:
def __init__(self, credentials, log, client_session):
def __init__(self, credentials, log, dataproc_url, client_session=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will break the runtime list functionality since it isn't passing in the dataproc URL.

Did you test that?

self.log = log
if not (
("access_token" in credentials)
Expand All @@ -33,17 +37,18 @@ def __init__(self, credentials, log, client_session):
self.project_id = credentials["project_id"]
self.region_id = credentials["region_id"]
self.client_session = client_session
self.dataproc_url = dataproc_url
self.api_endpoint = f"{self.region_id}-{dataproc_url.split('/')[2]}:443"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you including the region ID in the hostname? Is that required? The upstream client codebase would suggest that it is not, and I would expect that to break any customers who have their own hostname defined (as they are unlikely to support the region being added onto it).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client = dataproc.ClusterControllerAsyncClient(
client_options={"api_endpoint": self.api_endpoint},
credentials=oauth2.Credentials(self._access_token),
)

When I remove the api_endpoint entry from client_options, I get the error below.

raise exceptions.from_grpc_error(rpc_error) from rpc_error
google.api_core.exceptions.InvalidArgument: 400 Region 'us-central1' specified in request does not match endpoint region 'global'. To use 'us-central1' region, specify 'us-central1' region in request and configure client to use 'us-central1-dataproc.googleapis.com:443' endpoint.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so it seems clear that the region name is required for the Dataproc API when not using an API endpoint override.

Our support for API endpoint overrides is primarily to support users of private service connect, so I went ahead and created a private service connect endpoint to access the Dataproc API, and tested it out to see how the expected DNS name in that case compares to the DNS name when not using private service connect.

It turns out that when using the default DNS names (e.g. dataproc-<ENDPOINT>.p.googleapis.com), you also have to add a prefix on the domain name for the region, or else you get this same error.

As such, my concerns appear to have been unwarranted, and we do in fact want to add the {region}- prefix to the domain name for the API endpoint override.


def create_headers(self):
    """Return the request headers: content type plus a bearer-token authorization."""
    bearer = f"Bearer {self._access_token}"
    headers = {"Content-Type": CONTENT_TYPE, "Authorization": bearer}
    return headers

async def list_clusters(self, page_size, page_token):
async def list_runtime(self, page_size, page_token):
try:
dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME)
api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/regions/{self.region_id}/clusters?pageSize={page_size}&pageToken={page_token}"
api_endpoint = f"{self.dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}"
async with self.client_session.get(
api_endpoint, headers=self.create_headers()
) as response:
Expand All @@ -52,27 +57,157 @@ async def list_clusters(self, page_size, page_token):
return resp
else:
return {
"error": f"Failed to fetch clusters: {response.status} {await response.text()}"
"error": f"Failed to fetch runtimes: {response.status} {await response.text()}"
}
except Exception as e:
self.log.exception(f"Error fetching runtime list: {str(e)}")
return {"error": str(e)}

async def list_clusters(self, page_size, page_token):
try:
# Create a client
client = dataproc.ClusterControllerAsyncClient(
client_options={"api_endpoint": self.api_endpoint},
credentials=oauth2.Credentials(self._access_token),
)

# Initialize request argument(s)
request = dataproc.ListClustersRequest(
project_id=self.project_id,
page_size=int(page_size),
page_token=page_token,
region=self.region_id,
)

# Make the request
page_result = await client.list_clusters(request=request)
clusters_list = []

# Handle the response
async for response in page_result:
clusters_list.append(
proto.Message.to_dict(
response,
use_integers_for_enums=False,
preserving_proto_field_name=False,
)
)
return clusters_list
except Exception as e:
self.log.exception("Error fetching cluster list")
self.log.exception(f"Error fetching cluster list: {str(e)}")
return {"error": str(e)}

async def list_runtime(self, page_size, page_token):
async def get_cluster_detail(self, cluster):
try:
dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME)
api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}"
async with self.client_session.get(
api_endpoint, headers=self.create_headers()
) as response:
if response.status == 200:
resp = await response.json()
return resp
else:
return {
"error": f"Failed to fetch runtimes: {response.status} {await response.text()}"
}
# Create a client
client = dataproc.ClusterControllerAsyncClient(
client_options={"api_endpoint": self.api_endpoint},
credentials=oauth2.Credentials(self._access_token),
)

# Initialize request argument(s)
request = dataproc.GetClusterRequest(
project_id=self.project_id,
region=self.region_id,
cluster_name=cluster,
)

# Make the request
response = await client.get_cluster(request=request)

# Handle the response
return proto.Message.to_dict(
response,
use_integers_for_enums=False,
preserving_proto_field_name=False,
)
except Exception as e:
self.log.exception(f"Error fetching runtime list: {str(e)}")
self.log.exception(f"Error fetching cluster detail: {str(e)}")
return {"error": str(e)}

async def stop_cluster(self, cluster):
    """Stop ``cluster`` in this client's project and region.

    Args:
        cluster: Name of the cluster, passed as ``cluster_name`` in the request.

    Returns:
        The awaited operation result converted to a dict, or
        ``{"error": <message>}`` if any step raised.
    """
    try:
        controller = dataproc.ClusterControllerAsyncClient(
            client_options={"api_endpoint": self.api_endpoint},
            credentials=oauth2.Credentials(self._access_token),
        )
        stop_request = dataproc.StopClusterRequest(
            project_id=self.project_id,
            region=self.region_id,
            cluster_name=cluster,
        )

        # The call returns an operation handle; its result is awaited
        # before anything is serialized.
        pending = await controller.stop_cluster(request=stop_request)
        outcome = await pending.result()

        return proto.Message.to_dict(
            outcome,
            use_integers_for_enums=False,
            preserving_proto_field_name=False,
        )
    except Exception as e:
        message = str(e)
        self.log.exception(f"Error stopping a cluster: {message}")
        return {"error": message}

async def start_cluster(self, cluster):
    """Start ``cluster`` in this client's project and region.

    Args:
        cluster: Name of the cluster, passed as ``cluster_name`` in the request.

    Returns:
        The awaited operation result converted to a dict, or
        ``{"error": <message>}`` if any step raised.
    """
    try:
        controller = dataproc.ClusterControllerAsyncClient(
            client_options={"api_endpoint": self.api_endpoint},
            credentials=oauth2.Credentials(self._access_token),
        )
        start_request = dataproc.StartClusterRequest(
            project_id=self.project_id,
            region=self.region_id,
            cluster_name=cluster,
        )

        # The call returns an operation handle; its result is awaited
        # before anything is serialized.
        pending = await controller.start_cluster(request=start_request)
        outcome = await pending.result()

        return proto.Message.to_dict(
            outcome,
            use_integers_for_enums=False,
            preserving_proto_field_name=False,
        )
    except Exception as e:
        message = str(e)
        self.log.exception(f"Error starting a cluster: {message}")
        return {"error": message}

async def delete_cluster(self, cluster):
    """Delete ``cluster`` in this client's project and region.

    Args:
        cluster: Name of the cluster, passed as ``cluster_name`` in the request.

    Returns:
        ``"Deleted successfully"`` when the awaited operation result is an
        ``Empty`` message, otherwise that result converted to a dict; or
        ``{"error": <message>}`` if any step raised.
    """
    try:
        controller = dataproc.ClusterControllerAsyncClient(
            client_options={"api_endpoint": self.api_endpoint},
            credentials=oauth2.Credentials(self._access_token),
        )
        delete_request = dataproc.DeleteClusterRequest(
            project_id=self.project_id,
            region=self.region_id,
            cluster_name=cluster,
        )

        # The call returns an operation handle; its result is awaited
        # before the response is inspected.
        pending = await controller.delete_cluster(request=delete_request)
        outcome = await pending.result()

        # An Empty result carries no fields, so report a plain success message.
        if isinstance(outcome, Empty):
            return "Deleted successfully"
        return proto.Message.to_dict(
            outcome,
            use_integers_for_enums=False,
            preserving_proto_field_name=False,
        )
    except Exception as e:
        message = str(e)
        self.log.exception(f"Error deleting a cluster: {message}")
        return {"error": message}
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies = [
"pydantic~=1.10.0",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we pinning the minor versions in these packages?

I.e., why "~=.." instead of ">=.."?

"bigframes~=0.22.0",
"aiohttp~=3.9.5",
"google-cloud-dataproc~=5.10.2",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to support the latest version, which is "5.11.0".

"google-cloud-storage~=2.18.2"
]
dynamic = ["version", "description", "authors", "urls", "keywords"]
Expand Down
Loading
Loading