Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fn: Remove ctx from GoroutineManager constructor #9341

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ The underlying functionality between those two options remain the same.
* A code refactor that [moves all the graph related DB code out of the
`channeldb` package](https://github.com/lightningnetwork/lnd/pull/9236) and
into the `graph/db` package.

* [Improve the API](https://github.com/lightningnetwork/lnd/pull/9341) of the
[GoroutineManager](https://github.com/lightningnetwork/lnd/pull/9141) so that
its constructor does not take a context.
ellemouton marked this conversation as resolved.
Show resolved Hide resolved

## Tooling and Documentation

Expand Down
152 changes: 117 additions & 35 deletions fn/goroutine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,123 @@ package fn
import (
"context"
"sync"
"sync/atomic"
)

// GoroutineManager is used to launch goroutines until context expires or the
// manager is stopped. The Stop method blocks until all started goroutines stop.
type GoroutineManager struct {
wg sync.WaitGroup
mu sync.Mutex
ctx context.Context
cancel func()
// id is used to generate unique ids for each goroutine.
id atomic.Uint32

// cancelFns is a map of cancel functions that can be used to cancel the
// context of a goroutine. The mutex must be held when accessing this
// map. The key is the id of the goroutine.
cancelFns map[uint32]context.CancelFunc

mu sync.Mutex

stopped sync.Once
quit chan struct{}
wg sync.WaitGroup
}

// NewGoroutineManager constructs and returns a new instance of
// GoroutineManager.
func NewGoroutineManager(ctx context.Context) *GoroutineManager {
ctx, cancel := context.WithCancel(ctx)

func NewGoroutineManager() *GoroutineManager {
ellemouton marked this conversation as resolved.
Show resolved Hide resolved
return &GoroutineManager{
ctx: ctx,
cancel: cancel,
cancelFns: make(map[uint32]context.CancelFunc),
quit: make(chan struct{}),
}
}

// addCancelFn adds a context cancel function to the manager and returns an id
// that can can be used to cancel the context later on when the goroutine is
// done.
func (g *GoroutineManager) addCancelFn(cancel context.CancelFunc) uint32 {
g.mu.Lock()
defer g.mu.Unlock()

id := g.id.Add(1)
g.cancelFns[id] = cancel

return id
}

// cancel cancels the context associated with the passed id.
func (g *GoroutineManager) cancel(id uint32) {
g.mu.Lock()
defer g.mu.Unlock()

g.cancelUnsafe(id)
ellemouton marked this conversation as resolved.
Show resolved Hide resolved
}

// cancelUnsafe cancels the context associated with the passed id without
// acquiring the mutex.
func (g *GoroutineManager) cancelUnsafe(id uint32) {
fn, ok := g.cancelFns[id]
if !ok {
return
}

fn()

delete(g.cancelFns, id)
}

// Go tries to start a new goroutine and returns a boolean indicating its
// success. It fails iff the goroutine manager is stopping or its context passed
// to NewGoroutineManager has expired.
func (g *GoroutineManager) Go(f func(ctx context.Context)) bool {
// Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race
// condition, since it is not clear should Wait() block or not. This
// success. It returns true if the goroutine was successfully created and false
// otherwise. A goroutine will fail to be created iff the goroutine manager is
// stopping or the passed context has already expired. The passed call-back
// function must exit if the passed context expires.
func (g *GoroutineManager) Go(ctx context.Context,
f func(ctx context.Context)) bool {

// Derive a cancellable context from the passed context and store its
// cancel function in the manager. The context will be cancelled when
// either the parent context is cancelled or the quit channel is closed
// which will call the stored cancel function.
ctx, cancel := context.WithCancel(ctx)
id := g.addCancelFn(cancel)

// Calling wg.Add(1) and wg.Wait() when the wg's counter is 0 is a race
// condition, since it is not clear if Wait() should block or not. This
// kind of race condition is detected by Go runtime and results in a
// crash if running with `-race`. To prevent this, whole Go method is
// protected with a mutex. The call to wg.Wait() inside Stop() can still
// run in parallel with Go, but in that case g.ctx is in expired state,
// because cancel() was called in Stop, so Go returns before wg.Add(1)
// call.
// crash if running with `-race`. To prevent this, we protect the calls
// to wg.Add(1) and wg.Wait() with a mutex. If we block here because
// Stop is running first, then Stop will close the quit channel which
// will cause the context to be cancelled, and we will exit before
// calling wg.Add(1). If we grab the mutex here before Stop does, then
// Stop will block until after we call wg.Add(1).
g.mu.Lock()
defer g.mu.Unlock()

if g.ctx.Err() != nil {
// Before continuing to start the goroutine, we need to check if the
// context has already expired. This could be the case if the parent
// context has already expired or if Stop has been called.
if ctx.Err() != nil {
g.cancelUnsafe(id)

return false
}

// Ensure that the goroutine is not started if the manager has stopped.
ellemouton marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-g.quit:
g.cancelUnsafe(id)
ellemouton marked this conversation as resolved.
Show resolved Hide resolved

return false
default:
}

g.wg.Add(1)
go func() {
defer g.wg.Done()
f(g.ctx)
defer func() {
g.cancel(id)
g.wg.Done()
}()

f(ctx)
}()

return true
Expand All @@ -56,20 +128,30 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) bool {
// Stop prevents new goroutines from being added and waits for all running
// goroutines to finish.
func (g *GoroutineManager) Stop() {
g.mu.Lock()
g.cancel()
g.mu.Unlock()

// Wait for all goroutines to finish. Note that this wg.Wait() call is
// safe, since it can't run in parallel with wg.Add(1) call in Go, since
// we just cancelled the context and even if Go call starts running here
// after acquiring the mutex, it would see that the context has expired
// and return false instead of calling wg.Add(1).
g.wg.Wait()
g.stopped.Do(func() {
// Closing the quit channel will prevent any new goroutines from
// starting.
g.mu.Lock()
close(g.quit)
for _, cancel := range g.cancelFns {
cancel()
}
Comment on lines +136 to +138
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose to clear the map here, to free memory.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can do, although i dont think it is necessary since this is not a restartable system. This will only be called when the system is shutting down right? if we set the map to nil here, then you could argue that in every Stop method we have in LND, we should go and set each object to nil.

Interested to hear what a second reviewer thinks too. But yeah, can defs add a g.cancelFns = nil line but cant do delete(g.cancelFns, id) since deleting from a map while iterating over it is not safe i think

g.mu.Unlock()

// Wait for all goroutines to finish. Note that this wg.Wait()
// call is safe, since it can't run in parallel with wg.Add(1)
// call in Go, since we just cancelled the context and even if
// Go call starts running here after acquiring the mutex, it
// would see that the context has expired and return false
// instead of calling wg.Add(1).
g.wg.Wait()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update changes the behavior of Stop(). Previously, if one goroutine called Stop() and another goroutine invoked Stop() concurrently, the second call would block, waiting for the first call to complete. Now, the second Stop() call returns immediately.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that a problem? is there a usecase for needing to support the blocking behaviour? afaiu, this will mostly be used within other sub-systems and Stop will be called by their Stop methods which typically will also only be called once

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous version Stop() actually did StopAndWait(). If we support multiple Stop() calls, I think they should behave the same way. If someone calls Stop() from multiple places, they are likely to expect that both calls block until all goroutines finish, not only the first call.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just dont think more than 1 system will ever own the GoroutineManager right?

ie, what makes this Stop different from other Stop methods of other subsystems in LND which use a similar pattern to this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just dont think more than 1 system will ever own the GoroutineManager right?

I think, the intuition of Stop() method is that after Stop() call (successfully) returning, all the workers are down. If the second Stop() just returns immediately, this intuition is broken.

Can we panic or return error if a second Stop is detected?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see there is a Done() method - we could always update this to return a dedicated shutdown channel that is only closed after Wait() in Stop

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bhandras - perhaps a 3rd opinion here just to break the tie would help? 🙏

Copy link
Collaborator

@bhandras bhandras Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option 3 sgtm! (Done with shutdown chan)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool - pushed a diff with option 3 included 🫡

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah - just realised we cannot do this since callers may wait on Done() inside a goroutine that was started within Go().

Undoing this change

})
}

// Done returns a channel which is closed when either the context passed to
// NewGoroutineManager expires or when Stop is called.
// Done returns a channel which is closed once Stop has been called and the
// quit channel closed. Note that the channel closing indicates that shutdown
// of the GoroutineManager has started but not necessarily that the Stop method
// has finished.
func (g *GoroutineManager) Done() <-chan struct{} {
return g.ctx.Done()
return g.quit
}
Loading
Loading