From 298a529b393f1d5900b69d8c0b38b087501257c7 Mon Sep 17 00:00:00 2001 From: Roman Zavodskikh Date: Tue, 26 Mar 2024 14:54:42 +0100 Subject: [PATCH] Fixed PHC for consistentHash with balance factor (#2998) * Added unit test for PHC together with consistentHash with balance factor Signed-off-by: Roman Zavodskikh * Fixed PHC for consistentHash with balance factor Signed-off-by: Roman Zavodskikh --------- Signed-off-by: Roman Zavodskikh Co-authored-by: Roman Zavodskikh --- loadbalancer/algorithm.go | 2 +- proxy/healthy_endpoints_test.go | 158 ++++++++++++++++++++------------ 2 files changed, 98 insertions(+), 62 deletions(-) diff --git a/loadbalancer/algorithm.go b/loadbalancer/algorithm.go index 1c15520c85..a51551ab74 100644 --- a/loadbalancer/algorithm.go +++ b/loadbalancer/algorithm.go @@ -181,7 +181,7 @@ func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, c if skipEndpoint(ctx, endpointIndex) { continue } - load := ctx.LBEndpoints[endpointIndex].Metrics.InflightRequests() + load := ctx.Route.LBEndpoints[endpointIndex].Metrics.InflightRequests() // We know there must be an endpoint whose load <= average load. // Since targetLoad >= average load (balancerFactor >= 1), there must also be an endpoint with load <= targetLoad. if float64(load) <= targetLoad { diff --git a/proxy/healthy_endpoints_test.go b/proxy/healthy_endpoints_test.go index 908ec4f389..ac7c7d427c 100644 --- a/proxy/healthy_endpoints_test.go +++ b/proxy/healthy_endpoints_test.go @@ -63,30 +63,42 @@ func TestPHCWithoutRequests(t *testing.T) { } for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} { - t.Run(algorithm, func(t *testing.T) { - endpointRegistry := defaultEndpointRegistry() - doc := fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`, - algorithm, services[0].URL, services[1].URL, services[2].URL) - - tp, err := newTestProxyWithParams(doc, Params{ - EnablePassiveHealthCheck: true, - EndpointRegistry: endpointRegistry, - }) - if err != nil { - t.Fatal(err) - } - defer tp.close() - - ps := httptest.NewServer(tp.proxy) - defer ps.Close() - - rsp := sendGetRequest(t, ps, 0) - assert.Equal(t, http.StatusOK, rsp.StatusCode) - rsp.Body.Close() + balanceFactors := []bool{false} + if algorithm == "consistentHash" { + balanceFactors = []bool{false, true} + } - time.Sleep(10 * period) - /* this test is needed to check PHC will not crash without requests sent during period at all */ - }) + for _, balanceFactor := range balanceFactors { + t.Run(fmt.Sprintf("%s_%t", algorithm, balanceFactor), func(t *testing.T) { + endpointRegistry := defaultEndpointRegistry() + + balanceFactorStr := "" + if balanceFactor { + balanceFactorStr = ` -> consistentHashBalanceFactor(1.25)` + } + doc := fmt.Sprintf(`* %s -> <%s, "%s", "%s", "%s">`, + balanceFactorStr, algorithm, services[0].URL, services[1].URL, services[2].URL) + + tp, err := newTestProxyWithParams(doc, Params{ + EnablePassiveHealthCheck: true, + EndpointRegistry: endpointRegistry, + }) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + rsp := sendGetRequest(t, ps, 0) + assert.Equal(t, http.StatusOK, rsp.StatusCode) + rsp.Body.Close() + + time.Sleep(10 * period) + /* this test is needed to check PHC will not crash without requests sent during period at all */ + }) + } } } @@ -125,26 +137,38 @@ func TestPHCForMultipleHealthyEndpoints(t *testing.T) { } for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} { - t.Run(algorithm, func(t *testing.T) { - endpointRegistry := defaultEndpointRegistry() - doc := fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`, - algorithm, services[0].URL, services[1].URL, services[2].URL) - - tp, err := newTestProxyWithParams(doc, Params{ - EnablePassiveHealthCheck: true, - EndpointRegistry: endpointRegistry, - }) - if err != nil { - t.Fatal(err) - } - defer tp.close() - - ps := httptest.NewServer(tp.proxy) - defer ps.Close() + balanceFactors := []bool{false} + if algorithm == "consistentHash" { + balanceFactors = []bool{false, true} + } - failedReqs := sendGetRequests(t, ps) - assert.Equal(t, 0, failedReqs) - }) + for _, balanceFactor := range balanceFactors { + t.Run(fmt.Sprintf("%s_%t", algorithm, balanceFactor), func(t *testing.T) { + endpointRegistry := defaultEndpointRegistry() + + balanceFactorStr := "" + if balanceFactor { + balanceFactorStr = ` -> consistentHashBalanceFactor(1.25)` + } + doc := fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") %s -> <%s, "%s", "%s", "%s">`, + balanceFactorStr, algorithm, services[0].URL, services[1].URL, services[2].URL) + + tp, err := newTestProxyWithParams(doc, Params{ + EnablePassiveHealthCheck: true, + EndpointRegistry: endpointRegistry, + }) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + failedReqs := sendGetRequests(t, ps) + assert.Equal(t, 0, failedReqs) + }) + } } } @@ -164,25 +188,37 @@ func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) { } for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} { - t.Run(algorithm, func(t *testing.T) { - endpointRegistry := defaultEndpointRegistry() - doc := fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`, - algorithm, services[0].URL, services[1].URL, services[2].URL) - - tp, err := newTestProxyWithParams(doc, Params{ - EnablePassiveHealthCheck: true, - EndpointRegistry: endpointRegistry, - }) - if err != nil { - t.Fatal(err) - } - defer tp.close() - - ps := httptest.NewServer(tp.proxy) - defer ps.Close() + balanceFactors := []bool{false} + if algorithm == "consistentHash" { + balanceFactors = []bool{false, true} + } - failedReqs := sendGetRequests(t, ps) - assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*float64(nRequests), failedReqs, 0.1*float64(nRequests)) - }) + for _, balanceFactor := range balanceFactors { + t.Run(fmt.Sprintf("%s_%t", algorithm, balanceFactor), func(t *testing.T) { + endpointRegistry := defaultEndpointRegistry() + + balanceFactorStr := "" + if balanceFactor { + balanceFactorStr = ` -> consistentHashBalanceFactor(1.25)` + } + doc := fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") %s -> <%s, "%s", "%s", "%s">`, + balanceFactorStr, algorithm, services[0].URL, services[1].URL, services[2].URL) + + tp, err := newTestProxyWithParams(doc, Params{ + EnablePassiveHealthCheck: true, + EndpointRegistry: endpointRegistry, + }) + if err != nil { + t.Fatal(err) + } + defer tp.close() + + ps := httptest.NewServer(tp.proxy) + defer ps.Close() + + failedReqs := sendGetRequests(t, ps) + assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*float64(nRequests), failedReqs, 0.1*float64(nRequests)) + }) + } } }