Commit
add max lifetime for round-tripper, which allows for recycling leaf transports
jhump committed Feb 28, 2024
1 parent ac9cc34 commit b15d69d
Showing 4 changed files with 170 additions and 43 deletions.
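For orientation, here is a minimal usage sketch of the new setting at the client level. It assumes the library exposes the duration through a constructor option; the names httplb.NewClient and WithRoundTripperMaxLifetime are assumptions and do not appear in this diff, which only threads roundTripperMaxLifetime into the internal balancer.

package main

import (
	"time"

	"github.com/bufbuild/httplb"
)

func main() {
	// Assumed option name: recycle each leaf transport once it has been in
	// use for 5 minutes; a zero duration (the default) disables recycling.
	client := httplb.NewClient(
		httplb.WithRoundTripperMaxLifetime(5 * time.Minute),
	)
	defer client.Close()
}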
151 changes: 122 additions & 29 deletions balancer.go
@@ -47,19 +47,23 @@ func newBalancer(
picker func(prev picker.Picker, allConns conn.Conns) picker.Picker,
checker health.Checker,
pool connPool,
roundTripperMaxLifetime time.Duration,
) *balancer {
ctx, cancel := context.WithCancel(ctx)
balancer := &balancer{
ctx: ctx,
cancel: cancel,
pool: pool,
newPicker: picker,
healthChecker: checker,
resolverUpdates: make(chan struct{}, 1),
closed: make(chan struct{}),
connInfo: map[conn.Conn]connInfo{},
clock: internal.NewRealClock(),
ctx: ctx,
cancel: cancel,
pool: pool,
newPicker: picker,
healthChecker: checker,
roundTripperMaxLifetime: roundTripperMaxLifetime,
resolverUpdates: make(chan struct{}, 1),
recycleConns: make(chan struct{}, 1),
closed: make(chan struct{}),
connInfo: map[conn.Conn]connInfo{},
clock: internal.NewRealClock(),
}
balancer.connManager.updateFunc = balancer.updateConns
return balancer
}

@@ -81,6 +85,8 @@ type balancer struct {
healthChecker health.Checker // +checklocksignore: mu is not required, but happens to always be held.
connManager connManager

roundTripperMaxLifetime time.Duration // +checklocksignore: mu is not required, but happens to always be held.

// NB: only set from tests
updateHook func([]resolver.Address, []conn.Conn)

@@ -93,6 +99,7 @@ type balancer struct
latestAddrs atomic.Pointer[[]resolver.Address]
latestErr atomic.Pointer[error]
resolverUpdates chan struct{}
recycleConns chan struct{}
clock internal.Clock

mu sync.Mutex
@@ -106,6 +113,8 @@ type balancer struct
connInfo map[conn.Conn]connInfo
// +checklocks:mu
reresolveLastCall time.Time
// +checklocks:mu
connsToRecycle []conn.Conn
}

func (b *balancer) UpdateHealthState(connection conn.Conn, state health.State) {
@@ -195,7 +204,7 @@ func (b *balancer) receiveAddrs(ctx context.Context) {
for key, info := range b.connInfo {
delete(b.connInfo, key)
closer := info.closeChecker
info.cancelWarm()
info.cancel()
if closer != nil {
grp.Go(doClose(closer))
}
@@ -213,6 +222,15 @@ func (b *balancer) receiveAddrs(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-b.recycleConns:
b.mu.Lock()
connsToRecycle := b.connsToRecycle
b.connsToRecycle = nil
b.mu.Unlock()
if len(connsToRecycle) > 0 {
b.connManager.recycleConns(connsToRecycle)
}

case <-b.resolverUpdates:
addrs := b.latestAddrs.Load()
if addrs == nil {
Expand Down Expand Up @@ -242,7 +260,7 @@ func (b *balancer) receiveAddrs(ctx context.Context) {
if len(*addrs) > 0 {
addrsClone := make([]resolver.Address, len(*addrs))
copy(addrsClone, *addrs)
b.connManager.reconcileAddresses(addrsClone, b.updateConns)
b.connManager.reconcileAddresses(addrsClone)
}
}
}
@@ -282,7 +300,7 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
// and omit it from newConns
info := b.connInfo[existing]
delete(b.connInfo, existing)
info.cancelWarm()
info.cancel()
if info.closeChecker != nil {
_ = info.closeChecker.Close()
}
@@ -291,8 +309,16 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
newConns = append(newConns, existing)
}
newConns = append(newConns, addConns...)
for i := range addConns {
connection := addConns[i]
b.initConnInfoLocked(addConns)
b.conns = newConns
b.newPickerLocked()
return addConns
}

// +checklocks:b.mu
func (b *balancer) initConnInfoLocked(conns []conn.Conn) {
for i := range conns {
connection := conns[i]
connCtx, connCancel := context.WithCancel(b.ctx)
healthChecker := b.healthChecker.New(connCtx, connection, b)
go func() {
@@ -301,11 +327,18 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
b.warmedUp(connection)
}
}()
b.connInfo[connection] = connInfo{closeChecker: healthChecker, cancelWarm: connCancel}
cancel := connCancel
if b.roundTripperMaxLifetime != 0 {
timer := time.AfterFunc(b.roundTripperMaxLifetime, func() {
b.recycle(connection)
})
cancel = func() {
connCancel()
timer.Stop()
}
}
b.connInfo[connection] = connInfo{closeChecker: healthChecker, cancel: cancel}
}
b.conns = newConns
b.newPickerLocked()
return addConns
}

// +checklocks:b.mu
@@ -392,19 +425,36 @@ func (b *balancer) setErrorPickerLocked(err error) {
b.pool.UpdatePicker(picker.ErrorPicker(err), false)
}

func (b *balancer) recycle(c conn.Conn) {
b.mu.Lock()
defer b.mu.Unlock()
b.connsToRecycle = append(b.connsToRecycle, c)
// Notify goroutine that there is a connection to recycle.
select {
case b.recycleConns <- struct{}{}:
default:
}
}

type connInfo struct {
state health.State
warm bool
cancelWarm context.CancelFunc
state health.State
warm bool

// Cancels any in-progress warm-up and also cancels any timer
// for recycling the connection. Invoked when the connection
// is closed.
cancel context.CancelFunc
closeChecker io.Closer
}

type connManager struct {
// only modified by a single goroutine, so mu is not necessary
// only used by a single goroutine, so no mutex necessary
connsByAddr map[string][]conn.Conn

updateFunc func([]resolver.Address, []conn.Conn) []conn.Conn
}

func (c *connManager) reconcileAddresses(addrs []resolver.Address, updateFunc func([]resolver.Address, []conn.Conn) []conn.Conn) {
func (c *connManager) reconcileAddresses(addrs []resolver.Address) {
// TODO: future extension: make connection establishing strategy configurable
// (which would allow more sophisticated connection strategies in the face
// of, for example, layer-4 load balancers)
@@ -446,13 +496,56 @@ func (c *connManager) reconcileAddresses(addrs []resolver.Address, updateFunc fu
newAddrs = append(newAddrs, want...)
}

c.connsByAddr = remaining
c.doUpdate(newAddrs, toRemove)
}

func (c *connManager) doUpdate(newAddrs []resolver.Address, toRemove []conn.Conn) {
// we make a single call to update connections in batch to create a single
// new picker (avoids potential picker churn from making one change at a time)
newConns := updateFunc(newAddrs, toRemove)
// add newConns to remaining to compute new set of connections
for _, c := range newConns {
hostPort := c.Address().HostPort
remaining[hostPort] = append(remaining[hostPort], c)
newConns := c.updateFunc(newAddrs, toRemove)
// add newConns to set of connections
for _, cn := range newConns {
hostPort := cn.Address().HostPort
c.connsByAddr[hostPort] = append(c.connsByAddr[hostPort], cn)
}
c.connsByAddr = remaining
}

func (c *connManager) recycleConns(connsToRecycle []conn.Conn) {
var needToCompact bool
for i, cn := range connsToRecycle {
addr := cn.Address().HostPort
existing := c.connsByAddr[addr]
var found bool
for i, existingConn := range existing {
if existingConn == cn {
found = true
// remove cn from the slice
copy(existing[i:], existing[i+1:])
c.connsByAddr[addr] = existing[:len(existing)-1]
break
}
}
if !found {
// this connection has already been closed/removed
connsToRecycle[i] = nil
needToCompact = true
}
}
if needToCompact {
i := 0
for _, cn := range connsToRecycle {
if cn != nil {
connsToRecycle[i] = cn
i++
}
}
connsToRecycle = connsToRecycle[:i]
}
newAddrs := make([]resolver.Address, len(connsToRecycle))
for i := range connsToRecycle {
newAddrs[i] = connsToRecycle[i].Address()
}

c.doUpdate(newAddrs, connsToRecycle)
}
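The per-connection recycle timer added in initConnInfoLocked pairs time.AfterFunc with the connection's cancel function, so closing a connection also stops any pending recycle. Below is a small, self-contained sketch of that pattern using only the standard library; scheduleRecycle is a hypothetical helper for illustration, not part of this change.

package main

import (
	"context"
	"fmt"
	"time"
)

// scheduleRecycle mirrors the pattern from initConnInfoLocked: arm a timer
// that fires after maxLifetime, and return a cancel func that both cancels
// the context and stops the timer, so a connection closed early is never
// recycled later.
func scheduleRecycle(ctx context.Context, maxLifetime time.Duration, recycle func()) context.CancelFunc {
	ctx, cancel := context.WithCancel(ctx)
	_ = ctx // the real code hands this context to the health checker
	if maxLifetime == 0 {
		return cancel // zero means "never recycle", matching the diff
	}
	timer := time.AfterFunc(maxLifetime, recycle)
	return func() {
		cancel()
		timer.Stop()
	}
}

func main() {
	cancel := scheduleRecycle(context.Background(), 50*time.Millisecond, func() {
		fmt.Println("recycling connection")
	})
	// Closing the connection before its lifetime elapses stops the timer,
	// so the recycle callback never runs.
	cancel()
	time.Sleep(100 * time.Millisecond)
	fmt.Println("done")
}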
32 changes: 21 additions & 11 deletions balancer_test.go
@@ -32,7 +32,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestConnManager(t *testing.T) {
func TestConnManager_ReconcileAddresses(t *testing.T) {
t.Parallel()
type updateReq struct {
newAddrs []resolver.Address
@@ -72,7 +72,7 @@ func TestConnManager(t *testing.T) {
return updateReq{}
}
}
var connMgr connManager
connMgr := connManager{updateFunc: testUpdate}
addrs := []resolver.Address{
{HostPort: "1.2.3.1"},
{HostPort: "1.2.3.2"},
@@ -81,7 +81,7 @@ func TestConnManager(t *testing.T) {
{HostPort: "1.2.3.5"},
{HostPort: "1.2.3.6"},
}
connMgr.reconcileAddresses(addrs, testUpdate)
connMgr.reconcileAddresses(addrs)
latestUpdate := getLatestUpdate()
require.Equal(t, addrs, latestUpdate.newAddrs)
require.Empty(t, latestUpdate.removeConns)
@@ -107,7 +107,7 @@ func TestConnManager(t *testing.T) {
{HostPort: "1.2.3.3"},
{HostPort: "1.2.3.3"},
}
connMgr.reconcileAddresses(addrs, testUpdate)
connMgr.reconcileAddresses(addrs)
latestUpdate = getLatestUpdate()
// 10 entries needed, and we start with 3. So we need
// 2x more of each, but 3x of the first
@@ -149,7 +149,7 @@ func TestConnManager(t *testing.T) {
{HostPort: "1.2.3.3", Attributes: attrs3a},
{HostPort: "1.2.3.3", Attributes: attrs3b},
}
connMgr.reconcileAddresses(addrs, testUpdate)
connMgr.reconcileAddresses(addrs)
latestUpdate = getLatestUpdate()
require.Empty(t, latestUpdate.newAddrs)
require.Equal(t, []conn.Conn{conn1i8, conn1i9, conn2i11, conn3i13}, latestUpdate.removeConns)
@@ -184,7 +184,7 @@ func TestConnManager(t *testing.T) {
{HostPort: "1.2.3.6"},
{HostPort: "1.2.3.8"},
}
connMgr.reconcileAddresses(addrs, testUpdate)
connMgr.reconcileAddresses(addrs)
// Wanted to create 1.2.3.4, 1.2.3.6, and 1.2.3.8, but only first two created.
latestUpdate = getLatestUpdate()
require.Equal(t, addrs[1:], latestUpdate.newAddrs)
@@ -197,16 +197,21 @@ func TestConnManager(t *testing.T) {
{HostPort: "1.2.3.6"},
{HostPort: "1.2.3.8"},
}
connMgr.reconcileAddresses(addrs, testUpdate)
connMgr.reconcileAddresses(addrs)
latestUpdate = getLatestUpdate()
require.Equal(t, addrs[3:], latestUpdate.newAddrs)
require.Empty(t, latestUpdate.removeConns)
}

func TestConnManager_RecycleConns(t *testing.T) {
t.Parallel()
// TODO
}

func TestBalancer_BasicConnManagement(t *testing.T) {
t.Parallel()
pool := balancertesting.NewFakeConnPool()
balancer := newBalancer(context.Background(), balancertesting.NewFakePicker, health.NopChecker, pool)
balancer := newBalancer(context.Background(), balancertesting.NewFakePicker, health.NopChecker, pool, 0)
balancer.updateHook = balancertesting.DeterministicReconciler
balancer.start()
// Initial resolve
Expand Down Expand Up @@ -285,7 +290,7 @@ func TestBalancer_HealthChecking(t *testing.T) {
return ctx.Err()
}
}
balancer := newBalancer(context.Background(), balancertesting.NewFakePicker, checker, pool)
balancer := newBalancer(context.Background(), balancertesting.NewFakePicker, checker, pool, 0)
balancer.updateHook = balancertesting.DeterministicReconciler
balancer.start()

@@ -384,13 +389,13 @@ func TestBalancer_HealthChecking(t *testing.T) {
require.Empty(t, checkers)
}

func TestDefaultBalancer_Reresolve(t *testing.T) {
func TestBalancer_Reresolve(t *testing.T) {
t.Parallel()
checker := balancertesting.NewFakeHealthChecker()
clock := clocktest.NewFakeClock()
pool := balancertesting.NewFakeConnPool()

balancer := newBalancer(context.Background(), balancertesting.NewFakePicker, checker, pool)
balancer := newBalancer(context.Background(), balancertesting.NewFakePicker, checker, pool, 0)
balancer.updateHook = balancertesting.DeterministicReconciler
balancer.clock = clock
balancer.start()
Expand Down Expand Up @@ -425,6 +430,11 @@ func TestDefaultBalancer_Reresolve(t *testing.T) {
require.Empty(t, checkers)
}

func TestBalancer_RoundTripperMaxLifetime(t *testing.T) {
t.Parallel()
// TODO
}

func awaitPickerUpdate(t *testing.T, pool *balancertesting.FakeConnPool, warm bool, addrs []resolver.Address, indexes []int) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
(diffs for the remaining 2 changed files not shown)
