Skip to content

Commit

Permalink
Adjust KFTO PyTorchJob upgrade tests to be idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
sutaakar committed Dec 4, 2024
1 parent f3697a9 commit f0df855
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 54 deletions.
8 changes: 8 additions & 0 deletions tests/kfto/core/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func PytorchJob(t Test, namespace, name string) func(g Gomega) *kftov1.PyTorchJo
}
}

// PytorchJobs returns a getter, suitable for Gomega polling assertions, that
// lists the PyTorchJobs found in the given namespace. Each invocation of the
// getter issues a List call and asserts on g that the call did not fail.
func PytorchJobs(t Test, namespace string) func(g Gomega) []kftov1.PyTorchJob {
	return func(g Gomega) []kftov1.PyTorchJob {
		jobsClient := t.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace)

		list, listErr := jobsClient.List(t.Ctx(), metav1.ListOptions{})
		g.Expect(listErr).NotTo(HaveOccurred())

		return list.Items
	}
}

// PytorchJobConditionRunning reports the status of the JobRunning condition
// of the given PyTorchJob, as resolved by PytorchJobCondition.
func PytorchJobConditionRunning(job *kftov1.PyTorchJob) corev1.ConditionStatus {
	runningStatus := PytorchJobCondition(job, kftov1.JobRunning)
	return runningStatus
}
Expand Down
105 changes: 59 additions & 46 deletions tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
kueueacv1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -42,14 +43,7 @@ var (
func TestSetupPytorchjob(t *testing.T) {
test := With(t)

// Create a namespace
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespaceName,
},
}
_, err := test.Client().Core().CoreV1().Namespaces().Create(test.Ctx(), namespace, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
createOrGetUpgradeTestNamespace(test, namespaceName)

// Create a ConfigMap with training dataset and configuration
configData := map[string][]byte{
Expand All @@ -59,47 +53,31 @@ func TestSetupPytorchjob(t *testing.T) {
config := CreateConfigMap(test, namespaceName, configData)

// Create Kueue resources
resourceFlavor := &kueuev1beta1.ResourceFlavor{
ObjectMeta: metav1.ObjectMeta{
Name: resourceFlavorName,
},
}
resourceFlavor, err = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Create(test.Ctx(), resourceFlavor, metav1.CreateOptions{})
resourceFlavor := kueueacv1beta1.ResourceFlavor(resourceFlavorName)
_, err := test.Client().Kueue().KueueV1beta1().ResourceFlavors().Apply(test.Ctx(), resourceFlavor, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true})
test.Expect(err).NotTo(HaveOccurred())

clusterQueue := &kueuev1beta1.ClusterQueue{
ObjectMeta: metav1.ObjectMeta{
Name: clusterQueueName,
},
Spec: kueuev1beta1.ClusterQueueSpec{
NamespaceSelector: &metav1.LabelSelector{},
ResourceGroups: []kueuev1beta1.ResourceGroup{
{
CoveredResources: []corev1.ResourceName{corev1.ResourceName("cpu"), corev1.ResourceName("memory")},
Flavors: []kueuev1beta1.FlavorQuotas{
{
Name: kueuev1beta1.ResourceFlavorReference(resourceFlavor.Name),
Resources: []kueuev1beta1.ResourceQuota{
{
Name: corev1.ResourceCPU,
NominalQuota: resource.MustParse("8"),
},
{
Name: corev1.ResourceMemory,
NominalQuota: resource.MustParse("12Gi"),
},
},
},
},
},
},
StopPolicy: Ptr(kueuev1beta1.Hold),
},
}
clusterQueue, err = test.Client().Kueue().KueueV1beta1().ClusterQueues().Create(test.Ctx(), clusterQueue, metav1.CreateOptions{})
clusterQueue := kueueacv1beta1.ClusterQueue(clusterQueueName).WithSpec(
kueueacv1beta1.ClusterQueueSpec().
WithNamespaceSelector(metav1.LabelSelector{}).
WithResourceGroups(
kueueacv1beta1.ResourceGroup().WithCoveredResources(
corev1.ResourceName("cpu"), corev1.ResourceName("memory"),
).WithFlavors(
kueueacv1beta1.FlavorQuotas().
WithName(kueuev1beta1.ResourceFlavorReference(resourceFlavorName)).
WithResources(
kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceCPU).WithNominalQuota(resource.MustParse("8")),
kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceMemory).WithNominalQuota(resource.MustParse("12Gi")),
),
),
).
WithStopPolicy(kueuev1beta1.Hold),
)
_, err = test.Client().Kueue().KueueV1beta1().ClusterQueues().Apply(test.Ctx(), clusterQueue, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true})
test.Expect(err).NotTo(HaveOccurred())

