add max lifetime for round-tripper, which allows for recycling leaf transports
jhump committed Feb 28, 2024
1 parent 1bbe543 commit fcfa1d5
Showing 4 changed files with 376 additions and 56 deletions.
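A note on the concept: HTTP/2 connections can stay open indefinitely, so leaf transports that are never retired can pin traffic to stale backends. Giving each round-tripper a maximum lifetime forces periodic reconnection. The following stdlib-only sketch illustrates the idea in isolation; the type and field names are invented for this example and are not the API added by this commit:

package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

// recyclingTransport is a stdlib-only illustration of a "max lifetime" for a
// round-tripper: once the current *http.Transport is older than maxLifetime,
// it is swapped for a fresh clone so subsequent requests dial new connections.
type recyclingTransport struct {
	mu          sync.Mutex
	maxLifetime time.Duration
	current     *http.Transport
	createdAt   time.Time
}

func (t *recyclingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	t.mu.Lock()
	if time.Since(t.createdAt) > t.maxLifetime {
		old := t.current
		t.current = old.Clone() // fresh transport, no pooled connections yet
		t.createdAt = time.Now()
		// Close connections in the old pool once they go idle; requests
		// already in flight on them are allowed to finish.
		go old.CloseIdleConnections()
	}
	rt := t.current
	t.mu.Unlock()
	return rt.RoundTrip(req)
}

func main() {
	client := &http.Client{Transport: &recyclingTransport{
		maxLifetime: 5 * time.Minute,
		current:     http.DefaultTransport.(*http.Transport).Clone(),
		createdAt:   time.Now(),
	}}
	resp, err := client.Get("https://example.com/")
	if err == nil {
		resp.Body.Close()
	}
	fmt.Println("done", err)
}

The actual change below does this inside the balancer instead: when a connection's lifetime expires, the balancer creates a replacement connection for the same address and removes the old one in a single batched picker update, so traffic shifts gracefully.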
155 changes: 126 additions & 29 deletions balancer.go
@@ -47,19 +47,23 @@ func newBalancer(
	picker func(prev picker.Picker, allConns conn.Conns) picker.Picker,
	checker health.Checker,
	pool connPool,
+	roundTripperMaxLifetime time.Duration,
) *balancer {
	ctx, cancel := context.WithCancel(ctx)
	balancer := &balancer{
-		ctx:             ctx,
-		cancel:          cancel,
-		pool:            pool,
-		newPicker:       picker,
-		healthChecker:   checker,
-		resolverUpdates: make(chan struct{}, 1),
-		closed:          make(chan struct{}),
-		connInfo:        map[conn.Conn]connInfo{},
-		clock:           internal.NewRealClock(),
+		ctx:                     ctx,
+		cancel:                  cancel,
+		pool:                    pool,
+		newPicker:               picker,
+		healthChecker:           checker,
+		roundTripperMaxLifetime: roundTripperMaxLifetime,
+		resolverUpdates:         make(chan struct{}, 1),
+		recycleConns:            make(chan struct{}, 1),
+		closed:                  make(chan struct{}),
+		connInfo:                map[conn.Conn]connInfo{},
+		clock:                   internal.NewRealClock(),
	}
+	balancer.connManager.updateFunc = balancer.updateConns
	return balancer
}

@@ -81,6 +85,8 @@ type balancer struct {
	healthChecker health.Checker // +checklocksignore: mu is not required, but happens to always be held.
	connManager   connManager

+	roundTripperMaxLifetime time.Duration // +checklocksignore: mu is not required, but happens to always be held.
+
	// NB: only set from tests
	updateHook func([]resolver.Address, []conn.Conn)

@@ -93,6 +99,7 @@ type balancer struct {
	latestAddrs     atomic.Pointer[[]resolver.Address]
	latestErr       atomic.Pointer[error]
	resolverUpdates chan struct{}
+	recycleConns    chan struct{}
	clock           internal.Clock

	mu sync.Mutex
@@ -106,6 +113,8 @@ type balancer struct {
	connInfo map[conn.Conn]connInfo
	// +checklocks:mu
	reresolveLastCall time.Time
+	// +checklocks:mu
+	connsToRecycle []conn.Conn
}

func (b *balancer) UpdateHealthState(connection conn.Conn, state health.State) {
@@ -195,7 +204,7 @@ func (b *balancer) receiveAddrs(ctx context.Context) {
		for key, info := range b.connInfo {
			delete(b.connInfo, key)
			closer := info.closeChecker
-			info.cancelWarm()
+			info.cancel()
			if closer != nil {
				grp.Go(doClose(closer))
			}
@@ -213,6 +222,15 @@ func (b *balancer) receiveAddrs(ctx context.Context) {
		select {
		case <-ctx.Done():
			return
+		case <-b.recycleConns:
+			b.mu.Lock()
+			connsToRecycle := b.connsToRecycle
+			b.connsToRecycle = nil
+			b.mu.Unlock()
+			if len(connsToRecycle) > 0 {
+				b.connManager.recycleConns(connsToRecycle)
+			}
+
		case <-b.resolverUpdates:
			addrs := b.latestAddrs.Load()
			if addrs == nil {
Expand Down Expand Up @@ -242,7 +260,7 @@ func (b *balancer) receiveAddrs(ctx context.Context) {
			if len(*addrs) > 0 {
				addrsClone := make([]resolver.Address, len(*addrs))
				copy(addrsClone, *addrs)
-				b.connManager.reconcileAddresses(addrsClone, b.updateConns)
+				b.connManager.reconcileAddresses(addrsClone)
			}
		}
	}
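The new recycleConns channel has capacity one and is paired with the connsToRecycle slice: producers append under the lock and then do a non-blocking send, so any number of recycle requests coalesce into at most one pending wake-up, and the goroutine above drains the whole batch at once. A self-contained sketch of this coalescing-signal pattern, with illustrative names:

package main

import (
	"fmt"
	"sync"
)

// batcher coalesces many "work ready" notifications into at most one pending
// wake-up; the consumer drains everything that accumulated in one pass.
type batcher struct {
	mu      sync.Mutex
	pending []int
	wake    chan struct{} // must be buffered with capacity 1
}

// enqueue records work and arms the wake-up signal without ever blocking.
func (b *batcher) enqueue(v int) {
	b.mu.Lock()
	b.pending = append(b.pending, v)
	b.mu.Unlock()
	select {
	case b.wake <- struct{}{}: // arm the signal
	default: // already armed; the existing wake-up covers this item too
	}
}

func main() {
	b := &batcher{wake: make(chan struct{}, 1)}
	b.enqueue(1)
	b.enqueue(2) // coalesces with 1; the channel already holds a signal

	<-b.wake // consumer side: one wake-up...
	b.mu.Lock()
	batch := b.pending
	b.pending = nil
	b.mu.Unlock()
	fmt.Println("drained:", batch) // ...drains the whole batch: [1 2]
}

The producer half of this pattern appears later in the diff as the balancer's recycle method.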
@@ -282,7 +300,7 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.Conn) []conn.Conn {
			// and omit it from newConns
			info := b.connInfo[existing]
			delete(b.connInfo, existing)
-			info.cancelWarm()
+			info.cancel()
			if info.closeChecker != nil {
				_ = info.closeChecker.Close()
			}
@@ -291,8 +309,16 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.Conn) []conn.Conn {
		newConns = append(newConns, existing)
	}
	newConns = append(newConns, addConns...)
-	for i := range addConns {
-		connection := addConns[i]
+	b.initConnInfoLocked(addConns)
+	b.conns = newConns
+	b.newPickerLocked()
+	return addConns
+}
+
+// +checklocks:b.mu
+func (b *balancer) initConnInfoLocked(conns []conn.Conn) {
+	for i := range conns {
+		connection := conns[i]
		connCtx, connCancel := context.WithCancel(b.ctx)
		healthChecker := b.healthChecker.New(connCtx, connection, b)
		go func() {
@@ -301,11 +327,18 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.Conn) []conn.Conn {
				b.warmedUp(connection)
			}
		}()
-		b.connInfo[connection] = connInfo{closeChecker: healthChecker, cancelWarm: connCancel}
+		cancel := connCancel
+		if b.roundTripperMaxLifetime != 0 {
+			timer := b.clock.AfterFunc(b.roundTripperMaxLifetime, func() {
+				b.recycle(connection)
+			})
+			cancel = func() {
+				connCancel()
+				timer.Stop()
+			}
+		}
+		b.connInfo[connection] = connInfo{closeChecker: healthChecker, cancel: cancel}
	}
-	b.conns = newConns
-	b.newPickerLocked()
-	return addConns
}

// +checklocks:b.mu
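Each connection's recycle timer gets folded into the connInfo cancel function, so a single call both cancels the warm-up context and disarms any pending recycle. A stdlib-only sketch of composing the two, using time.AfterFunc in place of the injectable clock seen above:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, connCancel := context.WithCancel(context.Background())

	// Arm a one-shot recycle timer, as the balancer does per connection.
	timer := time.AfterFunc(50*time.Millisecond, func() {
		fmt.Println("max lifetime reached: recycle")
	})

	// Compose one cancel that tears down both the warm-up context and
	// the pending recycle. Closing the connection calls this.
	cancel := func() {
		connCancel()
		timer.Stop() // harmless no-op if the timer already fired
	}

	// Simulate closing the connection before its lifetime elapses.
	cancel()
	<-ctx.Done()
	time.Sleep(100 * time.Millisecond)
	fmt.Println("closed; recycle callback never ran")
}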
@@ -392,19 +425,36 @@ func (b *balancer) setErrorPickerLocked(err error) {
	b.pool.UpdatePicker(picker.ErrorPicker(err), false)
}

+func (b *balancer) recycle(c conn.Conn) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.connsToRecycle = append(b.connsToRecycle, c)
+	// Notify goroutine that there is a connection to recycle.
+	select {
+	case b.recycleConns <- struct{}{}:
+	default:
+	}
+}
+
type connInfo struct {
-	state      health.State
-	warm       bool
-	cancelWarm context.CancelFunc
+	state health.State
+	warm  bool
+
+	// Cancels any in-progress warm-up and also cancels any timer
+	// for recycling the connection. Invoked when the connection
+	// is closed.
+	cancel       context.CancelFunc
	closeChecker io.Closer
}

type connManager struct {
-	// only modified by a single goroutine, so mu is not necessary
+	// only used by a single goroutine, so no mutex necessary
	connsByAddr map[string][]conn.Conn
+
+	updateFunc func([]resolver.Address, []conn.Conn) []conn.Conn
}

-func (c *connManager) reconcileAddresses(addrs []resolver.Address, updateFunc func([]resolver.Address, []conn.Conn) []conn.Conn) {
+func (c *connManager) reconcileAddresses(addrs []resolver.Address) {
	// TODO: future extension: make connection establishing strategy configurable
	// (which would allow more sophisticated connection strategies in the face
	// of, for example, layer-4 load balancers)
@@ -446,13 +496,60 @@ func (c *connManager) reconcileAddresses(addrs []resolver.Address, updateFunc func([]resolver.Address, []conn.Conn) []conn.Conn) {
		newAddrs = append(newAddrs, want...)
	}

+	c.connsByAddr = remaining
+	c.doUpdate(newAddrs, toRemove)
+}
+
+func (c *connManager) doUpdate(newAddrs []resolver.Address, toRemove []conn.Conn) {
	// we make a single call to update connections in batch to create a single
	// new picker (avoids potential picker churn from making one change at a time)
-	newConns := updateFunc(newAddrs, toRemove)
-	// add newConns to remaining to compute new set of connections
-	for _, c := range newConns {
-		hostPort := c.Address().HostPort
-		remaining[hostPort] = append(remaining[hostPort], c)
+	newConns := c.updateFunc(newAddrs, toRemove)
+	// add newConns to set of connections
+	for _, cn := range newConns {
+		hostPort := cn.Address().HostPort
+		c.connsByAddr[hostPort] = append(c.connsByAddr[hostPort], cn)
	}
-	c.connsByAddr = remaining
}

+func (c *connManager) recycleConns(connsToRecycle []conn.Conn) {
+	var needToCompact bool
+	for i, cn := range connsToRecycle {
+		addr := cn.Address().HostPort
+		existing := c.connsByAddr[addr]
+		var found bool
+		for i, existingConn := range existing {
+			if existingConn == cn {
+				found = true
+				// remove cn from the slice
+				copy(existing[i:], existing[i+1:])
+				c.connsByAddr[addr] = existing[:len(existing)-1]
+				break
+			}
+		}
+		if !found {
+			// this connection has already been closed/removed
+			connsToRecycle[i] = nil
+			needToCompact = true
+		}
+	}
+	if needToCompact {
+		i := 0
+		for _, cn := range connsToRecycle {
+			if cn != nil {
+				connsToRecycle[i] = cn
+				i++
+			}
+		}
+		if i == 0 {
+			// nothing to actually recycle
+			return
+		}
+		connsToRecycle = connsToRecycle[:i]
+	}
+	newAddrs := make([]resolver.Address, len(connsToRecycle))
+	for i := range connsToRecycle {
+		newAddrs[i] = connsToRecycle[i].Address()
+	}
+
+	c.doUpdate(newAddrs, connsToRecycle)
+}
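The nil-out-and-compact pass above is the standard in-place filter idiom for Go slices: it reuses the backing array instead of allocating a filtered copy. The same idiom in isolation:

package main

import "fmt"

// compact keeps the non-empty strings, filtering in place: the same idiom
// recycleConns uses to drop connections that were already removed.
func compact(items []string) []string {
	i := 0
	for _, s := range items {
		if s != "" { // keep this element
			items[i] = s
			i++
		}
	}
	return items[:i]
}

func main() {
	fmt.Println(compact([]string{"a", "", "b", "", "c"})) // [a b c]
}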