diff --git a/hrpc/call.go b/hrpc/call.go index a15e6e4c..04c618e5 100644 --- a/hrpc/call.go +++ b/hrpc/call.go @@ -61,6 +61,7 @@ type Call interface { ResultChan() chan RPCResult Description() string // Used for tracing and metrics Context() context.Context + SkipRetry() bool } type withOptions interface { @@ -90,6 +91,20 @@ func SkipBatch() func(Call) error { } } +func SkipRetry() func(Call) error { + return func(c Call) error { + if b, ok := c.(canSetSkipRetry); ok { + b.setSkipRetry(true) + return nil + } + return errors.New("'SkipRetry' is not implemented for this call") + } +} + +type canSetSkipRetry interface { + setSkipRetry(v bool) +} + // hasQueryOptions is interface that needs to be implemented by calls // that allow to provide Families and Filters options. type hasQueryOptions interface { @@ -119,12 +134,22 @@ type base struct { region RegionInfo resultch chan RPCResult + + skipRetry bool } func (b *base) Context() context.Context { return b.ctx } +func (b *base) SkipRetry() bool { + return b.skipRetry +} + +func (b *base) setSkipRetry() { + b.skipRetry = true +} + func (b *base) Region() RegionInfo { return b.region } diff --git a/region/multi.go b/region/multi.go index 29784928..219ce61e 100644 --- a/region/multi.go +++ b/region/multi.go @@ -323,6 +323,11 @@ func (m *multi) Context() context.Context { return context.Background() } +// SkipRetry always returns false for Multi. +func (m *multi) SkipRetry() bool { + return false +} + // String returns a description of this call func (m *multi) String() string { return "MULTI" diff --git a/rpc.go b/rpc.go index b13e5788..f3264873 100644 --- a/rpc.go +++ b/rpc.go @@ -97,16 +97,18 @@ func (c *client) SendRPC(rpc hrpc.Call) (msg proto.Message, err error) { return nil, err } msg, err = c.sendRPCToRegionClient(ctx, rpc, rc) - switch err.(type) { - case region.RetryableError: - sp.AddEvent("retrySleep") - backoff, err = sleepAndIncreaseBackoff(ctx, backoff) - if err != nil { - return msg, err + if !rpc.SkipRetry() { + switch err.(type) { + case region.RetryableError: + sp.AddEvent("retrySleep") + backoff, err = sleepAndIncreaseBackoff(ctx, backoff) + if err != nil { + return msg, err + } + continue // retry + case region.ServerError, region.NotServingRegionError: + continue // retry } - continue // retry - case region.ServerError, region.NotServingRegionError: - continue // retry } return msg, err } diff --git a/rpc_test.go b/rpc_test.go index be8a925f..786e2d41 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -88,6 +88,7 @@ func TestSendRPCSanity(t *testing.T) { expMsg := &pb.ScanResponse{} result <- hrpc.RPCResult{Msg: expMsg} mockCall.EXPECT().ResultChan().Return(result).Times(1) + mockCall.EXPECT().SkipRetry().Return(false) msg, err := c.SendRPC(mockCall) if err != nil { t.Fatal(err) diff --git a/scanner.go b/scanner.go index a5aaa698..2a92008d 100644 --- a/scanner.go +++ b/scanner.go @@ -337,22 +337,20 @@ func (s *scanner) closeRegionScanner() { return } if !s.rpc.IsClosing() { - // Not closed at server side - // if we are closing in the middle of scanning a region, - // send a close scanner request - // TODO: add a deadline + // Not closed at server side if we are closing in the middle of scanning + // a region, so send a close scanner request. This is a fire-and-forget + // call, as if we fail the scanner lease will expire and be closed + // automatically by HBase. rpc, err := hrpc.NewScanRange(context.Background(), s.rpc.Table(), s.startRow, nil, hrpc.ScannerID(s.curRegionScannerID), hrpc.CloseScanner(), - hrpc.NumberOfRows(0)) + hrpc.NumberOfRows(0), + hrpc.SkipRetry(), + ) if err != nil { panic(fmt.Sprintf("should not happen: %s", err)) } - - // If the request fails, the scanner lease will be expired - // and it will be closed automatically by hbase. - // No need to bother clients about that. go s.SendRPC(rpc) } s.curRegionScannerID = noScannerID diff --git a/test/mock/call.go b/test/mock/call.go index faf1eeb2..f0de7df8 100644 --- a/test/mock/call.go +++ b/test/mock/call.go @@ -151,6 +151,20 @@ func (mr *MockCallMockRecorder) SetRegion(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetRegion", reflect.TypeOf((*MockCall)(nil).SetRegion), arg0) } +// SkipRetry mocks base method. +func (m *MockCall) SkipRetry() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SkipRetry") + ret0, _ := ret[0].(bool) + return ret0 +} + +// SkipRetry indicates an expected call of SkipRetry. +func (mr *MockCallMockRecorder) SkipRetry() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SkipRetry", reflect.TypeOf((*MockCall)(nil).SkipRetry)) +} + // Table mocks base method. func (m *MockCall) Table() []byte { m.ctrl.T.Helper()