Skip to content

Commit

Permalink
add renewal for scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
tanay-vakharia committed Dec 4, 2024
1 parent 74ec557 commit 8a503e4
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 13 deletions.
58 changes: 49 additions & 9 deletions hrpc/hrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,38 +218,43 @@ func TestNewScan(t *testing.T) {
stopb := []byte("100")
scan, err := NewScan(ctx, tableb)
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil,
DefaultNumberOfRows) {
DefaultNumberOfRows, false) {
t.Errorf("Scan1 didn't set attributes correctly.")
}
scan, err = NewScanRange(ctx, tableb, startb, stopb)
if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, nil, nil,
DefaultNumberOfRows) {
DefaultNumberOfRows, false) {
t.Errorf("Scan2 didn't set attributes correctly.")
}
scan, err = NewScanStr(ctx, table)
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil,
DefaultNumberOfRows) {
DefaultNumberOfRows, false) {
t.Errorf("Scan3 didn't set attributes correctly.")
}
scan, err = NewScanRangeStr(ctx, table, start, stop)
if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, nil, nil,
DefaultNumberOfRows) {
DefaultNumberOfRows, false) {
t.Errorf("Scan4 didn't set attributes correctly.")
}
scan, err = NewScanRange(ctx, tableb, startb, stopb, Families(fam), Filters(filter1))
if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, fam, filter1,
DefaultNumberOfRows) {
DefaultNumberOfRows, false) {
t.Errorf("Scan5 didn't set attributes correctly.")
}
scan, err = NewScan(ctx, tableb, Filters(filter1), Families(fam))
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, fam, filter1,
DefaultNumberOfRows) {
DefaultNumberOfRows, false) {
t.Errorf("Scan6 didn't set attributes correctly.")
}
scan, err = NewScan(ctx, tableb, NumberOfRows(1))
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, 1) {
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, 1, false) {
t.Errorf("Scan7 didn't set number of versions correctly")
}
scan, err = NewScan(ctx, tableb, Renew())
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil,
DefaultNumberOfRows, true) {
t.Errorf("Scan8 didn't set renew correctly")
}
}

