diff --git a/test/e2e/nomad/e2e_test.go b/test/e2e/nomad/e2e_test.go
index 140241b5..9f0abeab 100644
--- a/test/e2e/nomad/e2e_test.go
+++ b/test/e2e/nomad/e2e_test.go
@@ -15,9 +15,8 @@ import (
 )
 
 const (
-    initialCapacity   = 10737418240 // 10GiB
-    resizedCapacity   = 11811160064 // 11GiB
-    resizedCapacityGB = 11
+    ResizedCapacity   = 11811160064 // 11GiB
+    ResizedCapacityGB = 11
 )
 
 var cluster *Cluster
@@ -41,44 +40,25 @@ func TestMain(m *testing.M) {
 }
 
 func TestGetPluginInfo(t *testing.T) {
-    plugin, _, err := cluster.nomadClient.CSIPlugins().Info(driver.PluginName, &nomad.QueryOptions{})
-    if err != nil {
-        t.Error(err)
-    }
-
-    assert.NotNil(t, plugin, "Expected plugin from Nomad to be not nil")
-
-    assert.Equalf(
-        t,
-        plugin.Version,
-        driver.PluginVersion,
-        "Expected plugin version %s, but got %s",
-        driver.PluginVersion,
-        plugin.Version,
-    )
+    plugin, _, err := cluster.nomadClient.CSIPlugins().Info(driver.PluginName, &nomad.QueryOptions{})
+    if err != nil {
+        t.Error(err)
+    }
+
+    assert.NotNil(t, plugin, "Expected plugin from Nomad to be not nil")
+
+    assert.Equalf(
+        t,
+        plugin.Version,
+        driver.PluginVersion,
+        "Expected plugin version %s, but got %s",
+        driver.PluginVersion,
+        plugin.Version,
+    )
 }
 
 func TestVolumeLifecycle(t *testing.T) {
-    volReq := &nomad.CSIVolume{
-        ID:                   "db-vol",
-        Name:                 "db-vol",
-        Namespace:            "default",
-        PluginID:             "csi.hetzner.cloud",
-        RequestedCapacityMin: initialCapacity,
-        RequestedCapabilities: []*nomad.CSIVolumeCapability{
-            {
-                AccessMode:     "single-node-writer",
-                AttachmentMode: "file-system",
-            },
-        },
-        MountOptions: &nomad.CSIMountOptions{
-            FSType: "ext4",
-            MountFlags: []string{
-                "discard",
-                "defaults",
-            },
-        },
-    }
+    volReq := createVolumeSpec("db-vol")
     var hcloudVolID int64
 
     t.Run("volume creation", func(t *testing.T) {
@@ -101,30 +81,30 @@ func TestVolumeLifecycle(t *testing.T) {
         assert.NotNilf(t, hcloudVolume, "could not find volume with ID %d on hcloud", hcloudVolID)
     })
 
-    t.Run("volume resize", func(t *testing.T) {
-        volReq.RequestedCapacityMin = resizedCapacity
+    t.Run("volume resize", func(t *testing.T) {
+        volReq.RequestedCapacityMin = ResizedCapacity
 
-        _, _, err := cluster.nomadClient.CSIVolumes().Create(volReq, &nomad.WriteOptions{})
-        if err != nil {
-            t.Error(err)
-        }
+        _, _, err := cluster.nomadClient.CSIVolumes().Create(volReq, &nomad.WriteOptions{})
+        if err != nil {
+            t.Error(err)
+        }
 
-        hcloudVolume, _, err := cluster.hcloudClient.Volume.GetByID(context.Background(), hcloudVolID)
-        if err != nil {
-            t.Error(err)
-        }
+        hcloudVolume, _, err := cluster.hcloudClient.Volume.GetByID(context.Background(), hcloudVolID)
+        if err != nil {
+            t.Error(err)
+        }
 
         if assert.NotNilf(t, hcloudVolume, "could not find volume with ID %d on hcloud", hcloudVolID) {
-            assert.Equalf(
-                t,
-                hcloudVolume.Size,
-                resizedCapacityGB,
-                "Expected vol size %d, but got %d",
-                resizedCapacityGB,
-                hcloudVolume.Size,
-            )
-        }
-    })
+            assert.Equalf(
+                t,
+                hcloudVolume.Size,
+                ResizedCapacityGB,
+                "Expected vol size %d, but got %d",
+                ResizedCapacityGB,
+                hcloudVolume.Size,
+            )
+        }
+    })
 
     t.Run("volume deletion", func(t *testing.T) {
         err := cluster.DeleteVolume(volReq.ID, &nomad.WriteOptions{})
@@ -139,6 +119,115 @@ func TestVolumeLifecycle(t *testing.T) {
 
         assert.Nil(t, hcloudVolume, "hcloud volume was deleted in nomad, but still exists")
     })
-
 }
 
+func TestVolumeWrite(t *testing.T) {
+    volID := "test-vol"
+    jobID := "test-writer"
+    volReq := createVolumeSpec(volID)
+    job := createBusyboxWithVolumeJobSpec(jobID, volID, "/test")
+
+    t.Run("create volume", func(t *testing.T) {
+        vol, _, err := cluster.CreateVolume(volReq, &nomad.WriteOptions{})
+        if err != nil {
+            t.Error(err)
+        }
+        assert.Len(t, vol, 1)
+    })
+
+    // Used to ensure that the job for verifying the data is scheduled on another node
+    var previousNodeID string
+    t.Run("write to volume", func(t *testing.T) {
+        _, _, err := cluster.nomadClient.Jobs().Register(job, &nomad.WriteOptions{})
+        if err != nil {
+            t.Error(err)
+        }
+        defer func() {
+            _, _, err = cluster.nomadClient.Jobs().Deregister(*job.ID, true, &nomad.WriteOptions{})
+            if err != nil {
+                t.Error(err)
+            }
+        }()
+
+        allocStub, err := cluster.WaitForRunningJob(*job.ID)
+        if err != nil {
+            t.Error(err)
+            return
+        }
+
+        previousNodeID = allocStub.NodeID
+
+        alloc, _, err := cluster.nomadClient.Allocations().Info(allocStub.ID, &nomad.QueryOptions{})
+        if err != nil {
+            t.Error(err)
+            return
+        }
+
+        exitCode, err := cluster.ExecInAlloc(alloc, jobID, []string{
+            "dd",
+            "if=/dev/random",
+            "of=/test/data",
+            "bs=1M",
+            "count=1",
+        })
+        if err != nil {
+            t.Error(err)
+        }
+        assert.Equalf(t, 0, exitCode, "could not write test data - exit code: %d", exitCode)
+    })
+
+    t.Run("verify volume data", func(t *testing.T) {
+        // try to schedule job on another node
+        constraint := &nomad.Affinity{
+            LTarget: "${node.unique.id}",
+            RTarget: previousNodeID,
+            Operand: "!=",
+        }
+        job.Affinities = append(job.Affinities, constraint)
+
+        _, _, err := cluster.nomadClient.Jobs().Register(job, &nomad.WriteOptions{})
+        if err != nil {
+            t.Error(err)
+        }
+        defer func() {
+            _, _, err = cluster.nomadClient.Jobs().Deregister(*job.ID, true, &nomad.WriteOptions{})
+            if err != nil {
+                t.Error(err)
+            }
+        }()
+
+        allocStub, err := cluster.WaitForRunningJob(*job.ID)
+        if err != nil {
+            t.Error(err)
+            return
+        }
+
+        alloc, _, err := cluster.nomadClient.Allocations().Info(allocStub.ID, &nomad.QueryOptions{})
+        if err != nil {
+            t.Error(err)
+            return
+        }
+
+        // verify that file exists and has a size greater than zero
+        exitCode, err := cluster.ExecInAlloc(alloc, jobID, []string{
+            "test",
+            "-s",
+            "/test/data",
+        })
+        if err != nil {
+            t.Error(err)
+        }
+        assert.Equalf(t, 0, exitCode, "could not verify test data - exit code: %d", exitCode)
+    })
+
+    t.Run("delete volume", func(t *testing.T) {
+        // with retries, as the volume can still be in use for a couple of seconds after the job got deleted,
+        // which results in an internal server error
+        for i := range 10 {
+            if err := cluster.DeleteVolume(volReq.ID, &nomad.WriteOptions{}); err == nil {
+                break
+            }
+            backoffSleep(i)
+        }
+    })
+}
diff --git a/test/e2e/nomad/utils.go b/test/e2e/nomad/utils.go
index 90bbe117..864c04ca 100644
--- a/test/e2e/nomad/utils.go
+++ b/test/e2e/nomad/utils.go
@@ -3,6 +3,7 @@ package e2e
 import (
     "context"
     "fmt"
+    "math"
     "os"
     "sync"
     "time"
@@ -12,6 +13,8 @@ import (
     "github.com/hetznercloud/hcloud-go/v2/hcloud"
 )
 
+const InitialVolumeCapacity = 10737418240 // 10GiB
+
 type Cluster struct {
     hcloudClient *hcloud.Client
     nomadClient  *nomad.Client
@@ -82,6 +85,11 @@ func (cluster *Cluster) Cleanup() []error {
         vol, _, err := cluster.hcloudClient.Volume.GetByName(context.Background(), volName)
         if err != nil {
             cleanupErrors = append(cleanupErrors, err)
+            continue
+        }
+        if vol == nil {
+            cleanupErrors = append(cleanupErrors, fmt.Errorf("volume %s not found on hcloud", volName))
+            continue
         }
         _, err = cluster.hcloudClient.Volume.Delete(context.Background(), vol)
         if err != nil {
@@ -119,3 +127,105 @@ func (cluster *Cluster) DeleteVolume(externalVolID string, w *nomad.WriteOptions
 
     return nil
 }
+
+func (cluster *Cluster) ExecInAlloc(alloc *nomad.Allocation, task string, command []string) (int, error) {
+    exitCode, err := cluster.nomadClient.Allocations().Exec(
+        context.Background(),
+        alloc,
+        task,
+        true,
+        command,
+        os.Stdin,
+        os.Stdout,
+        os.Stderr,
+        make(<-chan nomad.TerminalSize),
+        &nomad.QueryOptions{},
+    )
+    if err != nil {
+        return exitCode, err
+    }
+    return exitCode, nil
+}
+
+func (cluster *Cluster) WaitForRunningJob(jobID string) (*nomad.AllocationListStub, error) {
+    for retry := range 10 {
+        allocs, _, err := cluster.nomadClient.Jobs().Allocations(
+            jobID,
+            false,
+            &nomad.QueryOptions{},
+        )
+        if err != nil {
+            return nil, err
+        }
+
+        for _, alloc := range allocs {
+            if alloc.ClientStatus == "running" {
+                return alloc, nil
+            }
+        }
+
+        backoffSleep(retry)
+    }
+    return nil, fmt.Errorf("no running allocation for job %s", jobID)
+}
+
+func createVolumeSpec(id string) *nomad.CSIVolume {
+    return &nomad.CSIVolume{
+        ID:                   id,
+        Name:                 id,
+        Namespace:            "default",
+        PluginID:             "csi.hetzner.cloud",
+        RequestedCapacityMin: InitialVolumeCapacity,
+        RequestedCapabilities: []*nomad.CSIVolumeCapability{
+            {
+                AccessMode:     "single-node-writer",
+                AttachmentMode: "file-system",
+            },
+        },
+        MountOptions: &nomad.CSIMountOptions{
+            FSType: "ext4",
+            MountFlags: []string{
+                "discard",
+                "defaults",
+            },
+        },
+    }
+
+}
+
+func createBusyboxWithVolumeJobSpec(id string, volumeID string, mountPath string) *nomad.Job {
+    job := nomad.NewServiceJob(id, id, "global", 50)
+    taskGroup := nomad.NewTaskGroup(id, 1)
+
+    taskGroup.Volumes = map[string]*nomad.VolumeRequest{
+        volumeID: {
+            Name:           volumeID,
+            ReadOnly:       false,
+            Type:           "csi",
+            Source:         volumeID,
+            AttachmentMode: "file-system",
+            AccessMode:     "single-node-writer",
+            PerAlloc:       false,
+        },
+    }
+
+    task := nomad.NewTask(id, "docker")
+    task = task.SetConfig("image", "busybox:stable")
+    task = task.SetConfig("command", "sleep")
+    task = task.SetConfig("args", []string{"3600"})
+
+    task.VolumeMounts = append(task.VolumeMounts, &nomad.VolumeMount{
+        Volume:      &volumeID,
+        Destination: &mountPath,
+    })
+
+    taskGroup = taskGroup.AddTask(task)
+    job = job.AddTaskGroup(taskGroup)
+    return job
+}
+
+func backoffSleep(retry int) {
+    delay := math.Pow(2, float64(retry))
+    delay = math.Min(delay, 16)
+    time.Sleep(time.Second * time.Duration(delay))
+}
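Reviewer note, not part of the patch: `WaitForRunningJob` and the delete-volume retry loop both lean on `backoffSleep`, which doubles the delay per attempt and caps it at 16 seconds. A minimal standalone sketch of the same formula (the helper name `backoffDelay` is hypothetical, introduced only for illustration), useful for estimating worst-case wait times:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay mirrors backoffSleep from utils.go: 2^retry seconds, capped at 16s.
func backoffDelay(retry int) time.Duration {
	delay := math.Min(math.Pow(2, float64(retry)), 16)
	return time.Duration(delay) * time.Second
}

func main() {
	var total time.Duration
	for i := range 10 { // same retry budget as WaitForRunningJob and the delete-volume loop
		total += backoffDelay(i)
		fmt.Printf("retry %d: wait %s (cumulative %s)\n", i, backoffDelay(i), total)
	}
}
```

With the 10-retry budget used in the tests, the delays run 1, 2, 4, 8, then 16 seconds each, so the cumulative worst case is 1+2+4+8+6*16 = 111 seconds before the helpers give up.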