From 8a503e4fe8937084880fa15b1d815856b926729f Mon Sep 17 00:00:00 2001 From: Tanay Vakharia Date: Tue, 3 Dec 2024 15:50:05 -0800 Subject: [PATCH] add renewal for scanner --- hrpc/hrpc_test.go | 58 ++++++++++++++++++---- hrpc/scan.go | 30 +++++++++++- integration_test.go | 115 ++++++++++++++++++++++++++++++++++++++++++++ scanner.go | 87 ++++++++++++++++++++++++++++++++- 4 files changed, 277 insertions(+), 13 deletions(-) diff --git a/hrpc/hrpc_test.go b/hrpc/hrpc_test.go index c8934abf..c76796ed 100644 --- a/hrpc/hrpc_test.go +++ b/hrpc/hrpc_test.go @@ -218,38 +218,43 @@ func TestNewScan(t *testing.T) { stopb := []byte("100") scan, err := NewScan(ctx, tableb) if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, - DefaultNumberOfRows) { + DefaultNumberOfRows, false) { t.Errorf("Scan1 didn't set attributes correctly.") } scan, err = NewScanRange(ctx, tableb, startb, stopb) if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, nil, nil, - DefaultNumberOfRows) { + DefaultNumberOfRows, false) { t.Errorf("Scan2 didn't set attributes correctly.") } scan, err = NewScanStr(ctx, table) if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, - DefaultNumberOfRows) { + DefaultNumberOfRows, false) { t.Errorf("Scan3 didn't set attributes correctly.") } scan, err = NewScanRangeStr(ctx, table, start, stop) if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, nil, nil, - DefaultNumberOfRows) { + DefaultNumberOfRows, false) { t.Errorf("Scan4 didn't set attributes correctly.") } scan, err = NewScanRange(ctx, tableb, startb, stopb, Families(fam), Filters(filter1)) if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, fam, filter1, - DefaultNumberOfRows) { + DefaultNumberOfRows, false) { t.Errorf("Scan5 didn't set attributes correctly.") } scan, err = NewScan(ctx, tableb, Filters(filter1), Families(fam)) if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, fam, 
filter1, - DefaultNumberOfRows) { + DefaultNumberOfRows, false) { t.Errorf("Scan6 didn't set attributes correctly.") } scan, err = NewScan(ctx, tableb, NumberOfRows(1)) - if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, 1) { + if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, 1, false) { t.Errorf("Scan7 didn't set number of versions correctly") } + scan, err = NewScan(ctx, tableb, Renew()) + if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, + DefaultNumberOfRows, true) { + t.Errorf("Scan8 didn't set renew correctly") + } } func TestScanToProto(t *testing.T) { @@ -286,6 +291,7 @@ func TestScanToProto(t *testing.T) { TimeRange: &pb.TimeRange{}, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // explicitly set configurable attributes to default values @@ -319,6 +325,7 @@ func TestScanToProto(t *testing.T) { CacheBlocks: nil, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // set configurable attributes to non-default values @@ -331,6 +338,7 @@ func TestScanToProto(t *testing.T) { MaxVersions(89), CacheBlocks(!DefaultCacheBlocks), TimeRangeUint64(1024, 1738), + Renew(), ) return s }(), @@ -353,6 +361,7 @@ func TestScanToProto(t *testing.T) { CacheBlocks: proto.Bool(!DefaultCacheBlocks), }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(true), }, }, { // test that pb.ScanRequest.Scan is nil when scanner id is specificed @@ -378,6 +387,7 @@ func TestScanToProto(t *testing.T) { ClientHandlesHeartbeats: proto.Bool(true), Scan: nil, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // set reversed attribute @@ -398,6 +408,7 @@ func TestScanToProto(t *testing.T) { Reversed: proto.Bool(true), }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // set scan attribute @@ -424,6 +435,7 @@ func TestScanToProto(t *testing.T) { }, }, TrackScanMetrics: proto.Bool(false), + Renew: 
proto.Bool(false), }, }, { // scan key range @@ -445,6 +457,7 @@ func TestScanToProto(t *testing.T) { StopRow: stopRow, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // set filters and families @@ -467,6 +480,7 @@ func TestScanToProto(t *testing.T) { Filter: pbFilter, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), } }(), }, @@ -487,6 +501,7 @@ func TestScanToProto(t *testing.T) { TimeRange: &pb.TimeRange{}, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, // set TrackScanMetrics @@ -507,6 +522,29 @@ func TestScanToProto(t *testing.T) { TimeRange: &pb.TimeRange{}, }, TrackScanMetrics: proto.Bool(true), + Renew: proto.Bool(false), + } + }(), + }, + // set Renew + { + s: func() *Scan { + s, _ := NewScanStr(ctx, "", Renew()) + return s + }(), + expProto: func() *pb.ScanRequest { + return &pb.ScanRequest{ + Region: rs, + NumberOfRows: proto.Uint32(DefaultNumberOfRows), + CloseScanner: proto.Bool(false), + ClientHandlesPartials: proto.Bool(true), + ClientHandlesHeartbeats: proto.Bool(true), + Scan: &pb.Scan{ + MaxResultSize: proto.Uint64(DefaultMaxResultSize), + TimeRange: &pb.TimeRange{}, + }, + TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(true), } }(), }, @@ -1573,7 +1611,8 @@ func TestDeserializeCellBlocksScan(t *testing.T) { } func confirmScanAttributes(ctx context.Context, s *Scan, table, start, stop []byte, - fam map[string][]string, fltr filter.Filter, numberOfRows uint32) bool { + fam map[string][]string, fltr filter.Filter, numberOfRows uint32, + renew bool) bool { if fltr == nil && s.filter != nil { return false } @@ -1582,7 +1621,8 @@ func confirmScanAttributes(ctx context.Context, s *Scan, table, start, stop []by bytes.Equal(s.StartRow(), start) && bytes.Equal(s.StopRow(), stop) && reflect.DeepEqual(s.families, fam) && - s.numberOfRows == numberOfRows + s.numberOfRows == numberOfRows && + s.renew == renew } func BenchmarkMutateToProtoWithNestedMaps(b *testing.B) { diff 
--git a/hrpc/scan.go b/hrpc/scan.go index b3bd98a5..78ebcd89 100644 --- a/hrpc/scan.go +++ b/hrpc/scan.go @@ -33,6 +33,8 @@ const ( DefaultMaxResultsPerColumnFamily = math.MaxInt32 // DefaultCacheBlocks is the default setting to enable the block cache for get/scan queries DefaultCacheBlocks = true + // DefaultRenew is the default setting to enable renewing for scanners + DefaultRenew = false ) // Scanner is used to read data sequentially from HBase. @@ -77,6 +79,8 @@ type Scan struct { closeScanner bool allowPartialResults bool + + renew bool } // baseScan returns a Scan struct with default values set. @@ -93,6 +97,7 @@ func baseScan(ctx context.Context, table []byte, maxResultSize: DefaultMaxResultSize, numberOfRows: DefaultNumberOfRows, reversed: false, + renew: DefaultRenew, } err := applyOptions(s, options...) if err != nil { @@ -104,10 +109,10 @@ func baseScan(ctx context.Context, table []byte, func (s *Scan) String() string { return fmt.Sprintf("Scan{Table=%q StartRow=%q StopRow=%q TimeRange=(%d, %d) "+ "MaxVersions=%d NumberOfRows=%d MaxResultSize=%d Familes=%v Filter=%v "+ - "StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v}", + "StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v Renew=%v}", s.table, s.startRow, s.stopRow, s.fromTimestamp, s.toTimestamp, s.maxVersions, s.numberOfRows, s.maxResultSize, s.families, s.filter, - s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner) + s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner, s.renew) } // NewScan creates a scanner for the given table. 
@@ -189,6 +194,12 @@ func (s *Scan) TrackScanMetrics() bool { return s.trackScanMetrics } +// Renew returns true if the scanner will be renewed at an interval to +// keep it alive +func (s *Scan) Renew() bool { + return s.renew +} + // ToProto converts this Scan into a protobuf message func (s *Scan) ToProto() proto.Message { scan := &pb.ScanRequest{ @@ -201,6 +212,7 @@ func (s *Scan) ToProto() proto.Message { // since we don't really time out our scans (unless context was cancelled) ClientHandlesHeartbeats: proto.Bool(true), TrackScanMetrics: &s.trackScanMetrics, + Renew: &s.renew, } if s.scannerID != math.MaxUint64 { scan.ScannerId = &s.scannerID @@ -387,3 +399,17 @@ func Attribute(key string, val []byte) func(Call) error { return nil } } + +// Renew is an option for scan requests. +// Enables renewal of scanners at an interval to prevent timeout of scanners due to +// waiting/starvation +func Renew() func(Call) error { + return func(g Call) error { + scan, ok := g.(*Scan) + if !ok { + return errors.New("'Renew' option can only be used with Scan queries") + } + scan.renew = true + return nil + } +} diff --git a/integration_test.go b/integration_test.go index d41ff453..93584722 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2634,3 +2634,118 @@ func TestNewTableFromSnapshot(t *testing.T) { t.Fatalf("expected no cells after RestoreSnapshot in table %s key %s", table, key) } } + +// TestScannerTimeout tests whether there is a lease timeout between Next calls +// if the wait between them is too long. 
This is fixed by the renewal process +func TestScannerTimeout(t *testing.T) { + c := gohbase.NewClient(*host) + defer c.Close() + // Insert test data + keyPrefix := "scanner_timeout_test_" + numRows := 2 + for i := 0; i < numRows; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + value := []byte(strconv.Itoa(i)) + err := insertKeyValue(c, key, "cf", value) + if err != nil { + t.Fatalf("Failed to insert test data: %v", err) + } + } + + // Create a scan request + // renewal is default set to false + // We set result size to 1 to force a lease timeout between Next calls + scan, err := hrpc.NewScanStr(context.Background(), table, + hrpc.Families(map[string][]string{"cf": nil}), + hrpc.Filters(filter.NewPrefixFilter([]byte(keyPrefix))), + hrpc.NumberOfRows(1), + ) + if err != nil { + t.Fatalf("Failed to create scan request: %v", err) + } + + scanner := c.Scan(scan) + defer scanner.Close() + rsp, err := scanner.Next() + if err != nil { + t.Fatalf("Scanner.Next() returned error: %v", err) + } + if rsp == nil { + t.Fatalf("Unexpected end of scanner") + } + expectedValue := []byte(strconv.Itoa(0)) + if !bytes.Equal(rsp.Cells[0].Value, expectedValue) { + t.Errorf("Unexpected value. Got %v, want %v", rsp.Cells[0].Value, expectedValue) + } + + // force lease timeout + time.Sleep(60 * time.Second) + + _, err = scanner.Next() + + // lease timeout should return an UnknownScannerException + if err != nil && strings.Contains(err.Error(), "org.apache.hadoop.hbase.UnknownScannerException") { + fmt.Println("Error matches: UnknownScannerException") + } else { + t.Fatalf("Error does not match org.apache.hadoop.hbase.UnknownScannerException") + } +} + +// TestScannerRenewal tests for the renewal process of scanners +// if the renew flag is enabled for a scan request. 
If there is a long +// period of waiting between Next calls, the latter Next call should +// still succeed because we are renewing every lease timeout / 2 seconds +func TestScannerRenewal(t *testing.T) { + c := gohbase.NewClient(*host) + defer c.Close() + // Insert test data + keyPrefix := "scanner_renewal_test_" + numRows := 8 + for i := 0; i < numRows; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + value := []byte(strconv.Itoa(i)) + err := insertKeyValue(c, key, "cf", value) + if err != nil { + t.Fatalf("Failed to insert test data: %v", err) + } + } + + // Create a scan request + // Turn on renewal + // We set result size to 1 to force a lease timeout between Next calls + scan, err := hrpc.NewScanStr(context.Background(), table, + hrpc.Families(map[string][]string{"cf": nil}), + hrpc.Filters(filter.NewPrefixFilter([]byte(keyPrefix))), + hrpc.NumberOfRows(1), + hrpc.Renew(), + ) + if err != nil { + t.Fatalf("Failed to create scan request: %v", err) + } + + scanner := c.Scan(scan) + defer scanner.Close() + for i := 0; i < numRows; i++ { + rsp, err := scanner.Next() + // Sleep for 60 secs to trigger renewal + time.Sleep(60 * time.Second) + if err != nil { + t.Fatalf("Scanner.Next() returned error: %v", err) + } + if rsp == nil { + t.Fatalf("Unexpected end of scanner") + } + expectedValue := []byte(strconv.Itoa(i)) + if !bytes.Equal(rsp.Cells[0].Value, expectedValue) { + t.Errorf("Unexpected value. 
Got %v, want %v", rsp.Cells[0].Value, expectedValue) + } + } + // Ensure scanner is exhausted + rsp, err := scanner.Next() + if err != io.EOF { + t.Fatalf("Expected EOF error, got: %v", err) + } + if rsp != nil { + t.Fatalf("Expected nil response at end of scan, got: %v", rsp) + } +} diff --git a/scanner.go b/scanner.go index 4b7bc071..c6c055fd 100644 --- a/scanner.go +++ b/scanner.go @@ -12,6 +12,7 @@ import ( "fmt" "io" "math" + "time" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" @@ -39,6 +40,9 @@ type scanner struct { results []*pb.Result closed bool scanMetrics map[string]int64 + + renewTimer *time.Timer + renewInterval time.Duration } func (s *scanner) fetch() ([]*pb.Result, error) { @@ -57,7 +61,6 @@ func (s *scanner) fetch() ([]*pb.Result, error) { } s.update(resp, region) - if s.isDone(resp, region) { s.Close() } @@ -137,6 +140,8 @@ func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner { startRow: rpc.StartRow(), curRegionScannerID: noScannerID, scanMetrics: sm, + // value is lease timeout / 2 + renewInterval: 10 * time.Second, } } @@ -149,6 +154,15 @@ func toLocalResult(r *pb.Result) *hrpc.Result { } func (s *scanner) Next() (*hrpc.Result, error) { + s.stopRenewTimer() + res, err := s.nextInternal() + if err == nil && s.rpc.Renew() { + s.startRenewTimer() + } + return res, err +} + +func (s *scanner) nextInternal() (*hrpc.Result, error) { var ( result, partial *pb.Result err error @@ -202,13 +216,26 @@ func (s *scanner) request() (*pb.ScanResponse, hrpc.RegionInfo, error) { ) if s.isRegionScannerClosed() { + // filter out the renew option since this is not for renewing + // we do this because the response when setting renew=true does + // not create a new Scan + options := s.rpc.Options() + filteredOptions := make([]func(hrpc.Call) error, 0, len(options)) + for _, opt := range options { + dummyScan := &hrpc.Scan{} + opt(dummyScan) + if !dummyScan.Renew() { + filteredOptions = append(filteredOptions, opt) + } + } + // open a new region scan 
to scan on a new region rpc, err = hrpc.NewScanRange( s.rpc.Context(), s.rpc.Table(), s.startRow, s.rpc.StopRow(), - s.rpc.Options()...) + filteredOptions...) } else { // continuing to scan current region rpc, err = hrpc.NewScanRange(s.rpc.Context(), @@ -277,6 +304,7 @@ func (s *scanner) Close() error { if s.closed { return nil } + s.stopRenewTimer() s.closed = true // close the last region scanner s.closeRegionScanner() @@ -359,3 +387,58 @@ func (s *scanner) closeRegionScanner() { } s.curRegionScannerID = noScannerID } + +// renews a scanner by resending scan request with renew = true +func (s *scanner) renew() error { + select { + case <-s.rpc.Context().Done(): + return errors.New("context was canceled") + default: + rpc, err := hrpc.NewScanRange(s.rpc.Context(), + s.rpc.Table(), + s.startRow, + nil, + hrpc.ScannerID(s.curRegionScannerID), + hrpc.NumberOfRows(s.rpc.NumberOfRows()), + hrpc.Priority(s.rpc.Priority()), + hrpc.Renew(), + ) + if err != nil { + return err + } + res, sendErr := s.SendRPC(rpc) + if sendErr != nil { + return sendErr + } + resp, ok := res.(*pb.ScanResponse) + if !ok { + return errors.New("got non-ScanResponse for scan request") + } + // save new scanner id + s.curRegionScannerID = resp.GetScannerId() + return nil + } +} + +// starts the renew timer, after it fires, renewScanner is called +func (s *scanner) startRenewTimer() { + if s.rpc.Renew() { + s.renewTimer = time.AfterFunc(s.renewInterval, s.renewScanner) + } +} + +// stops the renewal timer +func (s *scanner) stopRenewTimer() { + if s.rpc.Renew() && s.renewTimer != nil { + s.renewTimer.Stop() + } +} + +// renews the scanner and restarts timer after renewal +func (s *scanner) renewScanner() { + err := s.renew() + if err != nil { + return + } + s.startRenewTimer() +}