Skip to content

Commit

Permalink
SendBatch: Don't start goroutines
Browse files Browse the repository at this point in the history
Instead of starting goroutines we can queue each region server's
collection of RPCs and only after queueing them all wait for all
responses. We get a similar amount of parallelism, the only potential
slow down is that we can't queue one region server's RPCs until the
previous is able to be queued.

By not starting goroutines, the resource usage of SendBatch is more
predictable. And not starting goroutines is likely to be less
expensive overall.
  • Loading branch information
aaronbee committed Sep 14, 2023
1 parent 701886e commit 0480073
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"io"
"math"
"strconv"
"sync"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -242,28 +241,29 @@ func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
}

// Send each group of RPCs to region client to be executed.
var (
wg sync.WaitGroup

mu sync.Mutex
fail bool
)
wg.Add(len(rpcByClient))
type clientAndRPCs struct {
client hrpc.RegionClient
rpcs []hrpc.Call
}
// keep track of the order requests are queued so that we can wait
// for their responses in the same order.
cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
for client, rpcs := range rpcByClient {
go func(client hrpc.RegionClient, rpcs []hrpc.Call) {
defer wg.Done()
client.QueueBatch(ctx, rpcs)
ctx, sp := observability.StartSpan(ctx, "waitForResult")
defer sp.End()
ok := c.waitForCompletion(ctx, client, rpcs, res, rpcToRes)
client.QueueBatch(ctx, rpcs)
cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
}

var fail bool
func() { // func used to scope the span
ctx, sp := observability.StartSpan(ctx, "waitForResult")
defer sp.End()
for _, cAndR := range cAndRs {
ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
if !ok {
mu.Lock()
fail = true
mu.Unlock()
}
}(client, rpcs)
}
wg.Wait()
}
}()
allOK = !fail

return res, allOK
Expand Down

0 comments on commit 0480073

Please sign in to comment.