diff --git a/all_channels_test.go b/all_channels_test.go index 5f506b58..29f13481 100644 --- a/all_channels_test.go +++ b/all_channels_test.go @@ -18,13 +18,17 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package tchannel +package tchannel_test import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + . "github.com/uber/tchannel-go" + "github.com/uber/tchannel-go/testutils" ) func TestAllChannelsRegistered(t *testing.T) { @@ -42,6 +46,8 @@ func TestAllChannelsRegistered(t *testing.T) { assert.Equal(t, 1, len(state.OtherChannels["ch2"])) ch1_2.Close() + // TODO: replace this sleep with a callback hook. + time.Sleep(testutils.Timeout(10 * time.Millisecond)) state = ch1_1.IntrospectState(introspectOpts) assert.Equal(t, 0, len(state.OtherChannels["ch1"])) @@ -57,6 +63,8 @@ func TestAllChannelsRegistered(t *testing.T) { ch1_1.Close() ch2_1.Close() ch2_2.Close() + // TODO: replace this sleep with a callback hook. + time.Sleep(testutils.Timeout(10 * time.Millisecond)) state = ch1_1.IntrospectState(introspectOpts) assert.Equal(t, 0, len(state.OtherChannels["ch1"])) diff --git a/channel.go b/channel.go index e62e3dfa..de7d12e8 100644 --- a/channel.go +++ b/channel.go @@ -29,6 +29,7 @@ import ( "sync" "time" + "github.com/uber/tchannel-go/timers" "github.com/uber/tchannel-go/tnet" "github.com/opentracing/opentracing-go" @@ -70,6 +71,10 @@ type ChannelOptions struct { // clamped to this value). Passing zero uses the default of 2m. RelayMaxTimeout time.Duration + // RelayTimeoutTick is the granularity of the timer wheel used to manage + // timeouts. + RelayTimeoutTick time.Duration + // The reporter to use for reporting stats for this channel. StatsReporter StatsReporter @@ -121,6 +126,8 @@ type Channel struct { peers *PeerList relayHost RelayHost relayMaxTimeout time.Duration + relayTimeoutTick time.Duration + relayTimeoutWheel *timers.Wheel // mutable contains all the members of Channel which are mutable. mutable struct { @@ -209,6 +216,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { connectionOptions: opts.DefaultConnectionOptions, relayHost: opts.RelayHost, relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger), + relayTimeoutTick: validateRelayTimeoutTick(opts.RelayTimeoutTick, logger), } ch.peers = newRootPeerList(ch).newChild() @@ -230,6 +238,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { if opts.RelayHost != nil { opts.RelayHost.SetChannel(ch) + ch.startWheel() } return ch, nil } @@ -665,13 +674,17 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) { chState, minState) if updatedToState == ChannelClosed { - ch.onClosed() + go ch.onClosed() } } func (ch *Channel) onClosed() { removeClosedChannel(ch) - ch.log.Infof("Channel closed.") + if ch.relayTimeoutWheel != nil { + ch.relayTimeoutWheel.Stop() + ch.log.Info("Timer wheel stopped.") + } + ch.log.Info("Channel closed.") } // Closed returns whether this channel has been closed with .Close() @@ -717,7 +730,7 @@ func (ch *Channel) Close() { } if channelClosed { - ch.onClosed() + go ch.onClosed() } } @@ -726,6 +739,10 @@ func (ch *Channel) RelayHost() RelayHost { return ch.relayHost } +func (ch *Channel) startWheel() { + ch.relayTimeoutWheel = timers.NewWheel(ch.relayTimeoutTick, ch.relayMaxTimeout) +} + func toStringSet(ss []string) map[string]struct{} { set := make(map[string]struct{}, len(ss)) for _, s := range ss { diff --git a/introspection.go b/introspection.go index 8539d224..362de6f2 100644 --- a/introspection.go +++ b/introspection.go @@ -156,6 +156,7 @@ type RelayerRuntimeState struct { InboundItems RelayItemSetState `json:"inboundItems"` OutboundItems RelayItemSetState `json:"outboundItems"` MaxTimeout time.Duration `json:"maxTimeout"` + TimeoutTick time.Duration `json:"timeoutTick"` } // ExchangeSetRuntimeState is the runtime state for a message exchange set. @@ -360,6 +361,7 @@ func (r *Relayer) IntrospectState(opts *IntrospectionOptions) RelayerRuntimeStat InboundItems: r.inbound.IntrospectState(opts, "inbound"), OutboundItems: r.outbound.IntrospectState(opts, "outbound"), MaxTimeout: r.maxTimeout, + TimeoutTick: r.timeoutTick, } } diff --git a/relay.go b/relay.go index c366c2ea..15635f04 100644 --- a/relay.go +++ b/relay.go @@ -28,6 +28,7 @@ import ( "time" "github.com/uber/tchannel-go/relay" + "github.com/uber/tchannel-go/timers" "github.com/uber-go/atomic" ) @@ -40,6 +41,9 @@ const ( _relayTombTTL = 3 * time.Second // _defaultRelayMaxTimeout is the default max TTL for relayed calls. _defaultRelayMaxTimeout = 2 * time.Minute + // _defaultRelayTimeoutTick is the default tick duration for processing + // relay timeouts. + _defaultRelayTimeoutTick = 5 * time.Millisecond ) var ( @@ -53,7 +57,7 @@ var ( type relayConn Connection type relayItem struct { - *time.Timer + *timers.Timer remapID uint32 tomb bool @@ -67,14 +71,16 @@ type relayItems struct { sync.RWMutex logger Logger + wheel *timers.Wheel tombs uint64 items map[uint32]relayItem } -func newRelayItems(logger Logger) *relayItems { +func newRelayItems(logger Logger, wheel *timers.Wheel) *relayItems { return &relayItems{ items: make(map[uint32]relayItem), logger: logger, + wheel: wheel, } } @@ -150,9 +156,7 @@ func (r *relayItems) Entomb(id uint32, deleteAfter time.Duration) (relayItem, bo r.items[id] = item r.Unlock() - // TODO: We should be clearing these out in batches, rather than creating - // individual timers for each item. - time.AfterFunc(deleteAfter, func() { r.Delete(id) }) + r.wheel.AfterFunc(deleteAfter, func() { r.Delete(id) }) return item, true } @@ -165,8 +169,10 @@ const ( // A Relayer forwards frames. type Relayer struct { - relayHost RelayHost - maxTimeout time.Duration + relayHost RelayHost + maxTimeout time.Duration + timeoutTick time.Duration + wheel *timers.Wheel // localHandlers is the set of service names that are handled by the local // channel. @@ -190,12 +196,15 @@ type Relayer struct { // NewRelayer constructs a Relayer. func NewRelayer(ch *Channel, conn *Connection) *Relayer { + wheel := ch.relayTimeoutWheel return &Relayer{ relayHost: ch.RelayHost(), maxTimeout: ch.relayMaxTimeout, + timeoutTick: ch.relayTimeoutTick, + wheel: wheel, localHandler: ch.relayLocal, - outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})), - inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})), + outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"}), wheel), + inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"}), wheel), peers: ch.rootPeers(), conn: conn, logger: conn.log, @@ -455,7 +464,7 @@ func (r *Relayer) addRelayItem(isOriginator bool, id, remapID uint32, destinatio if isOriginator { items = r.outbound } - item.Timer = time.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) }) + item.Timer = r.wheel.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) }) items.Add(id, item) return item } @@ -600,3 +609,17 @@ func validateRelayMaxTimeout(d time.Duration, logger Logger) time.Duration { ).Warn("Configured RelayMaxTimeout is invalid, using default instead.") return _defaultRelayMaxTimeout } + +func validateRelayTimeoutTick(d time.Duration, logger Logger) time.Duration { + if d > 0 { + return d + } + if d == 0 { + return _defaultRelayTimeoutTick + } + logger.WithFields( + LogField{"configuredTimeoutTick", d}, + LogField{"defaultTimeoutTick", _defaultRelayTimeoutTick}, + ).Warn("Configured RelayTimeoutTick is invalid, using default instead.") + return _defaultRelayTimeoutTick +} diff --git a/testutils/channel_opts.go b/testutils/channel_opts.go index 9a2ab3cd..d5a2b356 100644 --- a/testutils/channel_opts.go +++ b/testutils/channel_opts.go @@ -199,6 +199,12 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts { return o } +// SetRelayTimeoutTick sets the coarseness of relay timeouts. +func (o *ChannelOpts) SetRelayTimeoutTick(d time.Duration) *ChannelOpts { + o.ChannelOptions.RelayTimeoutTick = d + return o +} + func defaultString(v string, defaultValue string) string { if v == "" { return defaultValue diff --git a/testutils/test_server.go b/testutils/test_server.go index e67fe72f..948b2fc2 100644 --- a/testutils/test_server.go +++ b/testutils/test_server.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "runtime" + "runtime/debug" "strings" "testing" "time" @@ -121,6 +122,7 @@ func WithTestServer(t testing.TB, chanOpts *ChannelOpts, f func(*TestServer)) { withServer(t, chanOpts.Copy(), f) } } + forceReleaseUnusedMemory() } // SetVerifyOpts specifies the options we'll use during teardown to verify that @@ -446,3 +448,15 @@ func withServer(t testing.TB, chanOpts *ChannelOpts, f func(*TestServer)) { ts.Server().Logger().Debugf("TEST: Test function complete") ts.CloseAndVerify() } + +// The timer wheels we allocate on relay channels are large enough that the Go +// runtime doesn't release their memory back to the OS immediately. Since we +// create and destroy them so quickly during tests, we quickly exceed the +// memory Travis allows each container to consume, which leads to mysterious +// test errors. +// +// Work around this issue by forcing a GC and free. +func forceReleaseUnusedMemory() { + runtime.GC() + debug.FreeOSMemory() +}