Skip to content

Commit

Permalink
Integrate timer wheel with TChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshay Shah committed Dec 8, 2016
1 parent 66434ee commit de7a742
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ endif
PATH := $(GOPATH)/bin:$(PATH)
EXAMPLES=./examples/bench/server ./examples/bench/client ./examples/ping ./examples/thrift ./examples/hyperbahn/echo-server
ALL_PKGS := $(shell glide nv)
PROD_PKGS := . ./http ./hyperbahn ./json ./pprof ./raw ./relay ./stats ./thrift $(EXAMPLES)
PROD_PKGS := . ./http ./hyperbahn ./json ./pprof ./raw ./relay ./stats ./thrift ./timers $(EXAMPLES)
TEST_ARG ?= -race -v -timeout 5m
BUILD := ./build
THRIFT_GEN_RELEASE := ./thrift-gen-release
Expand Down
18 changes: 18 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

"github.com/uber/tchannel-go/timers"
"github.com/uber/tchannel-go/tnet"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -69,6 +70,10 @@ type ChannelOptions struct {
// clamped to this value). Passing zero uses the default of 2m.
RelayMaxTimeout time.Duration

// RelayTimeoutTick is the granularity of the timer wheel used to manage
// timeouts.
RelayTimeoutTick time.Duration

// The reporter to use for reporting stats for this channel.
StatsReporter StatsReporter

Expand Down Expand Up @@ -119,6 +124,8 @@ type Channel struct {
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
relayTimeoutTick time.Duration
relayTimeoutWheel *timers.Wheel

// mutable contains all the members of Channel which are mutable.
mutable struct {
Expand Down Expand Up @@ -200,6 +207,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
connectionOptions: opts.DefaultConnectionOptions,
relayHost: opts.RelayHost,
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
relayTimeoutTick: validateRelayTimeoutTick(opts.RelayTimeoutTick, logger),
}
ch.peers = newRootPeerList(ch).newChild()

Expand All @@ -221,6 +229,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {

if opts.RelayHost != nil {
opts.RelayHost.SetChannel(ch)
ch.startWheel()
}
return ch, nil
}
Expand Down Expand Up @@ -690,9 +699,14 @@ func (ch *Channel) Close() {
}
ch.mutable.Unlock()

if ch.relayTimeoutWheel != nil {
ch.relayTimeoutWheel.Stop()
}

for _, c := range connections {
c.Close()
}

removeClosedChannel(ch)
}

Expand All @@ -701,6 +715,10 @@ func (ch *Channel) RelayHost() RelayHost {
return ch.relayHost
}

func (ch *Channel) startWheel() {
ch.relayTimeoutWheel = timers.NewWheel(ch.relayTimeoutTick, ch.relayMaxTimeout)
}

func toStringSet(ss []string) map[string]struct{} {
set := make(map[string]struct{}, len(ss))
for _, s := range ss {
Expand Down
2 changes: 2 additions & 0 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type RelayerRuntimeState struct {
InboundItems RelayItemSetState `json:"inboundItems"`
OutboundItems RelayItemSetState `json:"outboundItems"`
MaxTimeout time.Duration `json:"maxTimeout"`
TimeoutTick time.Duration `json:"timeoutTick"`
}

// ExchangeSetRuntimeState is the runtime state for a message exchange set.
Expand Down Expand Up @@ -355,6 +356,7 @@ func (r *Relayer) IntrospectState(opts *IntrospectionOptions) RelayerRuntimeStat
InboundItems: r.inbound.IntrospectState(opts, "inbound"),
OutboundItems: r.outbound.IntrospectState(opts, "outbound"),
MaxTimeout: r.maxTimeout,
TimeoutTick: r.timeoutTick,
}
}

Expand Down
43 changes: 33 additions & 10 deletions relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/uber/tchannel-go/relay"
"github.com/uber/tchannel-go/timers"

"github.com/uber-go/atomic"
)
Expand All @@ -40,6 +41,9 @@ const (
_relayTombTTL = 3 * time.Second
// _defaultRelayMaxTimeout is the default max TTL for relayed calls.
_defaultRelayMaxTimeout = 2 * time.Minute
// _defaultRelayTimeoutTick is the default tick duration for processing
// relay timeouts.
_defaultRelayTimeoutTick = 5 * time.Millisecond
)