func TestScanToProto(t *testing.T) {
Expand Down Expand Up @@ -286,6 +291,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // explicitly set configurable attributes to default values
Expand Down Expand Up @@ -319,6 +325,7 @@ func TestScanToProto(t *testing.T) {
CacheBlocks: nil,
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // set configurable attributes to non-default values
Expand All @@ -331,6 +338,7 @@ func TestScanToProto(t *testing.T) {
MaxVersions(89),
CacheBlocks(!DefaultCacheBlocks),
TimeRangeUint64(1024, 1738),
Renew(),
)
return s
}(),
Expand All @@ -353,6 +361,7 @@ func TestScanToProto(t *testing.T) {
CacheBlocks: proto.Bool(!DefaultCacheBlocks),
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(true),
},
},
{ // test that pb.ScanRequest.Scan is nil when scanner id is specified
Expand All @@ -378,6 +387,7 @@ func TestScanToProto(t *testing.T) {
ClientHandlesHeartbeats: proto.Bool(true),
Scan: nil,
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // set reversed attribute
Expand All @@ -398,6 +408,7 @@ func TestScanToProto(t *testing.T) {
Reversed: proto.Bool(true),
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // set scan attribute
Expand All @@ -424,6 +435,7 @@ func TestScanToProto(t *testing.T) {
},
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // scan key range
Expand All @@ -445,6 +457,7 @@ func TestScanToProto(t *testing.T) {
StopRow: stopRow,
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // set filters and families
Expand All @@ -467,6 +480,7 @@ func TestScanToProto(t *testing.T) {
Filter: pbFilter,
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
}
}(),
},
Expand All @@ -487,6 +501,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
// set TrackScanMetrics
Expand All @@ -507,6 +522,29 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(true),
Renew: proto.Bool(false),
}
}(),
},
// set Renew
{
s: func() *Scan {
s, _ := NewScanStr(ctx, "", Renew())
return s
}(),
expProto: func() *pb.ScanRequest {
return &pb.ScanRequest{
Region: rs,
NumberOfRows: proto.Uint32(DefaultNumberOfRows),
CloseScanner: proto.Bool(false),
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: &pb.Scan{
MaxResultSize: proto.Uint64(DefaultMaxResultSize),
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(true),
}
}(),
},
Expand Down Expand Up @@ -1573,7 +1611,8 @@ func TestDeserializeCellBlocksScan(t *testing.T) {
}

func confirmScanAttributes(ctx context.Context, s *Scan, table, start, stop []byte,
fam map[string][]string, fltr filter.Filter, numberOfRows uint32) bool {
fam map[string][]string, fltr filter.Filter, numberOfRows uint32,
renew bool) bool {
if fltr == nil && s.filter != nil {
return false
}
Expand All @@ -1582,7 +1621,8 @@ func confirmScanAttributes(ctx context.Context, s *Scan, table, start, stop []by
bytes.Equal(s.StartRow(), start) &&
bytes.Equal(s.StopRow(), stop) &&
reflect.DeepEqual(s.families, fam) &&
s.numberOfRows == numberOfRows
s.numberOfRows == numberOfRows &&
s.renew == renew
}

func BenchmarkMutateToProtoWithNestedMaps(b *testing.B) {
Expand Down
30 changes: 28 additions & 2 deletions hrpc/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
DefaultMaxResultsPerColumnFamily = math.MaxInt32
// DefaultCacheBlocks is the default setting to enable the block cache for get/scan queries
DefaultCacheBlocks = true
// DefaultRenew is the default setting to enable renewing for scanners
DefaultRenew = false
)

// Scanner is used to read data sequentially from HBase.
Expand Down Expand Up @@ -77,6 +79,8 @@ type Scan struct {

closeScanner bool
allowPartialResults bool

renew bool
}

// baseScan returns a Scan struct with default values set.
Expand All @@ -93,6 +97,7 @@ func baseScan(ctx context.Context, table []byte,
maxResultSize: DefaultMaxResultSize,
numberOfRows: DefaultNumberOfRows,
reversed: false,
renew: DefaultRenew,
}
err := applyOptions(s, options...)
if err != nil {
Expand All @@ -104,10 +109,10 @@ func baseScan(ctx context.Context, table []byte,
func (s *Scan) String() string {
return fmt.Sprintf("Scan{Table=%q StartRow=%q StopRow=%q TimeRange=(%d, %d) "+
"MaxVersions=%d NumberOfRows=%d MaxResultSize=%d Familes=%v Filter=%v "+
"StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v}",
"StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v Renew=%v}",
s.table, s.startRow, s.stopRow, s.fromTimestamp, s.toTimestamp,
s.maxVersions, s.numberOfRows, s.maxResultSize, s.families, s.filter,
s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner)
s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner, s.renew)
}

// NewScan creates a scanner for the given table.
Expand Down Expand Up @@ -189,6 +194,12 @@ func (s *Scan) TrackScanMetrics() bool {
return s.trackScanMetrics
}

// Renew returns true if renewal has been enabled for this scan, i.e. the
// scanner is meant to be renewed at an interval to keep it alive.
func (s *Scan) Renew() bool {
return s.renew
}

// ToProto converts this Scan into a protobuf message
func (s *Scan) ToProto() proto.Message {
scan := &pb.ScanRequest{
Expand All @@ -201,6 +212,7 @@ func (s *Scan) ToProto() proto.Message {
// since we don't really time out our scans (unless context was cancelled)
ClientHandlesHeartbeats: proto.Bool(true),
TrackScanMetrics: &s.trackScanMetrics,
Renew: &s.renew,
}
if s.scannerID != math.MaxUint64 {
scan.ScannerId = &s.scannerID
Expand Down Expand Up @@ -387,3 +399,17 @@ func Attribute(key string, val []byte) func(Call) error {
return nil
}
}

// Renew is an option for scan requests.
// It enables renewal of the scanner at an interval so that the scanner does
// not time out because of waiting/starvation between fetches.
// The returned option fails when it is applied to anything other than a *Scan.
func Renew() func(Call) error {
	return func(c Call) error {
		if s, isScan := c.(*Scan); isScan {
			s.renew = true
			return nil
		}
		return errors.New("'Renew' option can only be used with Scan queries")
	}
}
115 changes: 115 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2634,3 +2634,118 @@ func TestNewTableFromSnapshot(t *testing.T) {
t.Fatalf("expected no cells after RestoreSnapshot in table %s key %s", table, key)
}
}

// TestScannerTimeout checks that, with renewal left disabled, the scanner
// lease times out when the wait between two Next calls is too long. This is
// the failure that the renewal process is meant to fix.
func TestScannerTimeout(t *testing.T) {
	c := gohbase.NewClient(*host)
	defer c.Close()

	// Insert test data.
	keyPrefix := "scanner_timeout_test_"
	numRows := 2
	for i := 0; i < numRows; i++ {
		key := fmt.Sprintf("%s%d", keyPrefix, i)
		value := []byte(strconv.Itoa(i))
		if err := insertKeyValue(c, key, "cf", value); err != nil {
			t.Fatalf("Failed to insert test data: %v", err)
		}
	}

	// Create a scan request. Renewal is off by default.
	// NumberOfRows(1) is used so that the lease can time out between Next calls.
	scan, err := hrpc.NewScanStr(context.Background(), table,
		hrpc.Families(map[string][]string{"cf": nil}),
		hrpc.Filters(filter.NewPrefixFilter([]byte(keyPrefix))),
		hrpc.NumberOfRows(1),
	)
	if err != nil {
		t.Fatalf("Failed to create scan request: %v", err)
	}

	scanner := c.Scan(scan)
	defer scanner.Close()

	rsp, err := scanner.Next()
	if err != nil {
		t.Fatalf("Scanner.Next() returned error: %v", err)
	}
	if rsp == nil {
		t.Fatalf("Unexpected end of scanner")
	}
	// Guard the index below so an empty result fails the test instead of panicking.
	if len(rsp.Cells) == 0 {
		t.Fatalf("Expected at least one cell in the first result")
	}
	expectedValue := []byte(strconv.Itoa(0))
	if !bytes.Equal(rsp.Cells[0].Value, expectedValue) {
		t.Errorf("Unexpected value. Got %v, want %v", rsp.Cells[0].Value, expectedValue)
	}

	// Force a lease timeout.
	// NOTE(review): assumes the test cluster's scanner lease is shorter than
	// 60 seconds - confirm against the HBase configuration used by the tests.
	time.Sleep(60 * time.Second)

	_, err = scanner.Next()

	// A lease timeout should surface as an UnknownScannerException. Report the
	// error that was actually returned so a failure can be diagnosed.
	const wantErr = "org.apache.hadoop.hbase.UnknownScannerException"
	if err == nil || !strings.Contains(err.Error(), wantErr) {
		t.Fatalf("Expected error containing %q, got: %v", wantErr, err)
	}
}

// TestScannerRenewal tests the renewal process of scanners when the renew
// flag is enabled for a scan request. Even with a long period of waiting
// between Next calls, the later Next call should still succeed because the
// scanner is being renewed in the meantime.
func TestScannerRenewal(t *testing.T) {
	c := gohbase.NewClient(*host)
	defer c.Close()

	// Insert test data.
	keyPrefix := "scanner_renewal_test_"
	numRows := 8
	for i := 0; i < numRows; i++ {
		key := fmt.Sprintf("%s%d", keyPrefix, i)
		value := []byte(strconv.Itoa(i))
		if err := insertKeyValue(c, key, "cf", value); err != nil {
			t.Fatalf("Failed to insert test data: %v", err)
		}
	}

	// Create a scan request with renewal turned on.
	// NumberOfRows(1) is used so that, without renewal, the lease could time
	// out between Next calls.
	scan, err := hrpc.NewScanStr(context.Background(), table,
		hrpc.Families(map[string][]string{"cf": nil}),
		hrpc.Filters(filter.NewPrefixFilter([]byte(keyPrefix))),
		hrpc.NumberOfRows(1),
		hrpc.Renew(),
	)
	if err != nil {
		t.Fatalf("Failed to create scan request: %v", err)
	}

	scanner := c.Scan(scan)
	defer scanner.Close()
	for i := 0; i < numRows; i++ {
		rsp, err := scanner.Next()
		// Validate the result before sleeping so that a failure is reported
		// immediately rather than after the 60 second wait.
		if err != nil {
			t.Fatalf("Scanner.Next() returned error: %v", err)
		}
		if rsp == nil {
			t.Fatalf("Unexpected end of scanner")
		}
		// Guard the index below so an empty result fails the test instead of panicking.
		if len(rsp.Cells) == 0 {
			t.Fatalf("Expected at least one cell in result %d", i)
		}
		expectedValue := []byte(strconv.Itoa(i))
		if !bytes.Equal(rsp.Cells[0].Value, expectedValue) {
			t.Errorf("Unexpected value. Got %v, want %v", rsp.Cells[0].Value, expectedValue)
		}

		// Sleep for 60 secs to trigger renewal before the next call.
		time.Sleep(60 * time.Second)
	}

	// Ensure scanner is exhausted.
	rsp, err := scanner.Next()
	if err != io.EOF {
		t.Fatalf("Expected EOF error, got: %v", err)
	}
	if rsp != nil {
		t.Fatalf("Expected nil response at end of scan, got: %v", rsp)
	}
}
Loading

0 comments on commit 8a503e4

Please sign in to comment.