diff --git a/src/bec.app.src b/src/bec.app.src index a06f430..da4f345 100644 --- a/src/bec.app.src +++ b/src/bec.app.src @@ -15,5 +15,8 @@ {env, [ {bitbucket_username, ""} , {bitbucket_password, ""} , {bitbucket_url, "http://localhost:8000" } + , {base_sleep_time, 100} + , {cap_sleep_time, 60000} + , {max_retries, 15} ]} ]}. diff --git a/src/bitbucket_http.erl b/src/bitbucket_http.erl index a561a1c..ca5e2d8 100644 --- a/src/bitbucket_http.erl +++ b/src/bitbucket_http.erl @@ -31,6 +31,11 @@ -type request() :: {url(), [header()], string(), body()} | {url(), [header()]}. +-type retry_state() :: #{n := non_neg_integer(), + base_sleep_time := pos_integer(), + cap_sleep_time := pos_integer(), + max_retries := pos_integer() | infinity }. + %%============================================================================== %% DELETE %%============================================================================== @@ -72,7 +77,7 @@ do_request(Method, Url) -> Headers = headers(), Request = {Url, Headers}, ok = ?LOG_DEBUG("HTTP Request: (~p) ~p~n", [Method, Url]), - do_http_request(Method, Request). + do_http_request(Method, Request, default_retry_state()). -spec do_request(method(), url(), body()) -> {ok, map()} | {error, any()}. do_request(Method, Url, Body) -> @@ -81,11 +86,11 @@ do_request(Method, Url, Body) -> Request = {Url, Headers, Type, Body}, ok = ?LOG_DEBUG("HTTP Request: (~p) ~p~n~p~n", [Method, Url, Headers]), - do_http_request(Method, Request). + do_http_request(Method, Request, default_retry_state()). --spec do_http_request(method(), request()) -> +-spec do_http_request(method(), request(), retry_state()) -> {ok, map()} | {ok, [map()]} | {error, any()}. -do_http_request(Method, Request) -> +do_http_request(Method, Request, RetryState) -> HTTPOptions = [{autoredirect, true}], Options = [], %% Disable pipelining to avoid the socket getting closed during long runs @@ -95,7 +100,19 @@ do_http_request(Method, Request) -> ]), Result = httpc:request(Method, Request, HTTPOptions, Options), ok = ?LOG_DEBUG("HTTP Result: ~p~n", [Result]), - handle_result(Result). + case handle_result(Result) of + {error, {retry, E}} -> + case should_retry(RetryState) of + {error, max_retries_exceeded} -> + ?LOG_ERROR("Request was rate-limited and max retries was exceeded."), + {error, E}; + {ok, RetryState0} -> + ?LOG_WARNING("Request was rate-limited, retrying...", []), + do_http_request(Method, Request, RetryState0) + end; + Other -> + Other + end. -spec headers() -> [{string(), string()}]. headers() -> @@ -121,6 +138,8 @@ handle_result({ok, {{_Ver, Status, _Phrase}, _H, Body}}) when Status =:= 200; Status =:= 202; Status =:= 204 -> {ok, decode_body(Body)}; +handle_result({ok, {{_Ver, Status, _Phrase}, _H, Body}}) when Status =:= 429 -> + {error, {retry, decode_error(Body)}}; handle_result({ok, {{_Version, _Status, _Phrase}, _Headers, Resp}}) -> {error, decode_error(Resp)}; handle_result({error, Reason}) -> @@ -142,3 +161,29 @@ decode_body(Body0) -> decode_error(Body) -> Errors = maps:get(<<"errors">>, decode_body(Body)), [M || #{<<"message">> := M} <- Errors]. + +-spec default_retry_state() -> retry_state(). +default_retry_state() -> + {ok, BaseSleepTime} = application:get_env(bec, base_sleep_time), + {ok, CapSleepTime} = application:get_env(bec, cap_sleep_time), + {ok, MaxRetries} = application:get_env(bec, max_retries), + #{n => 0, + base_sleep_time => BaseSleepTime, + cap_sleep_time => CapSleepTime, + max_retries => MaxRetries}. + +-spec should_retry(RetryState :: retry_state()) -> + {ok, retry_state()} | {error, max_retries_exceeded}. +should_retry(#{n := N, max_retries := Max}) when N >= Max -> + {error, max_retries_exceeded}; +should_retry(#{ n := N + , base_sleep_time := BaseSleepTime + , cap_sleep_time := CapSleepTime} = RetryState0) -> + Sleep = calculate_sleep_time(N, BaseSleepTime, CapSleepTime), + ?LOG_DEBUG("Sleep-time: ~w ms", [Sleep]), + timer:sleep(Sleep), + {ok, RetryState0#{ n => N + 1 }}. + +calculate_sleep_time(N, BaseSleepTime, CapSleepTime) -> + Temp = min(CapSleepTime, BaseSleepTime bsl N), + Temp div 2 + rand:uniform(Temp div 2). diff --git a/test/bitbucket_api_SUITE.erl b/test/bitbucket_api_SUITE.erl index 2f6f5da..8d6d0e8 100644 --- a/test/bitbucket_api_SUITE.erl +++ b/test/bitbucket_api_SUITE.erl @@ -13,6 +13,9 @@ , get_wz_branch_reviewers/1 , get_wz_branch_reviewers_when_none_configured/1 , get_wz_branch_reviewers_with_mandatory/1 + + , test_retry_on_rate_limiting/1 + , test_max_retries_exceeded/1 ]). -include_lib("common_test/include/ct.hrl"). @@ -22,6 +25,7 @@ -define(REPO_SLUG , "a_repo"). init_per_suite(Config) -> + bec_test_utils:init_logging(), {ok, Started} = application:ensure_all_started(bec), ok = meck:new(httpc, [unlink]), [{started, Started}|Config]. @@ -69,6 +73,31 @@ init_per_testcase(get_wz_branch_reviewers_with_mandatory, Config) -> {ok, {{"1.1", 200, "OK"}, [], Body}} end, ok = meck:expect(httpc, request, Fun), + Config; +init_per_testcase(test_retry_on_rate_limiting, Config) -> + FunRateLimited = fun(_, _, _, _) -> + {ok, {{"1.1", 429, "Rate limited"}, [], rate_limited_body()}} + end, + FunOk = fun(_, _, _, _) -> + {ok, {{"1.1", 200, "OK"}, [], ""}} + end, + + NumRateLimits = 4, + + ok = meck:expect(httpc, request, 4, + meck:seq(lists:duplicate(NumRateLimits, FunRateLimited) ++ [FunOk])), + Config; +init_per_testcase(test_max_retries_exceeded, Config) -> + FunRateLimited = fun(_, _, _, _) -> + {ok, {{"1.1", 429, "Rate limited"}, [], rate_limited_body()}} + end, + FunOk = fun(_, _, _, _) -> + {ok, {{"1.1", 200, "OK"}, [], ""}} + end, + + NumRateLimits = 4, + ok = meck:expect(httpc, request, 4, + meck:seq(lists:duplicate(NumRateLimits, FunRateLimited) ++ [FunOk])), Config. end_per_testcase(_TestCase, _Config) -> @@ -82,6 +111,8 @@ all() -> , get_wz_branch_reviewers , get_wz_branch_reviewers_when_none_configured , get_wz_branch_reviewers_with_mandatory + , test_retry_on_rate_limiting + , test_max_retries_exceeded ]. get_default_branch(_Config) -> @@ -164,3 +195,15 @@ get_wz_branch_reviewers_with_mandatory(_Config) -> body(Config, TestCase) -> DataDir = ?config(data_dir, Config), file:read_file(filename:join([DataDir, atom_to_list(TestCase) ++ ".json"])). + +test_retry_on_rate_limiting(_Config) -> + Result = bitbucket:set_default_branch(?PROJECT_KEY, ?REPO_SLUG, "develop"), + ?assertEqual(ok, Result). + +test_max_retries_exceeded(_Config) -> + application:set_env(bec, max_retries, 2), + Result = bitbucket:set_default_branch(?PROJECT_KEY, ?REPO_SLUG, "develop"), + ?assertMatch({error, [<<"Rate limited">>]}, Result). + +rate_limited_body() -> + <<"{\"errors\": [{\"message\": \"Rate limited\"}]}">>.