No examples show how to use LookupPartitionStrategy & PredicatePartitionStrategy #109

Open
xiaofeige opened this issue Mar 10, 2023 · 2 comments

Comments

@xiaofeige

Are there any examples?
Why can't partitions be grouped by gRPC method?

@platinummonkey
Owner

You might look at the tests for current examples:

https://github.com/platinummonkey/go-concurrency-limits/blob/master/strategy/lookup_partition_test.go
https://github.com/platinummonkey/go-concurrency-limits/blob/master/strategy/predicate_partition_test.go

Per-method limiting is interesting. Currently the interceptor at https://github.com/platinummonkey/go-concurrency-limits/blob/master/grpc/grpc_unary.go#L11 applies to all methods, but with your own interceptor you could easily obtain the method name (and context args for something even more custom). Partitioning by method name seems like a relatively good feature request that can be generalized.

@xiaofeige
Author

```go
func (i *InterceptorManager) GetAutoRateLimiter() lwp.ServerInterceptor {
	// Pass-through interceptor used when limiting is disabled or setup fails.
	emptyLimitFunc := func(ctx context.Context, req *msgpack.ArgsCodec, info *lwp.ServerInfo, handler lwp.UnaryHandler) (resp interface{}, err error) {
		return handler(ctx, req)
	}

	if !fileConf.Config.FileServerSettting.StartAutoRateLimit {
		return emptyLimitFunc
	}

	// One partition per URL that needs its own share, plus a default.
	partitions := make(map[string]*strategy.LookupPartition)
	partitions["default"] = strategy.NewLookupPartitionWithMetricRegistry("default", 0.5, 100, limitCore.EmptyMetricRegistryInstance)
	partitions["/r/Icon/genAutomaticIcon"] = strategy.NewLookupPartitionWithMetricRegistry("/r/Icon/genAutomaticIcon", 0.2, 10, limitCore.EmptyMetricRegistryInstance)

	fileLimitStrategy, err := strategy.NewLookupPartitionStrategyWithMetricRegistry(
		partitions,
		// Map each request to a partition by URL, falling back to "default".
		func(ctx context.Context) string {
			rpcCtx, ok := lwputil.RpcContextFromContext(ctx)
			if !ok {
				return "default"
			}

			if _, ok = partitions[rpcCtx.Url]; !ok {
				return "default"
			}

			return rpcCtx.Url
		},
		1000,
		limitCore.EmptyMetricRegistryInstance,
	)
	// Check this error here; otherwise the next assignment to err overwrites it.
	if err != nil {
		log.Logger.Errorf("create limit strategy err:%v", err)
		return emptyLimitFunc
	}

	tags := make([]string, 0)
	autoLimiter, err := limiter.NewDefaultLimiterWithDefaults(
		"file",
		fileLimitStrategy,
		&log.RateLimitLogger{},
		limitCore.EmptyMetricRegistryInstance,
		tags...,
	)

	if err != nil || autoLimiter == nil {
		log.Logger.Errorf("create auto limiter err:%v", err)
		return emptyLimitFunc
	}

	// Log the limiter state once per second for debugging.
	go func() {
		for {
			time.Sleep(1 * time.Second)
			log.Logger.Infof("[rate_limit_state]: %s estimate:%d", autoLimiter.String(), autoLimiter.EstimatedLimit())
		}
	}()

	// Classify the handler result as "success", "drop", or "ignore".
	classifyRspCode := func(url string, err error) string {
		if err == nil {
			return "success"
		}

		lwpStat, ok := err.(*status.LwpStatus)
		if !ok {
			log.Logger.Infof("convert lwp error failed, err:%T", err)
			return "ignore"
		}

		// Timeouts and server errors count as drops.
		if lwpStat.LwpCode() == 408 || lwpStat.LwpCode() >= 500 {
			return "drop"
		}

		return "ignore"
	}

	return func(ctx context.Context, args *msgpack.ArgsCodec, info *lwp.ServerInfo, handler lwp.UnaryHandler) (resp interface{}, err error) {
		rpcCtx, ok := lwputil.RpcContextFromContext(ctx)
		if !ok {
			return nil, status.NewLwpBadRequestErrorWithScope(status.LwpDefaultLang, lwputil.BizCodeRpcContextMissing,
				"RpcContext_empty", "", lwputil.GetBizScope("AuthInterceptor"))
		}

		token, ok := autoLimiter.Acquire(ctx)
		if !ok {
			log.LogFmt.CommonErrorLog(ctx, "rate_limit", fmt.Errorf("auto_rate_limit"), map[string]interface{}{
				"uid": rpcCtx.Uid,
				"did": rpcCtx.Did,
			})
			// The message reads: "Service is busy, please try again later".
			return nil, status.NewLwpBadRequestErrorWithScope(status.LwpDefaultLang, fmt.Sprint(types.ErrTooManyRequest),
				"服务繁忙,请稍后重试", "", lwputil.GetBizScope("AuthInterceptor"))
		}
		// Report the outcome through the token once the handler returns.
		defer func() {
			rspType := classifyRspCode(rpcCtx.Url, err)

			switch rspType {
			case "success":
				token.OnSuccess()
			case "ignore":
				token.OnIgnore()
			case "drop":
				token.OnDropped()
			default:
				token.OnIgnore()
			}
		}()

		return handler(ctx, args)
	}
}
```

This is my code, but it seems the tokens aren't being released correctly. In my pressure test, all requests eventually get rejected, and the limiter doesn't recover unless I restart the server, even if I stop the pressure test for a long time.
