diff --git a/hrpc/hrpc_test.go b/hrpc/hrpc_test.go index c8934abf..79a10fc2 100644 --- a/hrpc/hrpc_test.go +++ b/hrpc/hrpc_test.go @@ -218,38 +218,48 @@ func TestNewScan(t *testing.T) { stopb := []byte("100") scan, err := NewScan(ctx, tableb) if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, - DefaultNumberOfRows) { + DefaultNumberOfRows, 0, false) { t.Errorf("Scan1 didn't set attributes correctly.") } scan, err = NewScanRange(ctx, tableb, startb, stopb) if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, nil, nil, - DefaultNumberOfRows) { + DefaultNumberOfRows, 0, false) { t.Errorf("Scan2 didn't set attributes correctly.") } scan, err = NewScanStr(ctx, table) if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, - DefaultNumberOfRows) { + DefaultNumberOfRows, 0, false) { t.Errorf("Scan3 didn't set attributes correctly.") } scan, err = NewScanRangeStr(ctx, table, start, stop) if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, nil, nil, - DefaultNumberOfRows) { + DefaultNumberOfRows, 0, false) { t.Errorf("Scan4 didn't set attributes correctly.") } scan, err = NewScanRange(ctx, tableb, startb, stopb, Families(fam), Filters(filter1)) if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, fam, filter1, - DefaultNumberOfRows) { + DefaultNumberOfRows, 0, false) { t.Errorf("Scan5 didn't set attributes correctly.") } scan, err = NewScan(ctx, tableb, Filters(filter1), Families(fam)) if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, fam, filter1, - DefaultNumberOfRows) { + DefaultNumberOfRows, 0, false) { t.Errorf("Scan6 didn't set attributes correctly.") } scan, err = NewScan(ctx, tableb, NumberOfRows(1)) - if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, 1) { + if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, 1, 0, false) { t.Errorf("Scan7 didn't set number of versions correctly") } + scan, err = NewScan(ctx, tableb, RenewInterval(10*time.Second)) + if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, + DefaultNumberOfRows, 10*time.Second, false) { + t.Errorf("Scan8 didn't set renew correctly") + } + scan, err = NewScan(ctx, tableb, RenewalScan()) + if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, + DefaultNumberOfRows, 0, true) { + t.Errorf("Scan8 didn't set renew correctly") + } } func TestScanToProto(t *testing.T) { @@ -286,6 +296,7 @@ func TestScanToProto(t *testing.T) { TimeRange: &pb.TimeRange{}, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // explicitly set configurable attributes to default values @@ -319,6 +330,7 @@ func TestScanToProto(t *testing.T) { CacheBlocks: nil, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // set configurable attributes to non-default values @@ -331,6 +343,8 @@ func TestScanToProto(t *testing.T) { MaxVersions(89), CacheBlocks(!DefaultCacheBlocks), TimeRangeUint64(1024, 1738), + RenewInterval(10*time.Second), + RenewalScan(), ) return s }(), @@ -353,6 +367,7 @@ func TestScanToProto(t *testing.T) { CacheBlocks: proto.Bool(!DefaultCacheBlocks), }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(true), }, }, { // test that pb.ScanRequest.Scan is nil when scanner id is specificed @@ -378,6 +393,7 @@ func TestScanToProto(t *testing.T) { ClientHandlesHeartbeats: proto.Bool(true), Scan: nil, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // set reversed attribute @@ -398,6 +414,7 @@ func TestScanToProto(t *testing.T) { Reversed: proto.Bool(true), }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // set scan attribute @@ -424,6 +441,7 @@ func TestScanToProto(t *testing.T) { }, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // scan key range @@ -445,6 +463,7 @@ func TestScanToProto(t *testing.T) { StopRow: stopRow, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, { // set filters and families @@ -467,6 +486,7 @@ func TestScanToProto(t *testing.T) { Filter: pbFilter, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), } }(), }, @@ -487,6 +507,7 @@ func TestScanToProto(t *testing.T) { TimeRange: &pb.TimeRange{}, }, TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), }, }, // set TrackScanMetrics @@ -507,6 +528,51 @@ func TestScanToProto(t *testing.T) { TimeRange: &pb.TimeRange{}, }, TrackScanMetrics: proto.Bool(true), + Renew: proto.Bool(false), + } + }(), + }, + // set RenewInterval, this shouldn't affect the protobuf + { + s: func() *Scan { + s, _ := NewScanStr(ctx, "", RenewInterval(10*time.Second)) + return s + }(), + expProto: func() *pb.ScanRequest { + return &pb.ScanRequest{ + Region: rs, + NumberOfRows: proto.Uint32(DefaultNumberOfRows), + CloseScanner: proto.Bool(false), + ClientHandlesPartials: proto.Bool(true), + ClientHandlesHeartbeats: proto.Bool(true), + Scan: &pb.Scan{ + MaxResultSize: proto.Uint64(DefaultMaxResultSize), + TimeRange: &pb.TimeRange{}, + }, + TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(false), + } + }(), + }, + // set RenewalScan + { + s: func() *Scan { + s, _ := NewScanStr(ctx, "", RenewalScan()) + return s + }(), + expProto: func() *pb.ScanRequest { + return &pb.ScanRequest{ + Region: rs, + NumberOfRows: proto.Uint32(DefaultNumberOfRows), + CloseScanner: proto.Bool(false), + ClientHandlesPartials: proto.Bool(true), + ClientHandlesHeartbeats: proto.Bool(true), + Scan: &pb.Scan{ + MaxResultSize: proto.Uint64(DefaultMaxResultSize), + TimeRange: &pb.TimeRange{}, + }, + TrackScanMetrics: proto.Bool(false), + Renew: proto.Bool(true), } }(), }, @@ -1573,7 +1639,8 @@ func TestDeserializeCellBlocksScan(t *testing.T) { } func confirmScanAttributes(ctx context.Context, s *Scan, table, start, stop []byte, - fam map[string][]string, fltr filter.Filter, numberOfRows uint32) bool { + fam map[string][]string, fltr filter.Filter, numberOfRows uint32, + renewInterval time.Duration, renewalScan bool) bool { if fltr == nil && s.filter != nil { return false } @@ -1582,7 +1649,9 @@ func confirmScanAttributes(ctx context.Context, s *Scan, table, start, stop []by bytes.Equal(s.StartRow(), start) && bytes.Equal(s.StopRow(), stop) && reflect.DeepEqual(s.families, fam) && - s.numberOfRows == numberOfRows + s.numberOfRows == numberOfRows && + s.renewInterval == renewInterval && + s.renewalScan == renewalScan } func BenchmarkMutateToProtoWithNestedMaps(b *testing.B) { diff --git a/hrpc/scan.go b/hrpc/scan.go index b3bd98a5..d83e1423 100644 --- a/hrpc/scan.go +++ b/hrpc/scan.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "math" + "time" "github.com/tsuna/gohbase/pb" "google.golang.org/protobuf/proto" @@ -77,6 +78,9 @@ type Scan struct { closeScanner bool allowPartialResults bool + + renewInterval time.Duration + renewalScan bool } // baseScan returns a Scan struct with default values set. @@ -93,6 +97,8 @@ func baseScan(ctx context.Context, table []byte, maxResultSize: DefaultMaxResultSize, numberOfRows: DefaultNumberOfRows, reversed: false, + renewInterval: 0 * time.Second, + renewalScan: false, } err := applyOptions(s, options...) if err != nil { @@ -104,10 +110,12 @@ func baseScan(ctx context.Context, table []byte, func (s *Scan) String() string { return fmt.Sprintf("Scan{Table=%q StartRow=%q StopRow=%q TimeRange=(%d, %d) "+ "MaxVersions=%d NumberOfRows=%d MaxResultSize=%d Familes=%v Filter=%v "+ - "StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v}", + "StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v RenewInterval=%v"+ + "RenewalScan=%v}", s.table, s.startRow, s.stopRow, s.fromTimestamp, s.toTimestamp, s.maxVersions, s.numberOfRows, s.maxResultSize, s.families, s.filter, - s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner) + s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner, s.renewInterval, + s.renewalScan) } // NewScan creates a scanner for the given table. @@ -189,6 +197,18 @@ func (s *Scan) TrackScanMetrics() bool { return s.trackScanMetrics } +// RenewInterval returns the interval at which the scanner will be renewed +// which is usually lease timeout / 2 secs +func (s *Scan) RenewInterval() time.Duration { + return s.renewInterval +} + +// RenewalScan returns whether this scan is to be used only a renewal request +// to hbase +func (s *Scan) RenewalScan() bool { + return s.renewalScan +} + // ToProto converts this Scan into a protobuf message func (s *Scan) ToProto() proto.Message { scan := &pb.ScanRequest{ @@ -201,6 +221,11 @@ func (s *Scan) ToProto() proto.Message { // since we don't really time out our scans (unless context was cancelled) ClientHandlesHeartbeats: proto.Bool(true), TrackScanMetrics: &s.trackScanMetrics, + Renew: proto.Bool(false), + } + // Tells hbase whether this request is for scanner renewal + if s.renewalScan { + scan.Renew = &s.renewalScan } if s.scannerID != math.MaxUint64 { scan.ScannerId = &s.scannerID @@ -387,3 +412,30 @@ func Attribute(key string, val []byte) func(Call) error { return nil } } + +// RenewInterval is a an option for scan requests. +// Enables renewal of scanners at an interval to prevent timeout of scanners due to +// waiting/starvation +func RenewInterval(interval time.Duration) func(Call) error { + return func(g Call) error { + scan, ok := g.(*Scan) + if !ok { + return errors.New("'RenewInterval' option can only be used with Scan queries") + } + scan.renewInterval = interval + return nil + } +} + +// RenewalScan is an option for scan requests. +// Indicates that this Scan request will be used for the renewal of a scanner only +func RenewalScan() func(Call) error { + return func(g Call) error { + scan, ok := g.(*Scan) + if !ok { + return errors.New("'RenewScan' option can only be used with Scan queries") + } + scan.renewalScan = true + return nil + } +} diff --git a/integration_test.go b/integration_test.go index d41ff453..f8c7887b 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2634,3 +2634,177 @@ func TestNewTableFromSnapshot(t *testing.T) { t.Fatalf("expected no cells after RestoreSnapshot in table %s key %s", table, key) } } + +// TestScannerTimeout makes sure that without the Renew flag on we get +// a lease timeout between Next calls if the wait between them is too long. +func TestScannerTimeout(t *testing.T) { + c := gohbase.NewClient(*host) + defer c.Close() + // Insert test data + keyPrefix := "scanner_timeout_test_" + numRows := 2 + for i := 0; i < numRows; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + value := []byte(strconv.Itoa(i)) + err := insertKeyValue(c, key, "cf", value) + if err != nil { + t.Fatalf("Failed to insert test data: %v", err) + } + } + + // Create a scan request + // renewal is default set to false + // We set result size to 1 to force a lease timeout between Next calls + scan, err := hrpc.NewScanStr(context.Background(), table, + hrpc.Families(map[string][]string{"cf": nil}), + hrpc.Filters(filter.NewPrefixFilter([]byte(keyPrefix))), + hrpc.NumberOfRows(1), + ) + if err != nil { + t.Fatalf("Failed to create scan request: %v", err) + } + + scanner := c.Scan(scan) + defer scanner.Close() + rsp, err := scanner.Next() + if err != nil { + t.Fatalf("Scanner.Next() returned error: %v", err) + } + if rsp == nil { + t.Fatalf("Unexpected end of scanner") + } + expectedValue := []byte(strconv.Itoa(0)) + if !bytes.Equal(rsp.Cells[0].Value, expectedValue) { + t.Errorf("Unexpected value. Got %v, want %v", rsp.Cells[0].Value, expectedValue) + } + + // force lease timeout + time.Sleep(60 * time.Second) + + _, err = scanner.Next() + + // lease timeout should return an UnknownScannerException + if err != nil && strings.Contains(err.Error(), "org.apache.hadoop.hbase.UnknownScannerException") { + fmt.Println("Error matches: UnknownScannerException") + } else { + t.Fatalf("Error does not match org.apache.hadoop.hbase.UnknownScannerException") + } +} + +// TestScannerRenewal tests for the renewal process of scanners +// if the renew flag is enabled for a scan requset. If there is a long +// period of waiting between Next calls, the latter Next call should +// still succeed because we are renewing every lease timeout / 2 seconds +func TestScannerRenewal(t *testing.T) { + c := gohbase.NewClient(*host) + defer c.Close() + // Insert test data + keyPrefix := "scanner_renewal_test_" + numRows := 8 + for i := 0; i < numRows; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + value := []byte(strconv.Itoa(i)) + err := insertKeyValue(c, key, "cf", value) + if err != nil { + t.Fatalf("Failed to insert test data: %v", err) + } + } + + // Create a scan request + // Turn on renewal + // We set result size to 1 to force a lease timeout between Next calls + scan, err := hrpc.NewScanStr(context.Background(), table, + hrpc.Families(map[string][]string{"cf": nil}), + hrpc.Filters(filter.NewPrefixFilter([]byte(keyPrefix))), + hrpc.NumberOfRows(1), + hrpc.RenewInterval(10*time.Second), + ) + if err != nil { + t.Fatalf("Failed to create scan request: %v", err) + } + + scanner := c.Scan(scan) + defer scanner.Close() + for i := 0; i < numRows; i++ { + rsp, err := scanner.Next() + // Sleep for 60 secs to trigger renewal + time.Sleep(60 * time.Second) + if err != nil { + t.Fatalf("Scanner.Next() returned error: %v", err) + } + if rsp == nil { + t.Fatalf("Unexpected end of scanner") + } + expectedValue := []byte(strconv.Itoa(i)) + if !bytes.Equal(rsp.Cells[0].Value, expectedValue) { + t.Fatalf("Unexpected value. Got %v, want %v", rsp.Cells[0].Value, expectedValue) + } + } + // Ensure scanner is exhausted + rsp, err := scanner.Next() + if err != io.EOF { + t.Fatalf("Expected EOF error, got: %v", err) + } + if rsp != nil { + t.Fatalf("Expected nil response at end of scan, got: %v", rsp) + } +} + +func TestScannerRenewalCancellation(t *testing.T) { + c := gohbase.NewClient(*host) + defer c.Close() + + // Insert test data + keyPrefix := "scanner_renewal_cancel_test_" + numRows := 2 + for i := 0; i < numRows; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + value := []byte(strconv.Itoa(i)) + err := insertKeyValue(c, key, "cf", value) + if err != nil { + t.Fatalf("Failed to insert test data: %v", err) + } + } + + // Create a context with cancellation + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + scan, err := hrpc.NewScanStr(ctx, table, + hrpc.Families(map[string][]string{"cf": nil}), + hrpc.Filters(filter.NewPrefixFilter([]byte(keyPrefix))), + hrpc.NumberOfRows(1), + hrpc.RenewInterval(10*time.Second), + ) + if err != nil { + t.Fatalf("Failed to create scan request: %v", err) + } + + scanner := c.Scan(scan) + defer scanner.Close() + + rsp, err := scanner.Next() + if err != nil { + t.Fatalf("Scanner.Next() returned error: %v", err) + } + if rsp == nil { + t.Fatalf("Unexpected end of scanner") + } + expectedValue := []byte(strconv.Itoa(0)) + if !bytes.Equal(rsp.Cells[0].Value, expectedValue) { + t.Errorf("Unexpected value. Got %v, want %v", rsp.Cells[0].Value, expectedValue) + } + + // Cancel the context + cancel() + + // Next call should return an error + _, err = scanner.Next() + + if err == nil { + t.Fatal("Expected error after context cancellation, got nil") + } + if err != context.Canceled { + t.Fatalf("Expected context.Canceled error, got: %v", err) + } +} diff --git a/scanner.go b/scanner.go index 4b7bc071..db330cd4 100644 --- a/scanner.go +++ b/scanner.go @@ -11,7 +11,9 @@ import ( "errors" "fmt" "io" + "log/slog" "math" + "time" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" @@ -39,6 +41,9 @@ type scanner struct { results []*pb.Result closed bool scanMetrics map[string]int64 + + logger *slog.Logger + renewCancel context.CancelFunc } func (s *scanner) fetch() ([]*pb.Result, error) { @@ -57,7 +62,6 @@ func (s *scanner) fetch() ([]*pb.Result, error) { } s.update(resp, region) - if s.isDone(resp, region) { s.Close() } @@ -137,6 +141,7 @@ func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner { startRow: rpc.StartRow(), curRegionScannerID: noScannerID, scanMetrics: sm, + logger: slog.Default(), } } @@ -149,6 +154,21 @@ func toLocalResult(r *pb.Result) *hrpc.Result { } func (s *scanner) Next() (*hrpc.Result, error) { + if s.rpc.RenewInterval() > 0 && s.renewCancel != nil { + s.renewCancel() + } + + res, err := s.nextInternal() + + renewCtx, cancel := context.WithCancel(s.rpc.Context()) + s.renewCancel = cancel + if err == nil && s.rpc.RenewInterval() > 0 { + go s.renewLoop(renewCtx, s.startRow) + } + return res, err +} + +func (s *scanner) nextInternal() (*hrpc.Result, error) { var ( result, partial *pb.Result err error @@ -208,6 +228,7 @@ func (s *scanner) request() (*pb.ScanResponse, hrpc.RegionInfo, error) { s.rpc.Table(), s.startRow, s.rpc.StopRow(), + // filteredOptions...) s.rpc.Options()...) } else { // continuing to scan current region @@ -218,6 +239,7 @@ func (s *scanner) request() (*pb.ScanResponse, hrpc.RegionInfo, error) { hrpc.ScannerID(s.curRegionScannerID), hrpc.NumberOfRows(s.rpc.NumberOfRows()), hrpc.Priority(s.rpc.Priority()), + hrpc.RenewInterval(s.rpc.RenewInterval()), ) } if err != nil { @@ -277,6 +299,9 @@ func (s *scanner) Close() error { if s.closed { return nil } + if s.renewCancel != nil { + s.renewCancel() + } s.closed = true // close the last region scanner s.closeRegionScanner() @@ -359,3 +384,50 @@ func (s *scanner) closeRegionScanner() { } s.curRegionScannerID = noScannerID } + +// renews a scanner by resending scan request with renew = true +func (s *scanner) renew(startRow []byte) error { + if err := s.rpc.Context().Err(); err != nil { + return err + } + rpc, err := hrpc.NewScanRange(s.rpc.Context(), + s.rpc.Table(), + startRow, + nil, + hrpc.ScannerID(s.curRegionScannerID), + hrpc.Priority(s.rpc.Priority()), + hrpc.RenewalScan(), + ) + if err != nil { + return err + } + res, err := s.SendRPC(rpc) + if err != nil { + return err + } + resp, ok := res.(*pb.ScanResponse) + if !ok { + return errors.New("got non-ScanResponse for scan request") + } + // save new scanner id + s.curRegionScannerID = resp.GetScannerId() + return nil + +} + +func (s *scanner) renewLoop(ctx context.Context, startRow []byte) { + t := time.NewTicker(s.rpc.RenewInterval()) + defer t.Stop() + + for { + select { + case <-t.C: + if err := s.renew(startRow); err != nil { + s.logger.Error("error renewing scanner", "err", err) + return + } + case <-ctx.Done(): + return + } + } +}