Skip to content

Commit

Permalink
fix(prov): add controller to avoid potential volume leaks (#16)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashpal Choudhary <[email protected]>
  • Loading branch information
iyashu authored Jun 8, 2021
1 parent ca58e58 commit f45aec9
Show file tree
Hide file tree
Showing 3 changed files with 411 additions and 29 deletions.
80 changes: 51 additions & 29 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
k8sapi "github.com/openebs/lib-csi/pkg/client/k8s"
"github.com/openebs/lib-csi/pkg/common/errors"
"github.com/openebs/lib-csi/pkg/common/helpers"
"github.com/openebs/lib-csi/pkg/csipv"
schd "github.com/openebs/lib-csi/pkg/scheduler"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
k8serror "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -65,6 +68,8 @@ type controller struct {

k8sNodeInformer cache.SharedIndexInformer
deviceNodeInformer cache.SharedIndexInformer

leakProtection *csipv.LeakProtectionController
}

// NewController returns a new instance
Expand Down Expand Up @@ -194,6 +199,22 @@ func (cs *controller) init() error {
cs.k8sNodeInformer.HasSynced,
cs.deviceNodeInformer.HasSynced)
klog.Info("synced k8s & device node informer caches")

klog.Infof("initializing csi provisioning leak protection controller")
pvcInformer := kubeInformerFactory.Core().V1().PersistentVolumeClaims()
go pvcInformer.Informer().Run(stopCh)
if cs.leakProtection, err = csipv.NewLeakProtectionController(kubeClient,
pvcInformer, cs.driver.config.DriverName,
func(pvc *corev1.PersistentVolumeClaim, volumeName string) error {
// use default timeout of 10s for deletion.
ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelCtx()
return cs.deleteVolume(ctx, volumeName)
},
); err != nil {
return errors.Wrap(err, "failed to init leak protection controller")
}
go cs.leakProtection.Run(2, stopCh)
return nil
}

Expand Down Expand Up @@ -294,6 +315,14 @@ func (cs *controller) CreateVolume(
return nil, status.Error(codes.Unimplemented, "")
}

// mark volume for leak protection if pvc gets deleted
// before the creation of pv.
var finishCreateVolume func()
if finishCreateVolume, err = cs.leakProtection.BeginCreateVolume(volName,
params.PVCNamespace, params.PVCName); err != nil {
return nil, err
}
defer finishCreateVolume()
vol, err = CreateDeviceVolume(ctx, req, params)

if err != nil {
Expand All @@ -317,47 +346,40 @@ func (cs *controller) DeleteVolume(
ctx context.Context,
req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {

klog.Infof("received request to delete volume {%s}", req.VolumeId)

var (
err error
)

var err error
if err = cs.validateDeleteVolumeReq(req); err != nil {
return nil, err
}

volumeID := strings.ToLower(req.GetVolumeId())

// verify if the volume has already been deleted
vol, err := device.GetDeviceVolume(volumeID)
if vol != nil && vol.DeletionTimestamp != nil {
goto deleteResponse
if err = cs.deleteVolume(ctx, volumeID); err != nil {
return nil, err
}
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}

func (cs *controller) deleteVolume(ctx context.Context, volumeID string) error {
klog.Infof("received request to delete volume %q", volumeID)
vol, err := device.GetDeviceVolume(volumeID)
if err != nil {
if k8serror.IsNotFound(err) {
goto deleteResponse
return nil
}
return nil, errors.Wrapf(
err,
"failed to get volume for {%s}",
volumeID,
)
return errors.Wrapf(err,
"failed to get volume for {%s}", volumeID)
}

// Delete the corresponding Device Volume CR
err = device.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
// if volume is not already triggered for deletion, delete the volume.
// otherwise, just wait for the existing deletion operation to complete.
if vol.GetDeletionTimestamp() == nil {
if err = device.DeleteVolume(volumeID); err != nil {
return errors.Wrapf(err,
"failed to handle delete volume request for {%s}", volumeID)
}
}

deleteResponse:
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
if err = device.WaitForDeviceVolumeDestroy(ctx, volumeID); err != nil {
return err
}
return nil
}

func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {
Expand Down
Loading

0 comments on commit f45aec9

Please sign in to comment.