SendBatch: Don't start goroutines #232

Merged: 2 commits, Sep 15, 2023
prometheus.go: 36 changes (24 additions, 12 deletions)
@@ -10,16 +10,28 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 )
 
-var operationDurationSeconds = promauto.NewHistogramVec(
-	prometheus.HistogramOpts{
-		Namespace: "gohbase",
-		Name:      "operation_duration_seconds",
-		Help:      "Time in seconds for operation to complete",
-		// >>> [0.04*(2**i) for i in range(11)]
-		// [0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56, 5.12, 10.24, 20.48, 40.96]
-		// Meaning 40ms, 80ms, 160ms, 320ms, 640ms, 1.28s, ... max 40.96s
-		// (most requests have a 30s timeout by default at the Envoy level)
-		Buckets: prometheus.ExponentialBuckets(0.04, 2, 11),
-	},
-	[]string{"operation", "result"},
+var (
+	operationDurationSeconds = promauto.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "gohbase",
+			Name:      "operation_duration_seconds",
+			Help:      "Time in seconds for operation to complete",
+			// >>> [0.04*(2**i) for i in range(11)]
+			// [0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56, 5.12, 10.24, 20.48, 40.96]
+			// Meaning 40ms, 80ms, 160ms, 320ms, 640ms, 1.28s, ... max 40.96s
+			// (most requests have a 30s timeout by default at the Envoy level)
+			Buckets: prometheus.ExponentialBuckets(0.04, 2, 11),
+		},
+		[]string{"operation", "result"},
+	)
+
+	sendBatchSplitCount = promauto.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "gohbase",
+			Name:      "sendbatch_split_count",
+			Help:      "Count of Region Servers hit per SendBatch",
+			// 1, 2, 4, 8, 16, 32, 64, 128, 256, 512
+			Buckets: prometheus.ExponentialBuckets(1, 2, 10),
+		},
+	)
 )
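For context on the new metric: prometheus.ExponentialBuckets(1, 2, 10) produces exactly the ten doubling buckets listed in the comment, and the histogram is observed once per SendBatch with the number of region servers the batch split into (see the rpc.go change below). A minimal standalone sketch of the same pattern; the "example" namespace and the Observe call site are illustrative, not gohbase's code:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// splitCount mimics sendbatch_split_count: one observation per batch,
// recording how many region servers the batch fanned out to.
var splitCount = promauto.NewHistogram(prometheus.HistogramOpts{
	Namespace: "example", // illustrative; the PR uses "gohbase"
	Name:      "sendbatch_split_count",
	Help:      "Count of Region Servers hit per SendBatch",
	Buckets:   prometheus.ExponentialBuckets(1, 2, 10),
})

func main() {
	// The bucket boundaries match the comment in the diff:
	fmt.Println(prometheus.ExponentialBuckets(1, 2, 10))
	// [1 2 4 8 16 32 64 128 256 512]

	splitCount.Observe(3) // a batch that hit 3 region servers
}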
rpc.go: 39 changes (20 additions, 19 deletions)
@@ -13,7 +13,6 @@ import (
 	"io"
 	"math"
 	"strconv"
-	"sync"
 	"time"
 
 	log "github.com/sirupsen/logrus"
@@ -240,30 +239,32 @@ func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
 	if !ok {
 		return res, false
 	}
+	sendBatchSplitCount.Observe(float64(len(rpcByClient)))
 
 	// Send each group of RPCs to region client to be executed.
-	var (
-		wg sync.WaitGroup
-
-		mu   sync.Mutex
-		fail bool
-	)
-	wg.Add(len(rpcByClient))
+	type clientAndRPCs struct {
+		client hrpc.RegionClient
+		rpcs   []hrpc.Call
+	}
+	// keep track of the order requests are queued so that we can wait
+	// for their responses in the same order.
+	cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
 	for client, rpcs := range rpcByClient {
-		go func(client hrpc.RegionClient, rpcs []hrpc.Call) {
-			defer wg.Done()
-			client.QueueBatch(ctx, rpcs)
-			ctx, sp := observability.StartSpan(ctx, "waitForResult")
-			defer sp.End()
-			ok := c.waitForCompletion(ctx, client, rpcs, res, rpcToRes)
+		client.QueueBatch(ctx, rpcs)
+		cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
+	}
+
+	var fail bool
+	func() { // func used to scope the span
+		ctx, sp := observability.StartSpan(ctx, "waitForResult")
+		defer sp.End()
+		for _, cAndR := range cAndRs {
+			ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
 			if !ok {
-				mu.Lock()
 				fail = true
-				mu.Unlock()
 			}
-		}(client, rpcs)
-	}
-	wg.Wait()
+		}
+	}()
 	allOK = !fail
 
 	return res, allOK
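The shape of the refactor, abstracted from gohbase's types: QueueBatch only hands RPCs off to a region client, so nothing blocks until results are read back, and the per-region-server goroutine (with its WaitGroup and Mutex) is unnecessary. Queue every batch first, then wait for completions sequentially in queue order on a single goroutine, which also lets fail become a plain local variable. A minimal sketch of that pattern; regionClient, queue, and wait here are illustrative stand-ins, not gohbase's API:

package main

import (
	"context"
	"fmt"
)

// regionClient is an illustrative stand-in for hrpc.RegionClient.
type regionClient struct {
	done chan bool // completion signal from the client's writer
}

// queue stands in for QueueBatch: it hands the batch off without blocking.
func (c *regionClient) queue(ctx context.Context, rpcs []string) {
	c.done <- true // buffered channel, so the handoff doesn't block
}

// wait stands in for waitForCompletion: it blocks until the batch finishes.
func (c *regionClient) wait(ctx context.Context) bool {
	select {
	case ok := <-c.done:
		return ok
	case <-ctx.Done():
		return false
	}
}

func queueAndWait(ctx context.Context, rpcsByClient map[*regionClient][]string) bool {
	// Phase 1: queue every batch, remembering the order of the handoffs.
	order := make([]*regionClient, 0, len(rpcsByClient))
	for c, rpcs := range rpcsByClient {
		c.queue(ctx, rpcs)
		order = append(order, c)
	}
	// Phase 2: wait for responses in the same order, on one goroutine,
	// so no WaitGroup or Mutex is needed to track failures.
	allOK := true
	for _, c := range order {
		if !c.wait(ctx) {
			allOK = false
		}
	}
	return allOK
}

func main() {
	a := &regionClient{done: make(chan bool, 1)}
	b := &regionClient{done: make(chan bool, 1)}
	fmt.Println(queueAndWait(context.Background(),
		map[*regionClient][]string{a: {"Put"}, b: {"Get"}}))
	// true
}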