Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coalesce timers to improve relay performance #545

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
sudo: false
sudo: true
language: go

cache:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ endif
PATH := $(GOPATH)/bin:$(PATH)
EXAMPLES=./examples/bench/server ./examples/bench/client ./examples/ping ./examples/thrift ./examples/hyperbahn/echo-server
ALL_PKGS := $(shell glide nv)
PROD_PKGS := . ./http ./hyperbahn ./json ./pprof ./raw ./relay ./stats ./thrift $(EXAMPLES)
PROD_PKGS := . ./http ./hyperbahn ./json ./pprof ./raw ./relay ./stats ./thrift ./timers $(EXAMPLES)
TEST_ARG ?= -race -v -timeout 5m
BUILD := ./build
THRIFT_GEN_RELEASE := ./thrift-gen-release
Expand Down
10 changes: 9 additions & 1 deletion all_channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tchannel
package tchannel_test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes only for the Sleep after closing the channels?

If we need to keep this, can we dot import "tchannel-go" like in other tests, so that the test code itself doesn't change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, just for the sleep with testutils. Happy to dot-import.


import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

. "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/testutils"
)

func TestAllChannelsRegistered(t *testing.T) {
Expand All @@ -42,6 +46,8 @@ func TestAllChannelsRegistered(t *testing.T) {
assert.Equal(t, 1, len(state.OtherChannels["ch2"]))

ch1_2.Close()
// TODO: replace this sleep with a callback hook.
time.Sleep(testutils.Timeout(10 * time.Millisecond))

state = ch1_1.IntrospectState(introspectOpts)
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
Expand All @@ -57,6 +63,8 @@ func TestAllChannelsRegistered(t *testing.T) {
ch1_1.Close()
ch2_1.Close()
ch2_2.Close()
// TODO: replace this sleep with a callback hook.
time.Sleep(testutils.Timeout(10 * time.Millisecond))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the sleeps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we moved Channel.onClosed to run async, we can't guarantee that Close immediately removes the channel from the introspection results.

Since this is the only place we're relying upon the current behavior, I opted to make these tests uglier instead of making the production code more complex. Alternatively, I can keep the call to removeClosedChannel in the Close method instead of putting it in onClosed, but that seems weird - introspection doesn't match reality.


state = ch1_1.IntrospectState(introspectOpts)
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
Expand Down
23 changes: 20 additions & 3 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

"github.com/uber/tchannel-go/timers"
"github.com/uber/tchannel-go/tnet"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -70,6 +71,10 @@ type ChannelOptions struct {
// clamped to this value). Passing zero uses the default of 2m.
RelayMaxTimeout time.Duration

// RelayTimeoutTick is the granularity of the timer wheel used to manage
// relay timeouts. Passing zero uses the default of 5ms.
RelayTimeoutTick time.Duration

// The reporter to use for reporting stats for this channel.
StatsReporter StatsReporter

Expand Down Expand Up @@ -121,6 +126,8 @@ type Channel struct {
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
relayTimeoutTick time.Duration
relayTimeoutWheel *timers.Wheel

// mutable contains all the members of Channel which are mutable.
mutable struct {
Expand Down Expand Up @@ -209,6 +216,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
connectionOptions: opts.DefaultConnectionOptions,
relayHost: opts.RelayHost,
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
relayTimeoutTick: validateRelayTimeoutTick(opts.RelayTimeoutTick, logger),
}
ch.peers = newRootPeerList(ch).newChild()

Expand All @@ -230,6 +238,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {

if opts.RelayHost != nil {
opts.RelayHost.SetChannel(ch)
ch.startWheel()
}
return ch, nil
}
Expand Down Expand Up @@ -665,13 +674,17 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) {
chState, minState)

if updatedToState == ChannelClosed {
ch.onClosed()
go ch.onClosed()
}
}

// onClosed performs the final teardown once the channel has reached the
// closed state: it calls removeClosedChannel for this channel, stops the relay
// timeout wheel if one was created, and logs that the channel is closed.
func (ch *Channel) onClosed() {
removeClosedChannel(ch)
ch.log.Infof("Channel closed.")
// relayTimeoutWheel is only assigned by startWheel, so it can be nil here.
if ch.relayTimeoutWheel != nil {
ch.relayTimeoutWheel.Stop()
ch.log.Info("Timer wheel stopped.")
}
ch.log.Info("Channel closed.")
}

// Closed returns whether this channel has been closed with .Close()
Expand Down Expand Up @@ -717,7 +730,7 @@ func (ch *Channel) Close() {
}

if channelClosed {
ch.onClosed()
go ch.onClosed()
}
}

Expand All @@ -726,6 +739,10 @@ func (ch *Channel) RelayHost() RelayHost {
return ch.relayHost
}

// startWheel builds the timer wheel used for relay timeouts from the channel's
// validated tick duration and maximum relay timeout, and stores it on the
// channel.
func (ch *Channel) startWheel() {
	tick := ch.relayTimeoutTick
	maxTimeout := ch.relayMaxTimeout
	wheel := timers.NewWheel(tick, maxTimeout)
	ch.relayTimeoutWheel = wheel
}

func toStringSet(ss []string) map[string]struct{} {
set := make(map[string]struct{}, len(ss))
for _, s := range ss {
Expand Down
34 changes: 19 additions & 15 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ import:
- package: github.com/opentracing/opentracing-go
- package: github.com/uber/jaeger-client-go
- package: github.com/crossdock/crossdock-go
- package: github.com/andres-erbsen/clock
2 changes: 2 additions & 0 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type RelayerRuntimeState struct {
InboundItems RelayItemSetState `json:"inboundItems"`
OutboundItems RelayItemSetState `json:"outboundItems"`
MaxTimeout time.Duration `json:"maxTimeout"`
TimeoutTick time.Duration `json:"timeoutTick"`
}

// ExchangeSetRuntimeState is the runtime state for a message exchange set.
Expand Down Expand Up @@ -360,6 +361,7 @@ func (r *Relayer) IntrospectState(opts *IntrospectionOptions) RelayerRuntimeStat
InboundItems: r.inbound.IntrospectState(opts, "inbound"),
OutboundItems: r.outbound.IntrospectState(opts, "outbound"),
MaxTimeout: r.maxTimeout,
TimeoutTick: r.timeoutTick,
}
}

Expand Down
43 changes: 33 additions & 10 deletions relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/uber/tchannel-go/relay"
"github.com/uber/tchannel-go/timers"

"github.com/uber-go/atomic"
)
Expand All @@ -40,6 +41,9 @@ const (
_relayTombTTL = 3 * time.Second
// _defaultRelayMaxTimeout is the default max TTL for relayed calls.
_defaultRelayMaxTimeout = 2 * time.Minute
// _defaultRelayTimeoutTick is the default tick duration for processing
// relay timeouts.
_defaultRelayTimeoutTick = 5 * time.Millisecond
)

var (
Expand All @@ -53,7 +57,7 @@ var (
type relayConn Connection

type relayItem struct {
*time.Timer
*timers.Timer

remapID uint32
tomb bool
Expand All @@ -67,14 +71,16 @@ type relayItems struct {
sync.RWMutex

logger Logger
wheel *timers.Wheel
tombs uint64
items map[uint32]relayItem
}

func newRelayItems(logger Logger) *relayItems {
func newRelayItems(logger Logger, wheel *timers.Wheel) *relayItems {
return &relayItems{
items: make(map[uint32]relayItem),
logger: logger,
wheel: wheel,
}
}

Expand Down Expand Up @@ -150,9 +156,7 @@ func (r *relayItems) Entomb(id uint32, deleteAfter time.Duration) (relayItem, bo
r.items[id] = item
r.Unlock()

// TODO: We should be clearing these out in batches, rather than creating
// individual timers for each item.
time.AfterFunc(deleteAfter, func() { r.Delete(id) })
r.wheel.AfterFunc(deleteAfter, func() { r.Delete(id) })
return item, true
}

Expand All @@ -165,8 +169,10 @@ const (

// A Relayer forwards frames.
type Relayer struct {
relayHost RelayHost
maxTimeout time.Duration
relayHost RelayHost
maxTimeout time.Duration
timeoutTick time.Duration
wheel *timers.Wheel

// localHandlers is the set of service names that are handled by the local
// channel.
Expand All @@ -190,12 +196,15 @@ type Relayer struct {

// NewRelayer constructs a Relayer.
func NewRelayer(ch *Channel, conn *Connection) *Relayer {
wheel := ch.relayTimeoutWheel
return &Relayer{
relayHost: ch.RelayHost(),
maxTimeout: ch.relayMaxTimeout,
timeoutTick: ch.relayTimeoutTick,
wheel: wheel,
localHandler: ch.relayLocal,
outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})),
inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})),
outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"}), wheel),
inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"}), wheel),
peers: ch.rootPeers(),
conn: conn,
logger: conn.log,
Expand Down Expand Up @@ -455,7 +464,7 @@ func (r *Relayer) addRelayItem(isOriginator bool, id, remapID uint32, destinatio
if isOriginator {
items = r.outbound
}
item.Timer = time.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
item.Timer = r.wheel.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
items.Add(id, item)
return item
}
Expand Down Expand Up @@ -596,3 +605,17 @@ func validateRelayMaxTimeout(d time.Duration, logger Logger) time.Duration {
).Warn("Configured RelayMaxTimeout is invalid, using default instead.")
return _defaultRelayMaxTimeout
}

// validateRelayTimeoutTick picks the tick duration to use for relay timeouts.
//
// A positive d is returned unchanged. Zero means "not configured" and silently
// yields _defaultRelayTimeoutTick. A negative d is invalid: a warning is logged
// with both the configured and default values, and the default is returned.
func validateRelayTimeoutTick(d time.Duration, logger Logger) time.Duration {
	switch {
	case d > 0:
		return d
	case d < 0:
		// Only an explicitly invalid value warrants a warning; zero does not.
		fields := []LogField{
			{"configuredTimeoutTick", d},
			{"defaultTimeoutTick", _defaultRelayTimeoutTick},
		}
		logger.WithFields(fields...).Warn("Configured RelayTimeoutTick is invalid, using default instead.")
	}
	return _defaultRelayTimeoutTick
}
6 changes: 6 additions & 0 deletions testutils/channel_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts {
return o
}

// SetRelayTimeoutTick sets the coarseness of relay timeouts by storing d in
// ChannelOptions.RelayTimeoutTick. It returns o so that calls can be chained.
func (o *ChannelOpts) SetRelayTimeoutTick(d time.Duration) *ChannelOpts {
o.ChannelOptions.RelayTimeoutTick = d
return o
}

func defaultString(v string, defaultValue string) string {
if v == "" {
return defaultValue
Expand Down
14 changes: 14 additions & 0 deletions testutils/test_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"runtime"
"runtime/debug"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -121,6 +122,7 @@ func WithTestServer(t testing.TB, chanOpts *ChannelOpts, f func(*TestServer)) {
withServer(t, chanOpts.Copy(), f)
}
}
forceReleaseUnusedMemory()
}

// SetVerifyOpts specifies the options we'll use during teardown to verify that
Expand Down Expand Up @@ -446,3 +448,15 @@ func withServer(t testing.TB, chanOpts *ChannelOpts, f func(*TestServer)) {
ts.Server().Logger().Debugf("TEST: Test function complete")
ts.CloseAndVerify()
}

// forceReleaseUnusedMemory runs a garbage collection and then asks the runtime
// to return freed memory to the operating system.
//
// The timer wheels we allocate on relay channels are large enough that the Go
// runtime doesn't release their memory back to the OS immediately. Since we
// create and destroy them so quickly during tests, we quickly exceed the
// memory Travis allows each container to consume, which leads to mysterious
// test errors.
//
// Work around this issue by forcing a GC and free.
//
// NOTE(review): debug.FreeOSMemory is documented as forcing a garbage
// collection itself, so the explicit runtime.GC() may be redundant — confirm
// before removing.
func forceReleaseUnusedMemory() {
runtime.GC()
debug.FreeOSMemory()
}
Loading