diff --git a/region/client.go b/region/client.go index 064fa808..4753b4af 100644 --- a/region/client.go +++ b/region/client.go @@ -338,6 +338,7 @@ func (c *client) unregisterRPC(id uint32) hrpc.Call { } func (c *client) processRPCs() { + // TODO: flush when the size is too large // TODO: if multi has only one call, send that call instead m := newMulti(c.rpcQueueSize) defer func() { @@ -365,62 +366,67 @@ func (c *client) processRPCs() { m = newMulti(c.rpcQueueSize) } - var t *time.Timer - if c.flushInterval > 0 { - // Initialize timer and then stop it. It will be Reset later - t = time.NewTimer(0) - if !t.Stop() { - <-t.C + for { + // first loop is to accomodate request heavy workload + // it will batch as long as conccurent writers are sending + // new rpcs or until multi is filled up + for { + select { + case <-c.done: + return + case rpcs := <-c.rpcs: + // have things queued up, batch them + if !m.add(rpcs) { + // can still put more rpcs into batch + continue + } + default: + // no more rpcs queued up + } + break } - } -batchloop: - for { - // wait for first rpc - select { - case <-c.done: - return - case rpcs := <-c.rpcs: - if m.add(rpcs) { - flush("queue full") - continue + if l := m.len(); l == 0 { + // wait for the next batch + select { + case <-c.done: + return + case rpcs := <-c.rpcs: + m.add(rpcs) } + continue + } else if l >= c.rpcQueueSize || c.flushInterval == 0 { + // batch is full, flush + flush("queue full") + continue } - if c.flushInterval > 0 { - // Add rpcs to batch until full or timer runs out - t.Reset(c.flushInterval) - for { - select { - case <-c.done: - return - case rpcs := <-c.rpcs: - if m.add(rpcs) { - if !t.Stop() { - <-t.C - } - flush("queue full") - continue batchloop - } - case <-t.C: - flush("timeout") - continue batchloop + + // second loop is to accomodate less frequent callers + // that would like to maximize their batches at the expense + // of waiting for flushInteval + timer := time.NewTimer(c.flushInterval) + reason := "" + for { + select { + case <-c.done: + return + case <-timer.C: + reason = "timeout" + // time to flush + case rpcs := <-c.rpcs: + if !m.add(rpcs) { + // can still put more rpcs into batch + continue } - } - } else { - // Add rpcs to batch until chan is no longer readable - for { - select { - case rpcs := <-c.rpcs: - if m.add(rpcs) { - flush("queue full") - continue batchloop - } - default: - flush("timeout") - continue batchloop + reason = "queue full" + // batch is full + if !timer.Stop() { + <-timer.C } } + break } + flush(reason) } }