Skip to content

Commit

Permalink
Trid 15436 luks removal update
Browse files Browse the repository at this point in the history
Handle offline/deleted LUN and node device removal in nodeUnstageISCSIVolume
  • Loading branch information
jharrod authored Sep 23, 2024
1 parent 84b2cb2 commit 4b2544b
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 23 deletions.
33 changes: 28 additions & 5 deletions frontend/csi/node_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,7 @@ func (p *Plugin) nodeUnstageISCSIVolume(
// Remove Portal/LUN entries in self-healing map.
utils.RemoveLUNFromSessions(ctx, publishInfo, &publishedISCSISessions)

var luksMapperPath string
if utils.ParseBool(publishInfo.LUKSEncryption) {
fields := LogFields{"luksDevicePath": publishInfo.DevicePath, "lunID": publishInfo.IscsiLunNumber}

Expand All @@ -1255,9 +1256,17 @@ func (p *Plugin) nodeUnstageISCSIVolume(
}
fields["mapperPath"] = mapperPath

if err = utils.EnsureLUKSDeviceClosed(ctx, publishInfo.DevicePath); err != nil {
Logc(ctx).WithFields(fields).WithError(err).Error("Failed to close LUKS device.")
return err
err = utils.EnsureLUKSDeviceClosedWithMaxWaitLimit(ctx, publishInfo.DevicePath)
if err != nil {
if errors.IsMaxWaitExceededError(err) {
Logc(ctx).WithFields(LogFields{
"devicePath": publishInfo.DevicePath,
"lun": publishInfo.IscsiLunNumber,
"err": err,
}).Debug("LUKS close wait time exceeded, continuing with device removal.")
} else {
return err
}
}

// Get the underlying device mapper device for the block device used by LUKS.
Expand All @@ -1266,8 +1275,8 @@ func (p *Plugin) nodeUnstageISCSIVolume(
Logc(ctx).WithFields(fields).WithError(err).Error("Failed to determine dm device from device mapper.")
}

// At this point, the LUKs device path is no longer useful.
// However, the device mapper device path is (/dev/dm-#).
luksMapperPath = publishInfo.DevicePath
// Save device mapper path to publishInfo for the subsequent removal steps.
publishInfo.DevicePath = dmDevicePath
}

Expand Down Expand Up @@ -1350,6 +1359,20 @@ func (p *Plugin) nodeUnstageISCSIVolume(
return status.Error(codes.Internal, errStr)
}

// If the luks device still exists, it means the device was unable to be closed prior to removing the block
// device. This can happen if the LUN was deleted or offline. It should be removable by this point.
// It needs to be removed prior to removing the 'unmappedMpathDevice' device below.
if luksMapperPath != "" {
// EnsureLUKSDeviceClosed will not return an error if the device is already closed or removed.
if err = utils.EnsureLUKSDeviceClosed(ctx, luksMapperPath); err != nil {
Logc(ctx).WithFields(LogFields{
"devicePath": luksMapperPath,
}).WithError(err).Warning("Unable to remove LUKS mapper device.")
}
// Clear the time duration for the LUKS device.
delete(utils.LuksCloseDurations, luksMapperPath)
}

// If there is multipath device, flush(remove) mappings
if err := utils.RemoveMultipathDeviceMapping(ctx, unmappedMpathDevice); err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions utils/devices_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,9 @@ func (d *LUKSDevice) Resize(ctx context.Context, luksPassphrase string) error {
defer Logc(ctx).Debug("<<<< devices_darwin.Resize")
return errors.UnsupportedError("Resize is not supported for darwin")
}

// EnsureLUKSDeviceClosedWithMaxWaitLimit is the darwin stand-in for the LUKS close-with-wait-limit helper.
// It only logs entry and exit at debug level and always returns an UnsupportedError; luksDevicePath is not used.
func EnsureLUKSDeviceClosedWithMaxWaitLimit(ctx context.Context, luksDevicePath string) error {
Logc(ctx).Debug(">>>> devices_darwin.EnsureLUKSDeviceClosedWithMaxWaitLimit")
defer Logc(ctx).Debug("<<<< devices_darwin.EnsureLUKSDeviceClosedWithMaxWaitLimit")
return errors.UnsupportedError("EnsureLUKSDeviceClosedWithMaxWaitLimit is not supported for darwin")
}
46 changes: 30 additions & 16 deletions utils/devices_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,27 +326,41 @@ func IsLUKSDeviceOpen(ctx context.Context, devicePath string) (bool, error) {
// EnsureLUKSDeviceClosed ensures there is no open LUKS device at the specified path (example: "/dev/mapper/<luks>").
func EnsureLUKSDeviceClosed(ctx context.Context, devicePath string) error {
GenerateRequestContextForLayer(ctx, LogLayerUtils)

_, err := osFs.Stat(devicePath)
if err != nil {
if os.IsNotExist(err) {
Logc(ctx).WithField("device", devicePath).Debug("LUKS device not found.")
return nil
}
Logc(ctx).WithField("device", devicePath).WithError(err).Debug("Failed to stat device")
return fmt.Errorf("could not stat device: %s %v", devicePath, err)
if err == nil {
return closeLUKSDevice(ctx, devicePath)
} else if !os.IsNotExist(err) {
Logc(ctx).WithFields(LogFields{
"device": devicePath,
"error": err.Error(),
}).Debug("Failed to stat device")
return fmt.Errorf("could not stat device: %s %v.", devicePath, err)
}
Logc(ctx).WithFields(LogFields{
"device": devicePath,
}).Debug("LUKS device not found.")

// Only invoke close on a valid LUKS device.
luksDevice, err := NewLUKSDeviceFromMappingPath(ctx, devicePath, "")
if err != nil {
return nil
}

// EnsureLUKSDeviceClosedWithMaxWaitLimit tries to close the LUKS device at luksDevicePath and keeps track of how
// long closing has been failing for that path.
//
// On the first failure the current time is stored in LuksCloseDurations under luksDevicePath. While the time
// elapsed since that first failure is within luksCloseMaxWaitDuration, the underlying close error is returned.
// Once it is exceeded, a MaxWaitExceededError is returned instead. The LuksCloseDurations entry is not cleared
// by this function.
func EnsureLUKSDeviceClosedWithMaxWaitLimit(ctx context.Context, luksDevicePath string) error {
	err := EnsureLUKSDeviceClosed(ctx, luksDevicePath)
	if err == nil {
		return nil
	}

	// Start the clock on the first failed close for this device path.
	if LuksCloseDurations[luksDevicePath].IsZero() {
		LuksCloseDurations[luksDevicePath] = time.Now()
	}

	elapsed := time.Since(LuksCloseDurations[luksDevicePath])
	if elapsed > luksCloseMaxWaitDuration {
		Logc(ctx).WithFields(LogFields{
			"device":  luksDevicePath,
			"elapsed": elapsed,
			// Report the configured limit, not the device path.
			"maxWait": luksCloseMaxWaitDuration,
		}).Debug("LUKS close max wait time expired, continuing with removal.")
		return errors.MaxWaitExceededError(fmt.Sprintf("LUKS close wait time expired. Elapsed: %v", elapsed))
	}

	return err
}

// getDeviceFSType returns the filesystem for the supplied device.
Expand Down
68 changes: 66 additions & 2 deletions utils/devices_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -360,8 +361,6 @@ func TestEnsureLUKSDeviceClosed_ClosesDevice(t *testing.T) {

// Setup mock calls and reassign any clients to their mock counterparts.
gomock.InOrder(
mockCryptsetupLuksStatus(mockCommand).Return([]byte("device: /dev/mapper/luks-test-dev"), nil),
mockCryptsetupIsLuks(mockCommand).Return([]byte(""), nil),
mockCryptsetupLuksClose(mockCommand).Return([]byte(""), nil),
)
command = mockCommand
Expand Down Expand Up @@ -750,3 +749,68 @@ func TestResize_FailsWithExecError(t *testing.T) {
unexpectedError := errors.IncorrectLUKSPassphraseError("")
assert.NotErrorIs(t, err, unexpectedError)
}

// TestEnsureLUKSDeviceClosedWithMaxWaitLimit drives EnsureLUKSDeviceClosedWithMaxWaitLimit against an in-memory
// filesystem and a mocked command executor. It covers a successful close, a close failure inside the wait
// window, and a close failure after the wait window has elapsed.
func TestEnsureLUKSDeviceClosedWithMaxWaitLimit(t *testing.T) {
	defer func(previousCommand exec.Command) {
		command = previousCommand
	}(command)

	defer func() {
		osFs = afero.NewOsFs()
	}()

	osFs = afero.NewMemMapFs()
	luksDevicePath := "/dev/mapper/luks-test"
	// The device path must exist, otherwise the close is never attempted.
	if _, err := osFs.Create(luksDevicePath); err != nil {
		t.Fatalf("could not create fake LUKS device %q: %v", luksDevicePath, err)
	}

	// LuksCloseDurations is package-level state; do not leak this test's timer into other tests.
	defer delete(LuksCloseDurations, luksDevicePath)

	client := mockexec.NewMockCommand(gomock.NewController(t))
	command = client // Set package var to mock

	type testCase struct {
		name            string
		mockSetup       func(*mockexec.MockCommand)
		expectedError   bool
		expectedErrType error
	}

	testCases := []testCase{
		{
			name: "SucceedsWhenDeviceIsClosed",
			mockSetup: func(mockCommand *mockexec.MockCommand) {
				mockCryptsetupLuksClose(mockCommand).Return([]byte(""), nil)
			},
			expectedError: false,
		},
		{
			name: "FailsBeforeMaxWaitLimit",
			mockSetup: func(mockCommand *mockexec.MockCommand) {
				mockCryptsetupLuksClose(mockCommand).Return([]byte(""), fmt.Errorf("close error"))
			},
			expectedError:   true,
			expectedErrType: fmt.Errorf("%w", errors.New("")),
		},
		{
			name: "FailsWithMaxWaitExceededError",
			mockSetup: func(mockCommand *mockexec.MockCommand) {
				mockCryptsetupLuksClose(mockCommand).Return([]byte(""), fmt.Errorf("close error"))
				// Pretend the first failure happened just over the max wait ago.
				LuksCloseDurations[luksDevicePath] = time.Now().Add(-luksCloseMaxWaitDuration - time.Second)
			},
			expectedError:   true,
			expectedErrType: errors.MaxWaitExceededError(""),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Start every case from a clean timer so cases do not depend on their order.
			delete(LuksCloseDurations, luksDevicePath)
			tc.mockSetup(client)

			err := EnsureLUKSDeviceClosedWithMaxWaitLimit(context.TODO(), luksDevicePath)
			if !tc.expectedError {
				assert.NoError(t, err)
				return
			}

			assert.Error(t, err)
			if tc.expectedErrType != nil {
				assert.IsType(t, tc.expectedErrType, err)
			}
		})
	}
}
6 changes: 6 additions & 0 deletions utils/devices_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,9 @@ func GetUnderlyingDevicePathForLUKSDevice(ctx context.Context, luksDevicePath st
defer Logc(ctx).Debug("<<<< devices_windows.GetUnderlyingDevicePathForLUKSDevice")
return "", errors.UnsupportedError("GetUnderlyingDevicePathForLUKSDevice is not supported for windows")
}

// EnsureLUKSDeviceClosedWithMaxWaitLimit is the windows stand-in for the LUKS close-with-wait-limit helper.
// It only logs entry and exit at debug level and always returns an UnsupportedError; luksDevicePath is not used.
func EnsureLUKSDeviceClosedWithMaxWaitLimit(ctx context.Context, luksDevicePath string) error {
Logc(ctx).Debug(">>>> devices_windows.EnsureLUKSDeviceClosedWithMaxWaitLimit")
defer Logc(ctx).Debug("<<<< devices_windows.EnsureLUKSDeviceClosedWithMaxWaitLimit")
return errors.UnsupportedError("EnsureLUKSDeviceClosedWithMaxWaitLimit is not supported for windows")
}
22 changes: 22 additions & 0 deletions utils/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,28 @@ func IsTimeoutError(err error) bool {
return ok
}

// ///////////////////////////////////////////////////////////////////////////
// maxWaitExceededError
// ///////////////////////////////////////////////////////////////////////////

// maxWaitExceededError is an error value that carries a caller-supplied message.
type maxWaitExceededError struct {
	message string
}

// Error returns the message the error was created with.
func (e *maxWaitExceededError) Error() string {
	return e.message
}

// MaxWaitExceededError returns a new error of the max-wait-exceeded kind holding the given message.
func MaxWaitExceededError(message string) error {
	return &maxWaitExceededError{message: message}
}

// IsMaxWaitExceededError reports whether err itself is a *maxWaitExceededError.
// A nil error, or an error of any other concrete type, yields false; err is not unwrapped.
func IsMaxWaitExceededError(err error) bool {
	switch err.(type) {
	case *maxWaitExceededError:
		return true
	default:
		return false
	}
}

// ///////////////////////////////////////////////////////////////////////////
// reconcileDeferredError
// ///////////////////////////////////////////////////////////////////////////
Expand Down
4 changes: 4 additions & 0 deletions utils/iscsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
const (
temporaryMountDir = "/tmp_mnt"
iSCSIMaxFlushWaitDuration = 6 * time.Minute
luksCloseMaxWaitDuration = 2 * time.Minute
SessionSourceNodeStage = "nodeStage"
SessionSourceTrackingInfo = "trackingInfo"
SessionSourceCurrentStatus = "currentStatus"
Expand All @@ -40,6 +41,9 @@ var (
// Non-persistent map to maintain flush delays/errors if any, for device path(s).
iSCSIVolumeFlushExceptions = make(map[string]time.Time)

// Non-persistent map to maintain LUKS close durations.
LuksCloseDurations = make(map[string]time.Time)

// perNodeIgroupRegex is used to ensure an igroup meets the following format:
// <up to and including 59 characters of a container orchestrator node name>-<36 characters of trident version uuid>
// ex: Kubernetes-NodeA-01-ad1b8212-8095-49a0-82d4-ef4f8b5b620z
Expand Down

0 comments on commit 4b2544b

Please sign in to comment.