From 424e35b9e32a90c11b4887a3de58d32eef92b0c3 Mon Sep 17 00:00:00 2001
From: Luca Berneking
Date: Tue, 17 Sep 2024 17:15:16 +0200
Subject: [PATCH 1/3] Fix race between CSI controller calls

When a CreateVolume request takes longer than 10 seconds, it runs into a
client-side timeout and gets retried by kubelet. While the retried request
is being processed, the old request is still being canceled on the server
side, and the ZFSVolume CR it created gets deleted. The retry then returns
OK even though no ZFSVolume CR is left behind:

    request 1                       | request 2
    --------------------------------+----------------------------------------
    CreateVolume                    |
    ZFSVolume CR created            |
    waiting for volume creation     |
    client timeout + context cancel |
                                    | CreateVolume
                                    | ZFSVolume CR already exists
    ZFSVolume CR deleted            |
    Return ERR                      | Return OK without creating a ZFSVolume

```
I0916 15:01:49.227721 1 grpc.go:72] GRPC call: /csi.v1.Controller/CreateVolume requests {"accessibility_requirements":{"preferred":[{"segments":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/os":"linux","kubernetes.io/arch":"amd64","kubernetes.io/hostname":"i04-1-b-node-i041b-0-0","kubernetes.io/os":"linux","m3.services/cluster":"i04-1-b","m3.services/worker-group":"i041b-0","node.systems.mittwald.cloud/placement-group":"i041b-0","openebs.io/nodeid":"i04-1-b-node-i041b-0-0","openebs.io/nodename":"i04-1-b-node-i041b-0-0"}}],"requisite":[{"segments":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/os":"linux","kubernetes.io/arch":"amd64","kubernetes.io/hostname":"i04-1-b-node-i041b-0-0","kubernetes.io/os":"linux","m3.services/cluster":"i04-1-b","m3.services/worker-group":"i041b-0","node.systems.mittwald.cloud/placement-group":"i041b-0","openebs.io/nodeid":"i04-1-b-node-i041b-0-0","openebs.io/nodename":"i04-1-b-node-i041b-0-0"}}]},"capacity_range":{"required_bytes":5368709120},"name":"pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457","parameters":{"compression":"off","csi.storage.k8s.io/pv/name":"pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457","csi.storage.k8s.io/pvc/name":"test-pvc0","csi.storage.k8s.io/pvc/namespace":"default","dedup":"off","fstype":"zfs","poolname":"kluster-pool/i04-1-b-128k","recordsize":"128k","shared":"yes","thinprovision":"yes"},"volume_capabilities":[{"AccessType":{"Mount":{"fs_type":"zfs"}},"access_mode":{"mode":1}}]}
I0916 15:01:58.853180 1 controller.go:289] zfs: trying volume creation kluster-pool/i04-1-b-128k/pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457 on node [i04-1-b-node-i041b-0-0]
I0916 15:01:59.104207 1 volume.go:139] zfs: waiting for volume kluster-pool/i04-1-b-128k/pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457 to be created on nodeid i04-1-b-node-i041b-0-0
I0916 15:02:00.105273 1 volume.go:170] zfs: volume kluster-pool/i04-1-b-128k/pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457 provisioning failed on nodeid i04-1-b-node-i041b-0-0 err: zfs: context deadline reached
I0916 15:02:00.235113 1 grpc.go:72] GRPC call: /csi.v1.Controller/CreateVolume requests {"accessibility_requirements":{"preferred":[{"segments":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/os":"linux","kubernetes.io/arch":"amd64","kubernetes.io/hostname":"i04-1-b-node-i041b-0-0","kubernetes.io/os":"linux","m3.services/cluster":"i04-1-b","m3.services/worker-group":"i041b-0","node.systems.mittwald.cloud/placement-group":"i041b-0","openebs.io/nodeid":"i04-1-b-node-i041b-0-0","openebs.io/nodename":"i04-1-b-node-i041b-0-0"}}],"requisite":[{"segments":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/os":"linux","kubernetes.io/arch":"amd64","kubernetes.io/hostname":"i04-1-b-node-i041b-0-0","kubernetes.io/os":"linux","m3.services/cluster":"i04-1-b","m3.services/worker-group":"i041b-0","node.systems.mittwald.cloud/placement-group":"i041b-0","openebs.io/nodeid":"i04-1-b-node-i041b-0-0","openebs.io/nodename":"i04-1-b-node-i041b-0-0"}}]},"capacity_range":{"required_bytes":5368709120},"name":"pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457","parameters":{"compression":"off","csi.storage.k8s.io/pv/name":"pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457","csi.storage.k8s.io/pvc/name":"test-pvc0","csi.storage.k8s.io/pvc/namespace":"default","dedup":"off","fstype":"zfs","poolname":"kluster-pool/i04-1-b-128k","recordsize":"128k","shared":"yes","thinprovision":"yes"},"volume_capabilities":[{"AccessType":{"Mount":{"fs_type":"zfs"}},"access_mode":{"mode":1}}]}
I0916 15:02:00.449153 1 volume.go:221] zfs: deleted the volume pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457
I0916 15:02:00.449299 1 controller.go:466] created the volume kluster-pool/i04-1-b-128k/pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457 on node i04-1-b-node-i041b-0-0
I0916 15:02:00.553778 1 grpc.go:81] GRPC response: {"volume":{"accessible_topology":[{"segments":{"openebs.io/nodeid":"i04-1-b-node-i041b-0-0"}}],"capacity_bytes":5368709120,"volume_context":{"openebs.io/cas-type":"localpv-zfs","openebs.io/poolname":"kluster-pool/i04-1-b-128k"},"volume_id":"pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457"}}
```
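To make the interleaving concrete, here is a minimal, self-contained Go sketch of the failure mode (not part of the patch: `createVolume`, the `crs` map, and all timings are invented stand-ins for the controller's CreateVolume handler and the ZFSVolume CRs):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// crs stands in for the cluster's ZFSVolume CRs.
var (
	mu  sync.Mutex
	crs = map[string]bool{}
)

// createVolume mimics the handler: if the CR already exists, report
// success; otherwise create it and wait for provisioning. On context
// cancellation the handler rolls its CR back, even if a retry has
// already observed it.
func createVolume(ctx context.Context, name string) error {
	mu.Lock()
	if crs[name] {
		mu.Unlock()
		return nil // "ZFSVolume CR already exists" -> Return OK
	}
	crs[name] = true
	mu.Unlock()

	select {
	case <-ctx.Done(): // client timeout + context cancel
		mu.Lock()
		delete(crs, name) // "ZFSVolume CR deleted"
		mu.Unlock()
		return ctx.Err()
	case <-time.After(time.Second): // provisioning that finishes too late
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	done := make(chan error, 1)
	go func() { done <- createVolume(ctx, "pvc-x") }() // request 1

	time.Sleep(5 * time.Millisecond) // the retry arrives while request 1 runs
	fmt.Println("request 2:", createVolume(context.Background(), "pvc-x")) // <nil>
	fmt.Println("request 1:", <-done)       // context deadline exceeded
	fmt.Println("CR exists:", crs["pvc-x"]) // false: OK was returned, CR is gone
}
```

Request 2 answers OK because the CR exists at the moment it looks, then request 1's cancellation path deletes that CR, which is exactly the sequence the log above shows.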
{"accessibility_requirements":{"preferred":[{"segments":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/os":"linux","kubernetes.io/arch":"amd64","kubernetes.io/hostname":"i04-1-b-node-i041b-0-0","kubernetes.io/os":"linux","m3.services/cluster":"i04-1-b","m3.services/worker-group":"i041b-0","node.systems.mittwald.cloud/placement-group":"i041b-0","openebs.io/nodeid":"i04-1-b-node-i041b-0-0","openebs.io/nodename":"i04-1-b-node-i041b-0-0"}}],"requisite":[{"segments":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/os":"linux","kubernetes.io/arch":"amd64","kubernetes.io/hostname":"i04-1-b-node-i041b-0-0","kubernetes.io/os":"linux","m3.services/cluster":"i04-1-b","m3.services/worker-group":"i041b-0","node.systems.mittwald.cloud/placement-group":"i041b-0","openebs.io/nodeid":"i04-1-b-node-i041b-0-0","openebs.io/nodename":"i04-1-b-node-i041b-0-0"}}]},"capacity_range":{"required_bytes":5368709120},"name":"pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457","parameters":{"compression":"off","csi.storage.k8s.io/pv/name":"pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457","csi.storage.k8s.io/pvc/name":"test-pvc0","csi.storage.k8s.io/pvc/namespace":"default","dedup":"off","fstype":"zfs","poolname":"kluster-pool/i04-1-b-128k","recordsize":"128k","shared":"yes","thinprovision":"yes"},"volume_capabilities":[{"AccessType":{"Mount":{"fs_type":"zfs"}},"access_mode":{"mode":1}}]} I0916 15:02:00.449153 1 volume.go:221] zfs: deleted the volume pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457 I0916 15:02:00.449299 1 controller.go:466] created the volume kluster-pool/i04-1-b-128k/pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457 on node i04-1-b-node-i041b-0-0 I0916 15:02:00.553778 1 grpc.go:81] GRPC response: {"volume":{"accessible_topology":[{"segments":{"openebs.io/nodeid":"i04-1-b-node-i041b-0-0"}}],"capacity_bytes":5368709120,"volume_context":{"openebs.io/cas-type":"localpv-zfs","openebs.io/poolname":"kluster-pool/i04-1-b-128k"},"volume_id":"pvc-eb11c446-f26c-4333-b3da-4ecb9c5b2457"}} ``` Signed-off-by: Luca Berneking --- pkg/driver/controller.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index b6c400ac..4cb599b7 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" "github.com/container-storage-interface/spec/lib/go/csi" @@ -63,6 +64,15 @@ type controller struct { k8sNodeInformer cache.SharedIndexInformer zfsNodeInformer cache.SharedIndexInformer + + volMutexes sync.Map +} + +func (cs *controller) LockVolume(volume string) func() { + value, _ := cs.volMutexes.LoadOrStore(volume, &sync.Mutex{}) + mtx := value.(*sync.Mutex) + mtx.Lock() + return func() { mtx.Unlock() } } // NewController returns a new instance @@ -450,6 +460,9 @@ func (cs *controller) CreateVolume( contentSource := req.GetVolumeContentSource() pvcName := helpers.GetInsensitiveParameter(¶meters, "csi.storage.k8s.io/pvc/name") + unlock := cs.LockVolume(volName) + defer unlock() + if contentSource != nil && contentSource.GetSnapshot() != nil { snapshotID := contentSource.GetSnapshot().GetSnapshotId() @@ -493,6 +506,8 @@ func (cs *controller) DeleteVolume( } volumeID := strings.ToLower(req.GetVolumeId()) + unlock := cs.LockVolume(volumeID) + defer unlock() // verify if the volume has already been deleted vol, err := zfs.GetVolume(volumeID) @@ -611,6 +626,8 @@ func (cs *controller) ControllerExpandVolume( "ControllerExpandVolume: no volumeID provided", ) } + unlock := cs.LockVolume(volumeID) + defer 
From bee0aa6d46ee8f4256ac15ee9fa214db0829a1c8 Mon Sep 17 00:00:00 2001
From: Luca Berneking
Date: Wed, 25 Sep 2024 14:12:11 +0200
Subject: [PATCH 2/3] Add LockVolumeWithSnapshot function to prevent future
 deadlocks

Signed-off-by: Luca Berneking
---
 pkg/driver/controller.go | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go
index 4cb599b7..812abf73 100644
--- a/pkg/driver/controller.go
+++ b/pkg/driver/controller.go
@@ -75,6 +75,15 @@ func (cs *controller) LockVolume(volume string) func() {
 	return func() { mtx.Unlock() }
 }
 
+func (cs *controller) LockVolumeWithSnapshot(volume string, snapshot string) func() {
+	unlockVol := cs.LockVolume(volume)
+	unlockSnap := cs.LockVolume(snapshot)
+	return func() {
+		unlockVol()
+		unlockSnap()
+	}
+}
+
 // NewController returns a new instance
 // of CSI controller
 func NewController(d *CSIDriver) csi.ControllerServer {
@@ -724,10 +733,8 @@ func (cs *controller) CreateSnapshot(
 	if err != nil {
 		return nil, err
 	}
-	unlockVol := cs.LockVolume(volumeID)
-	defer unlockVol()
-	unlockSnap := cs.LockVolume(snapName)
-	defer unlockSnap()
+	unlock := cs.LockVolumeWithSnapshot(volumeID, snapName)
+	defer unlock()
 
 	snapTimeStamp := time.Now().Unix()
 	var state string
@@ -824,10 +831,8 @@ func (cs *controller) DeleteSnapshot(
 		// should succeed when an invalid snapshot id is used
 		return &csi.DeleteSnapshotResponse{}, nil
 	}
-	unlockVol := cs.LockVolume(snapshotID[0])
-	defer unlockVol()
-	unlockSnap := cs.LockVolume(snapshotID[1])
-	defer unlockSnap()
+	unlock := cs.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1])
+	defer unlock()
 	if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
 		return nil, status.Errorf(
 			codes.Internal,
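Routing every two-key acquisition through one helper fixes the acquisition order (volume first, then snapshot), which is the standard guard against ABBA deadlocks. A standalone sketch of why that matters (`lockPair` and the lock names are invented for illustration):

```go
package main

import (
	"fmt"
	"sync"
)

// lockPair mimics LockVolumeWithSnapshot: both locks are always taken
// volume-first, so no caller can hold the snapshot lock while waiting
// for the volume lock.
func lockPair(locks map[string]*sync.Mutex, volume, snapshot string) func() {
	locks[volume].Lock()
	locks[snapshot].Lock()
	return func() {
		locks[volume].Unlock()
		locks[snapshot].Unlock()
	}
}

func main() {
	locks := map[string]*sync.Mutex{"vol-1": {}, "snap-1": {}}

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// With a single fixed order the goroutines simply serialize.
			// If one of them locked (snapshot, volume) instead, each could
			// end up holding one lock and waiting forever for the other:
			// the classic ABBA deadlock the helper rules out.
			unlock := lockPair(locks, "vol-1", "snap-1")
			defer unlock()
			fmt.Printf("goroutine %d holds vol-1 and snap-1\n", id)
		}(i)
	}
	wg.Wait()
}
```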
&sync.Mutex{}) - mtx := value.(*sync.Mutex) - mtx.Lock() - return func() { mtx.Unlock() } -} - -func (cs *controller) LockVolumeWithSnapshot(volume string, snapshot string) func() { - unlockVol := cs.LockVolume(volume) - unlockSnap := cs.LockVolume(snapshot) - return func() { - unlockVol() - unlockSnap() - } + volumeLock *volumeLock } // NewController returns a new instance @@ -90,6 +73,7 @@ func NewController(d *CSIDriver) csi.ControllerServer { ctrl := &controller{ driver: d, capabilities: newControllerCapabilities(), + volumeLock: newVolumeLock(), } if err := ctrl.init(); err != nil { klog.Fatalf("init controller: %v", err) @@ -469,7 +453,7 @@ func (cs *controller) CreateVolume( contentSource := req.GetVolumeContentSource() pvcName := helpers.GetInsensitiveParameter(¶meters, "csi.storage.k8s.io/pvc/name") - unlock := cs.LockVolume(volName) + unlock := cs.volumeLock.LockVolume(volName) defer unlock() if contentSource != nil && contentSource.GetSnapshot() != nil { @@ -515,7 +499,7 @@ func (cs *controller) DeleteVolume( } volumeID := strings.ToLower(req.GetVolumeId()) - unlock := cs.LockVolume(volumeID) + unlock := cs.volumeLock.LockVolume(volumeID) defer unlock() // verify if the volume has already been deleted @@ -635,7 +619,7 @@ func (cs *controller) ControllerExpandVolume( "ControllerExpandVolume: no volumeID provided", ) } - unlock := cs.LockVolume(volumeID) + unlock := cs.volumeLock.LockVolume(volumeID) defer unlock() /* round off the new size */ @@ -733,7 +717,7 @@ func (cs *controller) CreateSnapshot( if err != nil { return nil, err } - unlock := cs.LockVolumeWithSnapshot(volumeID, snapName) + unlock := cs.volumeLock.LockVolumeWithSnapshot(volumeID, snapName) defer unlock() snapTimeStamp := time.Now().Unix() @@ -831,7 +815,7 @@ func (cs *controller) DeleteSnapshot( // should succeed when an invalid snapshot id is used return &csi.DeleteSnapshotResponse{}, nil } - unlock := cs.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1]) + unlock := cs.volumeLock.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1]) defer unlock() if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil { return nil, status.Errorf( diff --git a/pkg/driver/volume_lock.go b/pkg/driver/volume_lock.go new file mode 100644 index 00000000..c3b140a8 --- /dev/null +++ b/pkg/driver/volume_lock.go @@ -0,0 +1,49 @@ +package driver + +import ( + "sync" +) + +type volumeLock struct { + cond sync.Cond + locked map[string]struct{} +} + +func newVolumeLock() *volumeLock { + return &volumeLock{ + cond: *sync.NewCond(&sync.Mutex{}), + locked: map[string]struct{}{}, + } +} + +func (l *volumeLock) LockVolume(volume string) func() { + l.cond.L.Lock() + defer l.cond.L.Unlock() + + for { + if _, locked := l.locked[volume]; !locked { + break + } + + l.cond.Wait() + } + + l.locked[volume] = struct{}{} + + return func() { + l.cond.L.Lock() + defer l.cond.L.Unlock() + + delete(l.locked, volume) + l.cond.Broadcast() + } +} + +func (l *volumeLock) LockVolumeWithSnapshot(volume string, snapshot string) func() { + unlockVol := l.LockVolume(volume) + unlockSnap := l.LockVolume(snapshot) + return func() { + unlockVol() + unlockSnap() + } +}