Skip to content

Commit

Permalink
Use single mutex for volume locks to prevent memory leak
Browse files Browse the repository at this point in the history
Signed-off-by: Luca Berneking <[email protected]>
  • Loading branch information
Lucaber committed Oct 4, 2024
1 parent 32b9de3 commit 751deb2
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 23 deletions.
30 changes: 7 additions & 23 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -65,23 +64,7 @@ type controller struct {
k8sNodeInformer cache.SharedIndexInformer
zfsNodeInformer cache.SharedIndexInformer

volMutexes sync.Map
}

func (cs *controller) LockVolume(volume string) func() {
value, _ := cs.volMutexes.LoadOrStore(volume, &sync.Mutex{})
mtx := value.(*sync.Mutex)
mtx.Lock()
return func() { mtx.Unlock() }
}

func (cs *controller) LockVolumeWithSnapshot(volume string, snapshot string) func() {
unlockVol := cs.LockVolume(volume)
unlockSnap := cs.LockVolume(snapshot)
return func() {
unlockVol()
unlockSnap()
}
volumeLock *volumeLock
}

// NewController returns a new instance
Expand All @@ -90,6 +73,7 @@ func NewController(d *CSIDriver) csi.ControllerServer {
ctrl := &controller{
driver: d,
capabilities: newControllerCapabilities(),
volumeLock: newVolumeLock(),
}
if err := ctrl.init(); err != nil {
klog.Fatalf("init controller: %v", err)
Expand Down Expand Up @@ -467,7 +451,7 @@ func (cs *controller) CreateVolume(
contentSource := req.GetVolumeContentSource()
pvcName := helpers.GetInsensitiveParameter(&parameters, "csi.storage.k8s.io/pvc/name")

unlock := cs.LockVolume(volName)
unlock := cs.volumeLock.LockVolume(volName)
defer unlock()

if contentSource != nil && contentSource.GetSnapshot() != nil {
Expand Down Expand Up @@ -513,7 +497,7 @@ func (cs *controller) DeleteVolume(
}

volumeID := strings.ToLower(req.GetVolumeId())
unlock := cs.LockVolume(volumeID)
unlock := cs.volumeLock.LockVolume(volumeID)
defer unlock()

// verify if the volume has already been deleted
Expand Down Expand Up @@ -633,7 +617,7 @@ func (cs *controller) ControllerExpandVolume(
"ControllerExpandVolume: no volumeID provided",
)
}
unlock := cs.LockVolume(volumeID)
unlock := cs.volumeLock.LockVolume(volumeID)
defer unlock()

/* round off the new size */
Expand Down Expand Up @@ -731,7 +715,7 @@ func (cs *controller) CreateSnapshot(
if err != nil {
return nil, err
}
unlock := cs.LockVolumeWithSnapshot(volumeID, snapName)
unlock := cs.volumeLock.LockVolumeWithSnapshot(volumeID, snapName)
defer unlock()

snapTimeStamp := time.Now().Unix()
Expand Down Expand Up @@ -829,7 +813,7 @@ func (cs *controller) DeleteSnapshot(
// should succeed when an invalid snapshot id is used
return &csi.DeleteSnapshotResponse{}, nil
}
unlock := cs.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1])
unlock := cs.volumeLock.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1])
defer unlock()
if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
return nil, status.Errorf(
Expand Down
49 changes: 49 additions & 0 deletions pkg/driver/volume_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package driver

import (
"sync"
)

type volumeLock struct {
cond sync.Cond
locked map[string]struct{}
}

func newVolumeLock() *volumeLock {
return &volumeLock{
cond: *sync.NewCond(&sync.Mutex{}),
locked: map[string]struct{}{},
}
}

func (l *volumeLock) LockVolume(volume string) func() {
l.cond.L.Lock()
defer l.cond.L.Unlock()

for {
if _, locked := l.locked[volume]; !locked {
break
}

l.cond.Wait()
}

l.locked[volume] = struct{}{}

return func() {
l.cond.L.Lock()
defer l.cond.L.Unlock()

delete(l.locked, volume)
l.cond.Broadcast()
}
}

func (l *volumeLock) LockVolumeWithSnapshot(volume string, snapshot string) func() {
unlockVol := l.LockVolume(volume)
unlockSnap := l.LockVolume(snapshot)
return func() {
unlockVol()
unlockSnap()
}
}

0 comments on commit 751deb2

Please sign in to comment.