var (
Expand All @@ -53,7 +57,7 @@ var (
type relayConn Connection

type relayItem struct {
*time.Timer
*timers.Timer

remapID uint32
tomb bool
Expand All @@ -67,14 +71,16 @@ type relayItems struct {
sync.RWMutex

logger Logger
wheel *timers.Wheel
tombs uint64
items map[uint32]relayItem
}

func newRelayItems(logger Logger) *relayItems {
func newRelayItems(wheel *timers.Wheel, logger Logger) *relayItems {
return &relayItems{
items: make(map[uint32]relayItem),
logger: logger,
wheel: wheel,
}
}

Expand Down Expand Up @@ -150,9 +156,7 @@ func (r *relayItems) Entomb(id uint32, deleteAfter time.Duration) (relayItem, bo
r.items[id] = item
r.Unlock()

// TODO: We should be clearing these out in batches, rather than creating
// individual timers for each item.
time.AfterFunc(deleteAfter, func() { r.Delete(id) })
r.wheel.AfterFunc(deleteAfter, func() { r.Delete(id) })
return item, true
}

Expand All @@ -165,8 +169,10 @@ const (

// A Relayer forwards frames.
type Relayer struct {
relayHost RelayHost
maxTimeout time.Duration
relayHost RelayHost
maxTimeout time.Duration
timeoutTick time.Duration
wheel *timers.Wheel

// localHandlers is the set of service names that are handled by the local
// channel.
Expand All @@ -190,12 +196,15 @@ type Relayer struct {

// NewRelayer constructs a Relayer.
func NewRelayer(ch *Channel, conn *Connection) *Relayer {
wheel := ch.relayTimeoutWheel
return &Relayer{
relayHost: ch.RelayHost(),
maxTimeout: ch.relayMaxTimeout,
timeoutTick: ch.relayTimeoutTick,
wheel: wheel,
localHandler: ch.relayLocal,
outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})),
inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})),
outbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "outbound"})),
inbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "inbound"})),
peers: ch.rootPeers(),
conn: conn,
logger: conn.log,
Expand Down Expand Up @@ -455,7 +464,7 @@ func (r *Relayer) addRelayItem(isOriginator bool, id, remapID uint32, destinatio
if isOriginator {
items = r.outbound
}
item.Timer = time.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
item.Timer = r.wheel.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
items.Add(id, item)
return item
}
Expand Down Expand Up @@ -600,3 +609,17 @@ func validateRelayMaxTimeout(d time.Duration, logger Logger) time.Duration {
).Warn("Configured RelayMaxTimeout is invalid, using default instead.")
return _defaultRelayMaxTimeout
}

func validateRelayTimeoutTick(d time.Duration, logger Logger) time.Duration {
if d > 0 {
return d
}
if d == 0 {
return _defaultRelayTimeoutTick
}
logger.WithFields(
LogField{"configuredTimeoutTick", d},
LogField{"defaultTimeoutTick", _defaultRelayTimeoutTick},
).Warn("Configured RelayTimeoutTick is invalid, using default instead.")
return _defaultRelayTimeoutTick
}
6 changes: 6 additions & 0 deletions testutils/channel_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts {
return o
}

// SetRelayTimeoutTick sets the coarseness of relay timeouts.
func (o *ChannelOpts) SetRelayTimeoutTick(d time.Duration) *ChannelOpts {
o.ChannelOptions.RelayTimeoutTick = d
return o
}

func defaultString(v string, defaultValue string) string {
if v == "" {
return defaultValue
Expand Down

0 comments on commit de7a742

Please sign in to comment.