localQueue := CreateKueueLocalQueue(test, namespaceName, clusterQueue.Name, AsDefaultQueue)
localQueue := CreateKueueLocalQueue(test, namespaceName, clusterQueueName, AsDefaultQueue)

// Create training PyTorch job
tuningJob := createPyTorchJob(test, namespaceName, localQueue.Name, *config)
Expand Down Expand Up @@ -133,6 +111,17 @@ func TestRunPytorchjob(t *testing.T) {
}

func createPyTorchJob(test Test, namespace, localQueueName string, config corev1.ConfigMap) *kftov1.PyTorchJob {
// Does PyTorchJob already exist?
_, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Get(test.Ctx(), pyTorchJobName, metav1.GetOptions{})
if err == nil {
// If yes then delete it and wait until there are no PyTorchJobs in the namespace
err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), pyTorchJobName, metav1.DeleteOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.Eventually(kftocore.PytorchJobs(test, namespace), TestTimeoutShort).Should(BeEmpty())
} else if !errors.IsNotFound(err) {
test.T().Fatalf("Error retrieving PyTorchJob with name `%s`: %v", pyTorchJobName, err)
}

tuningJob := &kftov1.PyTorchJob{
ObjectMeta: metav1.ObjectMeta{
Name: pyTorchJobName,
Expand Down Expand Up @@ -186,6 +175,10 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
Name: "tmp-volume",
MountPath: "/tmp",
},
{
Name: "output-volume",
MountPath: "/mnt/output",
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
Expand Down Expand Up @@ -226,6 +219,12 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
{
Name: "output-volume",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
},
},
},
Expand All @@ -234,9 +233,23 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
},
}

tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
tuningJob, err = test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)

return tuningJob
}

// createOrGetUpgradeTestNamespace returns the namespace called name so that
// the upgrade setup tests can be run repeatedly against the same cluster.
//
// The namespace is looked up first. If it exists it is returned as is; if the
// lookup reports NotFound it is created through CreateTestNamespaceWithName,
// passing along the supplied options. Any other lookup error fails the test
// through Fatalf.
func createOrGetUpgradeTestNamespace(test Test, name string, options ...Option[*corev1.Namespace]) (namespace *corev1.Namespace) {
	namespace, err := test.Client().Core().CoreV1().Namespaces().Get(test.Ctx(), name, metav1.GetOptions{})

	switch {
	case err == nil:
		// The namespace already exists: reuse it.
		return namespace
	case errors.IsNotFound(err):
		// The namespace is missing: create it.
		test.T().Logf("%s namespace doesn't exists. Creating ...", name)
		return CreateTestNamespaceWithName(test, name, options...)
	default:
		// Unexpected API error: abort instead of creating on top of an
		// unknown cluster state.
		test.T().Fatalf("Error retrieving namespace with name `%s`: %v", name, err)
		// Presumably not reached, since Fatalf is expected to stop the test;
		// the return is required by the compiler.
		return namespace
	}
}
22 changes: 14 additions & 8 deletions tests/kfto/upgrade/kfto_sft_upgrade_sleep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
. "github.com/project-codeflare/codeflare-common/support"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
Expand All @@ -38,13 +39,7 @@ func TestSetupSleepPytorchjob(t *testing.T) {
test := With(t)

// Create a namespace
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: sleepNamespaceName,
},
}
_, err := test.Client().Core().CoreV1().Namespaces().Create(test.Ctx(), namespace, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
createOrGetUpgradeTestNamespace(test, sleepNamespaceName)

// Create training PyTorch job
createSleepPyTorchJob(test, sleepNamespaceName)
Expand Down Expand Up @@ -76,6 +71,17 @@ func TestVerifySleepPytorchjob(t *testing.T) {
}

func createSleepPyTorchJob(test Test, namespace string) *kftov1.PyTorchJob {
// Does PyTorchJob already exist?
_, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Get(test.Ctx(), sleepPyTorchJobName, metav1.GetOptions{})
if err == nil {
// If yes then delete it and wait until there are no PyTorchJobs in the namespace
err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), sleepPyTorchJobName, metav1.DeleteOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.Eventually(kftocore.PytorchJobs(test, namespace), TestTimeoutShort).Should(BeEmpty())
} else if !errors.IsNotFound(err) {
test.T().Fatalf("Error retrieving PyTorchJob with name `%s`: %v", sleepPyTorchJobName, err)
}

tuningJob := &kftov1.PyTorchJob{
ObjectMeta: metav1.ObjectMeta{
Name: sleepPyTorchJobName,
Expand All @@ -102,7 +108,7 @@ func createSleepPyTorchJob(test Test, namespace string) *kftov1.PyTorchJob {
},
}

tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
tuningJob, err = test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)

Expand Down

0 comments on commit f0df855

Please sign in to comment.