test(nomad): write test data and verify on another node
In this test, a job is created that writes a 1 MiB test file to a Hetzner Cloud volume. Afterwards, this job is deleted and another job is started on a different node, which verifies that the file is present and has a size greater than zero.
lukasmetzner committed Dec 13, 2024
1 parent f0b888d commit c181740
Showing 2 changed files with 258 additions and 59 deletions.
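For orientation before the full diff, here is a condensed sketch of the flow the new test implements, written against the helpers this commit adds (createVolumeSpec, createBusyboxWithVolumeJobSpec, WaitForRunningJob, ExecInAlloc). The wrapper function writeThenVerify and the nomad import alias are illustrative assumptions, not part of the commit; job deregistration, the verify step on the second node, and the delete retries are only indicated in comments.

package e2e

import (
    "fmt"

    nomad "github.com/hashicorp/nomad/api"
)

// writeThenVerify is a condensed sketch of TestVolumeWrite below: one job
// writes 1 MiB of random data to a CSI volume, then a second job, steered to
// a different node via an anti-affinity on the writer's node ID, checks that
// the file exists and is non-empty. Cleanup and retries are elided.
func writeThenVerify(cluster *Cluster) error {
    // Create the volume from the shared spec helper.
    volReq := createVolumeSpec("test-vol")
    if _, _, err := cluster.CreateVolume(volReq, &nomad.WriteOptions{}); err != nil {
        return err
    }

    // Start the writer job, which mounts the volume at /test.
    writer := createBusyboxWithVolumeJobSpec("test-writer", "test-vol", "/test")
    if _, _, err := cluster.nomadClient.Jobs().Register(writer, &nomad.WriteOptions{}); err != nil {
        return err
    }

    // Wait for a running allocation and write 1 MiB of test data into it.
    stub, err := cluster.WaitForRunningJob(*writer.ID)
    if err != nil {
        return err
    }
    alloc, _, err := cluster.nomadClient.Allocations().Info(stub.ID, &nomad.QueryOptions{})
    if err != nil {
        return err
    }
    exitCode, err := cluster.ExecInAlloc(alloc, "test-writer", []string{
        "dd", "if=/dev/random", "of=/test/data", "bs=1M", "count=1",
    })
    if err != nil {
        return err
    }
    if exitCode != 0 {
        return fmt.Errorf("writing test data failed with exit code %d", exitCode)
    }

    // Next (see the full test): deregister the writer, re-register the job with
    // an affinity {"${node.unique.id}" != stub.NodeID}, run `test -s /test/data`
    // in the new allocation, and finally delete the volume with backoffSleep retries.
    return nil
}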
207 changes: 148 additions & 59 deletions test/e2e/nomad/e2e_test.go
@@ -15,9 +15,8 @@ import (
)

const (
initialCapacity = 10737418240 // 10GiB
resizedCapacity = 11811160064 // 11GiB
resizedCapacityGB = 11
ResizedCapacity = 11811160064 // 11GiB
ResizedCapacityGB = 11
)

var cluster *Cluster
@@ -41,44 +40,25 @@ func TestMain(m *testing.M) {
}

func TestGetPluginInfo(t *testing.T) {
plugin, _, err := cluster.nomadClient.CSIPlugins().Info(driver.PluginName, &nomad.QueryOptions{})
if err != nil {
t.Error(err)
}

assert.NotNil(t, plugin, "Expected plugin from Nomad to be not nil")

assert.Equalf(
t,
plugin.Version,
driver.PluginVersion,
"Expected plugin version %s, but got %s",
driver.PluginVersion,
plugin.Version,
)
}

func TestVolumeLifecycle(t *testing.T) {
volReq := &nomad.CSIVolume{
ID: "db-vol",
Name: "db-vol",
Namespace: "default",
PluginID: "csi.hetzner.cloud",
RequestedCapacityMin: initialCapacity,
RequestedCapabilities: []*nomad.CSIVolumeCapability{
{
AccessMode: "single-node-writer",
AttachmentMode: "file-system",
},
},
MountOptions: &nomad.CSIMountOptions{
FSType: "ext4",
MountFlags: []string{
"discard",
"defaults",
},
},
}
volReq := createVolumeSpec("db-vol")

var hcloudVolID int64
t.Run("volume creation", func(t *testing.T) {
@@ -101,30 +81,30 @@ func TestVolumeLifecycle(t *testing.T) {
assert.NotNilf(t, hcloudVolume, "could not find volume with ID %d on hcloud", hcloudVolID)
})

t.Run("volume resize", func(t *testing.T) {
volReq.RequestedCapacityMin = resizedCapacity
t.Run("volume resize", func(t *testing.T) {
volReq.RequestedCapacityMin = ResizedCapacity

_, _, err := cluster.nomadClient.CSIVolumes().Create(volReq, &nomad.WriteOptions{})
if err != nil {
t.Error(err)
}

hcloudVolume, _, err := cluster.hcloudClient.Volume.GetByID(context.Background(), hcloudVolID)
if err != nil {
t.Error(err)
}

if assert.NotNilf(t, hcloudVolume, "could not find volume with ID %d on hcloud", hcloudVolID) {
assert.Equalf(
t,
hcloudVolume.Size,
resizedCapacityGB,
"Expected vol size %d, but got %d",
resizedCapacityGB,
hcloudVolume.Size,
)
}
})
assert.Equalf(
t,
hcloudVolume.Size,
ResizedCapacityGB,
"Expected vol size %d, but got %d",
ResizedCapacityGB,
hcloudVolume.Size,
)
}
})

t.Run("volume deletion", func(t *testing.T) {
err := cluster.DeleteVolume(volReq.ID, &nomad.WriteOptions{})
@@ -139,6 +119,115 @@ func TestVolumeLifecycle(t *testing.T) {

assert.Nil(t, hcloudVolume, "hcloud volume was deleted in nomad, but still exists")
})

}

func TestVolumeWrite(t *testing.T) {
volID := "test-vol"
jobID := "test-writer"
volReq := createVolumeSpec(volID)
job := createBusyboxWithVolumeJobSpec(jobID, volID, "/test")

t.Run("create volume", func(t *testing.T) {
vol, _, err := cluster.CreateVolume(volReq, &nomad.WriteOptions{})
if err != nil {
t.Error(err)
}
assert.Len(t, vol, 1)
})

// Used to ensure that the job for verifying the data is scheduled on another node
var previousNodeID string
t.Run("write to volume", func(t *testing.T) {
_, _, err := cluster.nomadClient.Jobs().Register(job, &nomad.WriteOptions{})
if err != nil {
t.Error(err)
}
defer func() {
_, _, err = cluster.nomadClient.Jobs().Deregister(*job.ID, true, &nomad.WriteOptions{})
if err != nil {
t.Error(err)
}
}()

allocStub, err := cluster.WaitForRunningJob(*job.ID)
if err != nil {
t.Error(err)
return
}

previousNodeID = allocStub.NodeID

alloc, _, err := cluster.nomadClient.Allocations().Info(allocStub.ID, &nomad.QueryOptions{})
if err != nil {
t.Error(err)
return
}

exitCode, err := cluster.ExecInAlloc(alloc, jobID, []string{
"dd",
"if=/dev/random",
"of=/test/data",
"bs=1M",
"count=1",
})
if err != nil {
t.Error(err)
}
assert.Equalf(t, 0, exitCode, "could not write test data - exit code: %d", exitCode)
})

t.Run("verify volume data", func(t *testing.T) {
// try to schedule job on another node
constraint := &nomad.Affinity{
LTarget: "${node.unique.id}",
RTarget: previousNodeID,
Operand: "!=",
}
job.Affinities = append(job.Affinities, constraint)

_, _, err := cluster.nomadClient.Jobs().Register(job, &nomad.WriteOptions{})
if err != nil {
t.Error(err)
}
defer func() {
_, _, err = cluster.nomadClient.Jobs().Deregister(*job.ID, true, &nomad.WriteOptions{})
if err != nil {
t.Error(err)
}
}()

allocStub, err := cluster.WaitForRunningJob(*job.ID)
if err != nil {
t.Error(err)
return
}

alloc, _, err := cluster.nomadClient.Allocations().Info(allocStub.ID, &nomad.QueryOptions{})
if err != nil {
t.Error(err)
return
}

// verify that file exists and has a size greater than zero
exitCode, err := cluster.ExecInAlloc(alloc, jobID, []string{
"test",
"-s",
"/test/data",
})
if err != nil {
t.Error(err)
}
assert.Equalf(t, 0, exitCode, "could not verify test data - exit code: %d", exitCode)
})

t.Run("delete volume", func(t *testing.T) {
// with retries, as the volume can still be in use for a couple of seconds after the job got deleted,
// which results in an internal server error
for i := range 10 {
if err := cluster.DeleteVolume(volReq.ID, &nomad.WriteOptions{}); err == nil {
break
}
backoffSleep(i)
}
})
}
110 changes: 110 additions & 0 deletions test/e2e/nomad/utils.go
@@ -3,6 +3,7 @@ package e2e
import (
"context"
"fmt"
"math"
"os"
"sync"
"time"
@@ -12,6 +13,8 @@ import (
"github.com/hetznercloud/hcloud-go/v2/hcloud"
)

const InitialVolumeCapacity = 10737418240 // 10GiB

type Cluster struct {
hcloudClient *hcloud.Client
nomadClient *nomad.Client
@@ -82,6 +85,11 @@ func (cluster *Cluster) Cleanup() []error {
vol, _, err := cluster.hcloudClient.Volume.GetByName(context.Background(), volName)
if err != nil {
cleanupErrors = append(cleanupErrors, err)
continue
}
if vol == nil {
cleanupErrors = append(cleanupErrors, fmt.Errorf("volume %s not found on hcloud", volName))
continue
}
_, err = cluster.hcloudClient.Volume.Delete(context.Background(), vol)
if err != nil {
@@ -119,3 +127,105 @@ func (cluster *Cluster) DeleteVolume(externalVolID string, w *nomad.WriteOptions

return nil
}

func (cluster *Cluster) ExecInAlloc(alloc *nomad.Allocation, task string, command []string) (int, error) {
exitCode, err := cluster.nomadClient.Allocations().Exec(
context.Background(),
alloc,
task,
true,
command,
os.Stdin,
os.Stdout,
os.Stderr,
make(<-chan nomad.TerminalSize),
&nomad.QueryOptions{},
)
if err != nil {
return exitCode, err
}
return exitCode, nil
}

func (cluster *Cluster) WaitForRunningJob(jobID string) (*nomad.AllocationListStub, error) {
for retry := range 10 {
allocs, _, err := cluster.nomadClient.Jobs().Allocations(
jobID,
false,
&nomad.QueryOptions{},
)
if err != nil {
return nil, err
}

for _, alloc := range allocs {
if alloc.ClientStatus == "running" {
return alloc, nil
}
}

backoffSleep(retry)
}
return nil, fmt.Errorf("no running allocation for job %s", jobID)
}

func createVolumeSpec(id string) *nomad.CSIVolume {
return &nomad.CSIVolume{
ID: id,
Name: id,
Namespace: "default",
PluginID: "csi.hetzner.cloud",
RequestedCapacityMin: InitialVolumeCapacity,
RequestedCapabilities: []*nomad.CSIVolumeCapability{
{
AccessMode: "single-node-writer",
AttachmentMode: "file-system",
},
},
MountOptions: &nomad.CSIMountOptions{
FSType: "ext4",
MountFlags: []string{
"discard",
"defaults",
},
},
}

}

func createBusyboxWithVolumeJobSpec(id string, volumeID string, mountPath string) *nomad.Job {
job := nomad.NewServiceJob(id, id, "global", 50)
taskGroup := nomad.NewTaskGroup(id, 1)

taskGroup.Volumes = map[string]*nomad.VolumeRequest{
volumeID: {
Name: volumeID,
ReadOnly: false,
Type: "csi",
Source: volumeID,
AttachmentMode: "file-system",
AccessMode: "single-node-writer",
PerAlloc: false,
},
}

task := nomad.NewTask(id, "docker")
task = task.SetConfig("image", "busybox:stable")
task = task.SetConfig("command", "sleep")
task = task.SetConfig("args", []string{"3600"})

task.VolumeMounts = append(task.VolumeMounts, &nomad.VolumeMount{
Volume: &volumeID,
Destination: &mountPath,
})

taskGroup = taskGroup.AddTask(task)
job = job.AddTaskGroup(taskGroup)
return job
}

func backoffSleep(retry int) {
delay := math.Pow(2, float64(retry))
delay = math.Min(delay, 16)
time.Sleep(time.Second * time.Duration(delay))
}
