Skip to content

Commit

Permalink
fix issue with context being cancelled prematurely
Browse files Browse the repository at this point in the history
  • Loading branch information
jhump committed Nov 18, 2024
1 parent bb123e9 commit 0e8afb8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
19 changes: 12 additions & 7 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
b.pool.RemoveConn(c)
}
}()
var checkClosers []io.Closer
defer func() {
for _, c := range checkClosers {
_ = c.Close()
}
}()
b.mu.Lock()
defer b.mu.Unlock()
newConns := make([]conn.Conn, 0, len(b.conns)+numAdded-numRemoved)
Expand All @@ -312,7 +318,7 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
delete(b.connInfo, existing)
info.cancel()
if info.closeChecker != nil {
_ = info.closeChecker.Close()
checkClosers = append(checkClosers, info.closeChecker)
}
continue
}
Expand All @@ -331,12 +337,6 @@ func (b *balancer) initConnInfoLocked(conns []conn.Conn) {
connection := conns[i]
connCtx, connCancel := context.WithCancel(b.ctx)
healthChecker := b.healthChecker.New(connCtx, connection, b)
go func() {
defer connCancel()
if err := connection.Prewarm(connCtx); err == nil {
b.warmedUp(connection)
}
}()
cancel := connCancel
if b.roundTripperMaxLifetime != 0 {
timer := b.clock.AfterFunc(b.roundTripperMaxLifetime, func() {
Expand All @@ -348,6 +348,11 @@ func (b *balancer) initConnInfoLocked(conns []conn.Conn) {
}
}
b.connInfo[connection] = connInfo{closeChecker: healthChecker, cancel: cancel}
go func() {
if err := connection.Prewarm(connCtx); err == nil {
b.warmedUp(connection)
}
}()
}
}

Expand Down
26 changes: 24 additions & 2 deletions internal/balancertesting/balancertesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func NewFakeHealthChecker() *FakeHealthChecker {
// New implements the healthchecker.Checker interface. It will use the
// given tracker to mark the given connection with the currently configured
// initial health state (which defaults to health).
func (hc *FakeHealthChecker) New(_ context.Context, connection conn.Conn, tracker health.Tracker) io.Closer {
func (hc *FakeHealthChecker) New(ctx context.Context, connection conn.Conn, tracker health.Tracker) io.Closer {
hc.mu.Lock()
defer hc.mu.Unlock()
state := hc.initialState
Expand All @@ -357,6 +357,14 @@ func (hc *FakeHealthChecker) New(_ context.Context, connection conn.Conn, tracke
if ch := hc.initialized[connection]; ch != nil {
close(ch)
}
context.AfterFunc(ctx, func() {
// Automatically force state to unhealthy after context is cancelled.
tracker := hc.updateHealthState(connection, health.StateUnhealthy, true)
if tracker == nil {
return
}
tracker.UpdateHealthState(connection, health.StateUnhealthy)
})
}()
select {
case hc.checkersUpdated <- struct{}{}:
Expand All @@ -378,10 +386,24 @@ func (hc *FakeHealthChecker) New(_ context.Context, connection conn.Conn, tracke

// UpdateHealthState allows the state of a connection to be changed.
func (hc *FakeHealthChecker) UpdateHealthState(connection conn.Conn, state health.State) {
tracker := hc.updateHealthState(connection, state, false)
if tracker != nil {
tracker.UpdateHealthState(connection, state)
}
}

func (hc *FakeHealthChecker) updateHealthState(connection conn.Conn, state health.State, removeTracker bool) health.Tracker {
hc.mu.Lock()
defer hc.mu.Unlock()
hc.trackers[connection].UpdateHealthState(connection, state)
tracker := hc.trackers[connection]
if tracker == nil {
return nil
}
hc.conns[connection] = state
if removeTracker {
delete(hc.trackers, connection)
}
return tracker
}

// SetInitialState sets the state that new connections will be put into
Expand Down

0 comments on commit 0e8afb8

Please sign in to comment.