Skip to content

Commit

Permalink
hrpc: enable ScanMetrics (#254)
Browse files Browse the repository at this point in the history
Enable tracking scan metrics in the ScanResponse. Clients can access
the metrics via calls to scanner.GetScanMetrics().
  • Loading branch information
ciacono authored Jun 14, 2024
1 parent c100991 commit c8b69fd
Show file tree
Hide file tree
Showing 22 changed files with 703 additions and 221 deletions.
4 changes: 2 additions & 2 deletions hrpc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,12 @@ type Result struct {
Cells []*Cell
Stale bool
Partial bool
// Exists is only set if existance_only was set in the request query.
// Exists is only set if existence_only was set in the request query.
Exists *bool
}

func (c *Result) String() string {
return fmt.Sprintf("cells:%v stale:%v partial:%v exists:%v ",
return fmt.Sprintf("cells:%v stale:%v partial:%v exists:%v",
c.Cells, c.Stale, c.Partial, c.Exists)
}

Expand Down
30 changes: 30 additions & 0 deletions hrpc/hrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func TestScanToProto(t *testing.T) {
Column: []*pb.Column{},
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // explicitly set configurable attributes to default values
Expand Down Expand Up @@ -317,6 +318,7 @@ func TestScanToProto(t *testing.T) {
MaxVersions: nil,
CacheBlocks: nil,
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set configurable attributes to non-default values
Expand Down Expand Up @@ -350,6 +352,7 @@ func TestScanToProto(t *testing.T) {
MaxVersions: proto.Uint32(89),
CacheBlocks: proto.Bool(!DefaultCacheBlocks),
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // test that pb.ScanRequest.Scan is nil when scanner id is specified
Expand All @@ -374,6 +377,7 @@ func TestScanToProto(t *testing.T) {
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: nil,
TrackScanMetrics: proto.Bool(false),
},
},
{ // set reversed attribute
Expand All @@ -393,6 +397,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
Reversed: proto.Bool(true),
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set scan attribute
Expand All @@ -418,6 +423,7 @@ func TestScanToProto(t *testing.T) {
{Name: proto.String("key2"), Value: []byte("value2")},
},
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // scan key range
Expand All @@ -438,6 +444,7 @@ func TestScanToProto(t *testing.T) {
StartRow: startRow,
StopRow: stopRow,
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set filters and families
Expand All @@ -459,6 +466,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
Filter: pbFilter,
},
TrackScanMetrics: proto.Bool(false),
}
}(),
},
Expand All @@ -478,8 +486,30 @@ func TestScanToProto(t *testing.T) {
Column: []*pb.Column{},
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
},
},
// set TrackScanMetrics
{
s: func() *Scan {
s, _ := NewScanStr(ctx, "", TrackScanMetrics())
return s
}(),
expProto: func() *pb.ScanRequest {
return &pb.ScanRequest{
Region: rs,
NumberOfRows: proto.Uint32(DefaultNumberOfRows),
CloseScanner: proto.Bool(false),
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: &pb.Scan{
MaxResultSize: proto.Uint64(DefaultMaxResultSize),
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(true),
}
}(),
},
}

for i, tcase := range tests {
Expand Down
35 changes: 30 additions & 5 deletions hrpc/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,14 @@ type Scanner interface {

// Close should be called if it is desired to stop scanning before getting all of results.
// If you call Next() after calling Close() you might still get buffered results.
// Othwerwise, in case all results have been delivered or in case of an error, the Scanner
// Otherwise, in case all results have been delivered or in case of an error, the Scanner
// will be closed automatically. It's okay to close an already closed scanner.
Close() error
// GetScanMetrics returns the scan metrics for the scanner.
// The scan metrics are non-nil only if the Scan has TrackScanMetrics() enabled.
// GetScanMetrics should only be called after the scanner has been closed with an io.EOF
// (i.e. there are no more rows left to be returned by calls to Next()).
GetScanMetrics() map[string]int64
}

// Scan represents a scanner on an HBase table.
Expand All @@ -64,10 +69,11 @@ type Scan struct {

scannerID uint64

maxResultSize uint64
numberOfRows uint32
reversed bool
attribute []*pb.NameBytesPair
maxResultSize uint64
numberOfRows uint32
reversed bool
attribute []*pb.NameBytesPair
trackScanMetrics bool

closeScanner bool
allowPartialResults bool
Expand Down Expand Up @@ -178,6 +184,11 @@ func (s *Scan) NumberOfRows() uint32 {
return s.numberOfRows
}

// TrackScanMetrics reports whether this Scan has scan metrics tracking
// enabled, i.e. it returns the current value of the trackScanMetrics flag.
// The flag is false unless it has been switched on for this Scan.
func (s *Scan) TrackScanMetrics() bool {
return s.trackScanMetrics
}

// ToProto converts this Scan into a protobuf message
func (s *Scan) ToProto() proto.Message {
scan := &pb.ScanRequest{
Expand All @@ -189,6 +200,7 @@ func (s *Scan) ToProto() proto.Message {
// tell server that we "handle" heartbeats by ignoring them
// since we don't really time out our scans (unless context was cancelled)
ClientHandlesHeartbeats: proto.Bool(true),
TrackScanMetrics: &s.trackScanMetrics,
}
if s.scannerID != math.MaxUint64 {
scan.ScannerId = &s.scannerID
Expand Down Expand Up @@ -334,6 +346,19 @@ func AllowPartialResults() func(Call) error {
}
}

// TrackScanMetrics returns a Scan-only request option.
// When applied to a *Scan it turns on tracking of scan metrics from HBase,
// which will be returned in the scan response. Applying it to any other
// kind of Call leaves that call untouched and returns an error.
func TrackScanMetrics() func(Call) error {
	return func(c Call) error {
		// Only *Scan carries the trackScanMetrics flag; reject everything else.
		if s, isScan := c.(*Scan); isScan {
			s.trackScanMetrics = true
			return nil
		}
		return errors.New("'TrackScanMetrics' option can only be used with Scan queries")
	}
}

// Reversed is a Scan-only option which allows you to scan in reverse key order
// To use it the startKey would be greater than the end key
func Reversed() func(Call) error {
Expand Down
111 changes: 107 additions & 4 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"flag"
"fmt"
"io"
"math"
"os"
"os/exec"
"reflect"
Expand All @@ -25,8 +26,6 @@ import (
"testing"
"time"

"math"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/tsuna/gohbase"
Expand Down Expand Up @@ -1022,6 +1021,110 @@ func TestScanTimeRangeVersions(t *testing.T) {
}
}

// TestScanWithScanMetrics inserts three rows, scans the range [r1, r3) with
// and without the TrackScanMetrics option (and optionally a prefix filter),
// drains the scanner until io.EOF and then checks the ROWS_SCANNED and
// ROWS_FILTERED counters reported by GetScanMetrics.
func TestScanWithScanMetrics(t *testing.T) {
	const (
		key          = "TestScanWithScanMetrics"
		family       = "cf"
		rowsScanned  = "ROWS_SCANNED"
		rowsFiltered = "ROWS_FILTERED"
	)

	ctx := context.Background()
	now := time.Now()
	val := []byte("1")
	r1 := fmt.Sprintf("%s_%d", key, 1)
	r2 := fmt.Sprintf("%s_%d", key, 2)
	r3 := fmt.Sprintf("%s_%d", key, 3)

	c := gohbase.NewClient(*host)
	defer c.Close()

	// Seed the table with the three rows the scans below operate on.
	for _, row := range []string{r1, r2, r3} {
		if err := insertKeyValue(c, row, family, val, hrpc.Timestamp(now)); err != nil {
			t.Fatalf("Put failed: %s", err)
		}
	}

	type testCase struct {
		description          string
		filters              func(call hrpc.Call) error
		expectedRowsScanned  int64
		expectedRowsFiltered int64
		noScanMetrics        bool
	}

	tcases := []testCase{
		{
			description:          "scan metrics not enabled",
			expectedRowsScanned:  0,
			expectedRowsFiltered: 0,
			noScanMetrics:        true,
		},
		{
			description:          "2 rows scanned",
			expectedRowsScanned:  2,
			expectedRowsFiltered: 0,
		},
		{
			description:          "1 row scanned 1 row filtered",
			filters:              hrpc.Filters(filter.NewPrefixFilter([]byte(r1))),
			expectedRowsScanned:  1,
			expectedRowsFiltered: 1,
		},
	}

	for _, tc := range tcases {
		t.Run(tc.description, func(t *testing.T) {
			var (
				scan *hrpc.Scan
				err  error
			)
			// Build the request: metrics tracking is opt-in, and the filter is
			// only ever attached together with metrics tracking.
			switch {
			case tc.noScanMetrics:
				scan, err = hrpc.NewScanRangeStr(ctx, table, r1, r3)
			case tc.filters == nil:
				scan, err = hrpc.NewScanRangeStr(ctx, table, r1, r3, hrpc.TrackScanMetrics())
			default:
				scan, err = hrpc.NewScanRangeStr(ctx, table, r1, r3, hrpc.TrackScanMetrics(),
					tc.filters)
			}
			if err != nil {
				t.Fatalf("Scan req failed: %s", err)
			}

			// Drain the scanner until io.EOF before reading the metrics.
			scanner := c.Scan(scan)
			var results []*hrpc.Result
			for {
				res, nextErr := scanner.Next()
				if nextErr == io.EOF {
					break
				}
				if nextErr != nil {
					t.Fatal(nextErr)
				}
				results = append(results, res)
			}

			metrics := scanner.GetScanMetrics()
			if tc.noScanMetrics && metrics != nil {
				t.Fatalf("Expected nil scan metrics, got %v", metrics)
			}

			// Indexing a nil map yields zero, which matches the expectations
			// of the "metrics not enabled" case.
			if got := metrics[rowsScanned]; got != tc.expectedRowsScanned {
				t.Errorf("Did not get expected rows scanned - expected: %d, actual %d",
					tc.expectedRowsScanned, got)
			}
			if got := metrics[rowsFiltered]; got != tc.expectedRowsFiltered {
				t.Errorf("Did not get expected rows filtered - expected: %d, actual %d",
					tc.expectedRowsFiltered, got)
			}
		})
	}
}

func TestPutTTL(t *testing.T) {
key := "TestPutTTL"
c := gohbase.NewClient(*host)
Expand All @@ -1034,13 +1137,13 @@ func TestPutTTL(t *testing.T) {
t.Fatalf("Put failed: %s", err)
}

//Wait ttl duration and try to get the value
// Wait ttl duration and try to get the value
time.Sleep(ttl)

get, err := hrpc.NewGetStr(context.Background(), table, key,
hrpc.Families(map[string][]string{"cf": nil}))

//Make sure we dont get a result back
// Make sure we don't get a result back
res, err := c.Get(get)
if err != nil {
t.Fatalf("Get failed: %s", err)
Expand Down
4 changes: 2 additions & 2 deletions pb/Cell.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c8b69fd

Please sign in to comment.