Skip to content

Commit

Permalink
fixing deadlock, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Hoffman committed Aug 31, 2017
1 parent 698ff7f commit dac251c
Showing 1 changed file with 76 additions and 66 deletions.
142 changes: 76 additions & 66 deletions vault/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,15 @@ func (c *Core) setupExpiration() error {
// Link the token store to this
c.tokenStore.SetExpirationManager(mgr)

go c.expiration.Restore(c.Shutdown, 0)
// Restore the existing state
c.logger.Info("expiration: restoring leases")
errorFunc := func() {
c.logger.Fatal("shutting down")
if err := c.Shutdown(); err != nil {
c.logger.Error("core: error shutting down")
}
}
go c.expiration.Restore(errorFunc, 0)

return nil
}
Expand All @@ -124,22 +132,6 @@ func (c *Core) stopExpiration() error {
return nil
}

// restoreLock checks if we are in "restore" mode and takes out the restore
// lock
func (m *ExpirationManager) restoreLock() {
if m.inRestoreMode() {
m.restoreMutex.Lock()
}
}

// restoreUnlock checks if we are in "restore" mode and unlocks the restore
// lock
func (m *ExpirationManager) restoreUnlock() {
if m.inRestoreMode() {
m.restoreMutex.Unlock()
}
}

// inRestoreMode returns if we are currently in restore mode
func (m *ExpirationManager) inRestoreMode() bool {
return atomic.LoadInt64(&m.restoreMode) == 1
Expand Down Expand Up @@ -262,22 +254,17 @@ func (m *ExpirationManager) Tidy() error {

// Restore is used to recover the lease states when starting.
// This is used after starting the vault.
func (m *ExpirationManager) Restore(errorFunc func() error, loadDelay time.Duration) (retErr error) {
func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) (retErr error) {
defer func() {
if retErr != nil {
m.logger.Error("expiration: error restoring leases, shutting down", "error", retErr)
if err := errorFunc(); err != nil {
m.logger.Error("expiration: error shutting down", "error", err)
}
m.logger.Error("expiration: error restoring leases", "error", retErr)
errorFunc()
}
}()

atomic.StoreInt64(&m.restoreMode, 1)
defer atomic.StoreInt64(&m.restoreMode, 0)

// Restore the existing state
m.logger.Info("expiration: restoring leases")

// Accumulate existing leases
m.logger.Debug("expiration: collecting leases")
existing, err := logical.CollectKeys(m.idView)
Expand All @@ -289,40 +276,21 @@ func (m *ExpirationManager) Restore(errorFunc func() error, loadDelay time.Durat
// Restore each key by pulling from the result chan
restoredCount := 0
for i, leaseID := range existing {
m.restoreMutex.Lock()

if i > 0 && i%500 == 0 {
m.logger.Trace("expiration: leases loading", "progress", i)
}

// If we loaded a lease elsewhere in the expiration manager while in
// restore, skip the entry since it may have been updated since we
// loaded it
if _, ok := m.restoreLoaded[leaseID]; ok {
continue
}

if loadDelay > 0 {
time.Sleep(loadDelay)
}

le, err := m.loadEntryOnly(leaseID)
restored, err := m.loadAndRestoreLease(leaseID)
if err != nil {
m.restoreMutex.Unlock()
return err
}
if restored {
restoredCount++
}

m.restoreLease(le)
restoredCount++
m.restoreMutex.Unlock()
}

if restoredCount > 0 {
if m.logger.IsInfo() {
m.logger.Info("expiration: leases restored", "restored_lease_count", restoredCount)
m.pendingLock.RLock()
m.logger.Info("expiration: pending leases", "lease_count", len(m.pending))
m.pendingLock.RUnlock()
// This exists for test only to add latency to load calls
if loadDelay > 0 {
time.Sleep(loadDelay)
}
}

Expand All @@ -332,9 +300,30 @@ func (m *ExpirationManager) Restore(errorFunc func() error, loadDelay time.Durat
atomic.StoreInt64(&m.restoreMode, 0)
m.restoreMutex.Unlock()

m.logger.Info("expiration: leases restored", "restored_lease_count", restoredCount)
return nil
}

func (m *ExpirationManager) loadAndRestoreLease(leaseID string) (bool, error) {
m.restoreMutex.Lock()
defer m.restoreMutex.Unlock()

// If we loaded a lease elsewhere in the expiration manager while in
// restore, skip the entry since it may have been updated since we
// loaded it
if _, ok := m.restoreLoaded[leaseID]; ok {
return false, nil
}

le, err := m.loadEntryOnly(leaseID)
if err != nil {
return false, err
}

m.restoreLease(le)
return true, nil
}

// restoreLease takes a lease entry that has not been added to the expiration
// manager and adds it back in
func (m *ExpirationManager) restoreLease(le *leaseEntry) {
Expand Down Expand Up @@ -379,8 +368,11 @@ func (m *ExpirationManager) Stop() error {
// Revoke is used to revoke a secret named by the given LeaseID
func (m *ExpirationManager) Revoke(leaseID string) error {
defer metrics.MeasureSince([]string{"expire", "revoke"}, time.Now())
m.restoreLock()
defer m.restoreUnlock()

if m.inRestoreMode() {
m.restoreMutex.Lock()
defer m.restoreMutex.Unlock()
}

return m.revokeCommon(leaseID, false, false)
}
Expand Down Expand Up @@ -439,8 +431,11 @@ func (m *ExpirationManager) revokeCommon(leaseID string, force, skipToken bool)
// revocation error; this is mostly meant for recovery operations
func (m *ExpirationManager) RevokeForce(prefix string) error {
defer metrics.MeasureSince([]string{"expire", "revoke-force"}, time.Now())
m.restoreLock()
defer m.restoreUnlock()

if m.inRestoreMode() {
m.restoreMutex.Lock()
defer m.restoreMutex.Unlock()
}

return m.revokePrefixCommon(prefix, true)
}
Expand All @@ -450,8 +445,11 @@ func (m *ExpirationManager) RevokeForce(prefix string) error {
// to reason about.
func (m *ExpirationManager) RevokePrefix(prefix string) error {
defer metrics.MeasureSince([]string{"expire", "revoke-prefix"}, time.Now())
m.restoreLock()
defer m.restoreUnlock()

if m.inRestoreMode() {
m.restoreMutex.Lock()
defer m.restoreMutex.Unlock()
}

return m.revokePrefixCommon(prefix, false)
}
Expand All @@ -462,8 +460,11 @@ func (m *ExpirationManager) RevokePrefix(prefix string) error {
// token store's revokeSalted function.
func (m *ExpirationManager) RevokeByToken(te *TokenEntry) error {
defer metrics.MeasureSince([]string{"expire", "revoke-by-token"}, time.Now())
m.restoreLock()
defer m.restoreUnlock()

if m.inRestoreMode() {
m.restoreMutex.Lock()
defer m.restoreMutex.Unlock()
}

// Lookup the leases
existing, err := m.lookupByToken(te.ID)
Expand All @@ -473,7 +474,7 @@ func (m *ExpirationManager) RevokeByToken(te *TokenEntry) error {

// Revoke all the keys
for idx, leaseID := range existing {
if err := m.Revoke(leaseID); err != nil {
if err := m.revokeCommon(leaseID, false, false); err != nil {
return fmt.Errorf("failed to revoke '%s' (%d / %d): %v",
leaseID, idx+1, len(existing), err)
}
Expand Down Expand Up @@ -527,8 +528,11 @@ func (m *ExpirationManager) revokePrefixCommon(prefix string, force bool) error
// and a renew interval. The increment may be ignored.
func (m *ExpirationManager) Renew(leaseID string, increment time.Duration) (*logical.Response, error) {
defer metrics.MeasureSince([]string{"expire", "renew"}, time.Now())
m.restoreLock()
defer m.restoreUnlock()

if m.inRestoreMode() {
m.restoreMutex.Lock()
defer m.restoreMutex.Unlock()
}

// Load the entry
le, err := m.loadEntry(leaseID)
Expand Down Expand Up @@ -588,8 +592,11 @@ func (m *ExpirationManager) Renew(leaseID string, increment time.Duration) (*log
func (m *ExpirationManager) RenewToken(req *logical.Request, source string, token string,
increment time.Duration) (*logical.Response, error) {
defer metrics.MeasureSince([]string{"expire", "renew-token"}, time.Now())
m.restoreLock()
defer m.restoreUnlock()

if m.inRestoreMode() {
m.restoreMutex.Lock()
defer m.restoreMutex.Unlock()
}

// Compute the Lease ID
saltedID, err := m.tokenStore.SaltID(token)
Expand Down Expand Up @@ -787,8 +794,11 @@ func (m *ExpirationManager) FetchLeaseTimesByToken(source, token string) (*lease
// those values copied over.
func (m *ExpirationManager) FetchLeaseTimes(leaseID string) (*leaseEntry, error) {
defer metrics.MeasureSince([]string{"expire", "fetch-lease-times"}, time.Now())
m.restoreLock()
defer m.restoreUnlock()

if m.inRestoreMode() {
m.restoreMutex.Lock()
defer m.restoreMutex.Unlock()
}

// Load the entry
le, err := m.loadEntry(leaseID)
Expand Down

0 comments on commit dac251c

Please sign in to comment.