Skip to content

Commit

Permalink
Integrate kind for local testing (#242)
Browse files Browse the repository at this point in the history
* Add kind for local testing

* Rename flag to use local cluster

* Allow to switch local cluster

* Add local testing support for job command

* Set context when creating kind cluster
  • Loading branch information
IrvingMg authored Nov 22, 2024
1 parent c00f3c0 commit 6e346b5
Show file tree
Hide file tree
Showing 9 changed files with 470 additions and 14 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,50 @@ You can specify what kind of resources(clusterqueue or localqueue) you want to s
python3 xpk.py info --cluster my-cluster --localqueue
```
# Local testing with Kind
To facilitate development and testing locally, we have integrated support for testing with `kind`. This enables you to simulate a Kubernetes environment on your local machine.
## Prerequisites
- Install kind on your local machine. Follow the official documentation here: [Kind Installation Guide.](https://kind.sigs.k8s.io/docs/user/quick-start#installation)
## Usage
xpk interfaces seamlessly with kind to manage Kubernetes clusters locally, facilitating the orchestration and management of workloads. Below are the commands for managing clusters:
### Cluster Create
* Cluster create:
```shell
python3 xpk.py kind create \
--cluster xpk-test
```
### Cluster Delete
* Cluster Delete:
```shell
python3 xpk.py kind delete \
--cluster xpk-test
```
### Cluster List
* Cluster List:
```shell
python3 xpk.py kind list
```
## Local Testing Basics
Local testing is available exclusively through the `batch` and `job` commands of xpk with the `--kind-cluster` flag. This allows you to simulate training jobs locally:
```shell
python xpk.py batch [other-options] --kind-cluster script
```
Please note that all other xpk subcommands are intended for use with cloud systems on Google Cloud Engine (GCE) and don't support local testing. This includes commands like cluster, info, inspector, etc.
# Other advanced usage
[Use a Jupyter notebook to interact with a Cloud TPU cluster](xpk-notebooks.md)
13 changes: 8 additions & 5 deletions src/xpk/commands/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
from ..core.job_template import create_job_template_instance
from ..core.app_profile import create_app_profile_instance
from ..core.app_profile import APP_PROFILE_TEMPLATE_DEFAULT_NAME
from ..core.commands import (
run_command_for_value,
)
from ..core.commands import run_command_for_value
from .kind import set_local_cluster_command


def batch(args: Namespace) -> None:
Expand All @@ -34,8 +33,12 @@ def batch(args: Namespace) -> None:
Returns:
None
"""
add_zone_and_project(args)
set_cluster_command_code = set_cluster_command(args)
if not args.kind_cluster:
add_zone_and_project(args)
set_cluster_command_code = set_cluster_command(args)
else:
set_cluster_command_code = set_local_cluster_command(args)

if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)

Expand Down
25 changes: 16 additions & 9 deletions src/xpk/commands/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
run_command_with_updates,
)
from .cluster import set_cluster_command
from .kind import set_local_cluster_command


def job_list(args) -> None:
Expand All @@ -32,15 +33,17 @@ def job_list(args) -> None:
Returns:
None
"""
add_zone_and_project(args)
set_cluster_command_code = set_cluster_command(args)
if not args.kind_cluster:
add_zone_and_project(args)
set_cluster_command_code = set_cluster_command(args)
msg = f'Listing jobs for project {args.project} and zone {args.zone}:'
else:
set_cluster_command_code = set_local_cluster_command(args)
msg = 'Listing jobs:'

if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)

xpk_print(
f'Listing jobs for project {args.project} and zone {args.zone}:',
flush=True,
)
xpk_print(msg, flush=True)

return_code = run_slurm_job_list_command(args)
xpk_exit(return_code)
Expand All @@ -67,8 +70,12 @@ def job_cancel(args) -> None:
None
"""
xpk_print(f'Starting job cancel for job: {args.name}', flush=True)
add_zone_and_project(args)
set_cluster_command_code = set_cluster_command(args)
if not args.kind_cluster:
add_zone_and_project(args)
set_cluster_command_code = set_cluster_command(args)
else:
set_cluster_command_code = set_local_cluster_command(args)

if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)

Expand Down
253 changes: 253 additions & 0 deletions src/xpk/commands/kind.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
"""
Copyright 2024 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from ..core.commands import (
run_command_for_value,
run_command_with_updates,
)
from ..core.core import (
set_jobset_on_cluster,
)
from ..core.kjob import (
verify_kjob_installed,
prepare_kjob,
apply_kjob_crds,
)
from ..core.kueue import (
install_kueue_on_cluster,
)
from ..utils.console import (xpk_exit, xpk_print)


def cluster_create(args) -> None:
"""Function around cluster creation.
Args:
args: user provided arguments for running the command.
Returns:
0 if successful and 1 otherwise.
"""
xpk_print(f'Starting cluster create for cluster {args.cluster}:', flush=True)

create_cluster_command_code = create_cluster_if_necessary(args)
if create_cluster_command_code != 0:
xpk_exit(create_cluster_command_code)

set_cluster_command_code = set_local_cluster_command(args)
if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)

xpk_print(
'Enabling the jobset API on our cluster, to be deprecated when Jobset is'
' globally available'
)
set_jobset_on_cluster_code = set_jobset_on_cluster(args)
if set_jobset_on_cluster_code != 0:
xpk_exit(set_jobset_on_cluster_code)

xpk_print('Enabling Kueue on the cluster')
install_kueue_on_cluster_code = install_kueue_on_cluster(args)
if install_kueue_on_cluster_code != 0:
xpk_exit(install_kueue_on_cluster_code)

xpk_print('Verifying kjob installation')
err_code = verify_kjob_installed(args)
if err_code > 0:
xpk_exit(err_code)

xpk_print('Applying kjob CDRs')
err_code = apply_kjob_crds(args)
if err_code > 0:
xpk_exit(err_code)

xpk_print('Preparing kjob')
err_code = prepare_kjob(args)
if err_code > 0:
xpk_exit(err_code)

xpk_print('Kind commands done! Resources are created.')
xpk_exit(0)


def cluster_delete(args) -> None:
"""Function around cluster delete.
Args:
args: user provided arguments for running the command.
Returns:
0 if successful and 1 otherwise.
"""
xpk_print(f'Starting cluster delete for cluster: {args.cluster}', flush=True)

run_kind_cluster_delete_command_code = run_kind_cluster_delete_command(args)
if run_kind_cluster_delete_command_code != 0:
xpk_exit(run_kind_cluster_delete_command_code)
xpk_print(f'Kind commands done! Cluster {args.cluster} deleted.')
xpk_exit(0)


def cluster_list(args) -> None:
"""Function around cluster list.
Args:
args: user provided arguments for running the command.
Returns:
0 if successful and 1 otherwise.
"""
if run_kind_clusters_list_command(args):
xpk_exit(1)
xpk_exit(0)


def create_cluster_if_necessary(args) -> int:
"""Creates cluster if not present in the project.
Args:
args: user provided arguments for running the command.
Returns:
0 if successful and 1 otherwise.
"""
all_clusters, return_code = get_all_local_clusters_programmatic(args)
if return_code > 0:
xpk_print('Listing all clusters failed!')
return 1
if args.cluster in all_clusters:
xpk_print('Skipping cluster creation since it already exists.')
return 0
else:
return run_kind_cluster_create_command(args)


def run_kind_cluster_delete_command(args) -> int:
"""Run the Delete Kind Cluster request.
Args:
args: user provided arguments for running the command.
Returns:
0 if successful and 1 otherwise.
"""
command = 'kind delete cluster'

if args.cluster:
command += f' --name={args.cluster}'

return_code = run_command_with_updates(command, 'Cluster Delete', args)
if return_code != 0:
xpk_print(f'Cluster delete request returned ERROR {return_code}')
return 1

return 0


def run_kind_clusters_list_command(args) -> int:
"""List Kind Clusters within the project and location.
Args:
args: user provided arguments for running the command.
Returns:
0 if successful and 1 otherwise.
"""
command = 'kind get clusters'
return_code = run_command_with_updates(command, 'Cluster List', args)
if return_code != 0:
xpk_print(f'Cluster list request returned ERROR {return_code}')
return 1

return 0


def run_kind_cluster_create_command(args) -> int:
"""Run the Create Kind Cluster request.
Args:
args: user provided arguments for running the command.
Returns:
0 if successful and 1 otherwise.
"""
command = 'kind create cluster'

if args.cluster:
command += f' --name={args.cluster}'

if args.k8s_version:
command += f' --image=kindest/node:v{args.k8s_version}'

return_code = run_command_with_updates(command, 'Kind Cluster Create', args)
if return_code != 0:
xpk_print(f'GKE Cluster Create request returned ERROR {return_code}')
return 1
return 0


def get_all_local_clusters_programmatic(args) -> tuple[list[str], int]:
"""Gets all the local clusters.
Args:
args: user provided arguments for running the command.
Returns:
List of cluster names and 0 if successful and 1 otherwise.
"""
command = 'kind get clusters'
return_code, raw_cluster_output = run_command_for_value(
command, 'Find if Cluster Exists', args
)
if return_code != 0:
xpk_print(f'Find if Cluster Exists returned ERROR {return_code}')
return [], return_code

return raw_cluster_output.splitlines(), 0


def set_local_cluster_command(args) -> int:
"""Run local cluster configuration command to set the kubectl config.
Args:
args: user provided arguments for running the command.
Returns:
0 if successful and 1 otherwise.
"""
if not args.cluster:
command = 'kubectl config current-context'
return_code, current_context = run_command_for_value(
command, 'get current-context', args
)
xpk_print(
'No local cluster name specified. Using current-context'
f' `{current_context.strip()}`'
)
return return_code

command = (
f'kubectl config use-context kind-{args.cluster} --namespace=default'
)
task = f'switch to cluster {args.cluster}'
return_code = run_command_with_updates(
command,
task,
args,
)
if return_code != 0:
xpk_print(f'{task} returned ERROR {return_code}')
return return_code
Loading

0 comments on commit 6e346b5

Please sign in to comment.