Skip to content

Commit

Permalink
Fixed PHC for consistentHash with balance factor (#2998)
Browse files Browse the repository at this point in the history
* Added unit test for PHC together with consistentHash with balance factor

Signed-off-by: Roman Zavodskikh <[email protected]>

* Fixed PHC for consistentHash with balance factor

Signed-off-by: Roman Zavodskikh <[email protected]>

---------

Signed-off-by: Roman Zavodskikh <[email protected]>
Co-authored-by: Roman Zavodskikh <[email protected]>
  • Loading branch information
RomanZavodskikh and Roman Zavodskikh authored Mar 26, 2024
1 parent 1bfc4c3 commit 298a529
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 62 deletions.
2 changes: 1 addition & 1 deletion loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, c
if skipEndpoint(ctx, endpointIndex) {
continue
}
load := ctx.LBEndpoints[endpointIndex].Metrics.InflightRequests()
load := ctx.Route.LBEndpoints[endpointIndex].Metrics.InflightRequests()
// We know there must be an endpoint whose load <= average load.
// Since targetLoad >= average load (balancerFactor >= 1), there must also be an endpoint with load <= targetLoad.
if float64(load) <= targetLoad {
Expand Down
158 changes: 97 additions & 61 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,30 +63,42 @@ func TestPHCWithoutRequests(t *testing.T) {
}

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
endpointRegistry := defaultEndpointRegistry()
doc := fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL)

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
balanceFactors := []bool{false}
if algorithm == "consistentHash" {
balanceFactors = []bool{false, true}
}

time.Sleep(10 * period)
/* this test is needed to check PHC will not crash without requests sent during period at all */
})
for _, balanceFactor := range balanceFactors {
t.Run(fmt.Sprintf("%s_%t", algorithm, balanceFactor), func(t *testing.T) {
endpointRegistry := defaultEndpointRegistry()

balanceFactorStr := ""
if balanceFactor {
balanceFactorStr = ` -> consistentHashBalanceFactor(1.25)`
}
doc := fmt.Sprintf(`* %s -> <%s, "%s", "%s", "%s">`,
balanceFactorStr, algorithm, services[0].URL, services[1].URL, services[2].URL)

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()

time.Sleep(10 * period)
/* this test is needed to check PHC will not crash without requests sent during period at all */
})
}
}
}

Expand Down Expand Up @@ -125,26 +137,38 @@ func TestPHCForMultipleHealthyEndpoints(t *testing.T) {
}

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
endpointRegistry := defaultEndpointRegistry()
doc := fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL)

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()
balanceFactors := []bool{false}
if algorithm == "consistentHash" {
balanceFactors = []bool{false, true}
}

failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
})
for _, balanceFactor := range balanceFactors {
t.Run(fmt.Sprintf("%s_%t", algorithm, balanceFactor), func(t *testing.T) {
endpointRegistry := defaultEndpointRegistry()

balanceFactorStr := ""
if balanceFactor {
balanceFactorStr = ` -> consistentHashBalanceFactor(1.25)`
}
doc := fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") %s -> <%s, "%s", "%s", "%s">`,
balanceFactorStr, algorithm, services[0].URL, services[1].URL, services[2].URL)

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
})
}
}
}

Expand All @@ -164,25 +188,37 @@ func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) {
}

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
endpointRegistry := defaultEndpointRegistry()
doc := fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL)

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()
balanceFactors := []bool{false}
if algorithm == "consistentHash" {
balanceFactors = []bool{false, true}
}

failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*float64(nRequests), failedReqs, 0.1*float64(nRequests))
})
for _, balanceFactor := range balanceFactors {
t.Run(fmt.Sprintf("%s_%t", algorithm, balanceFactor), func(t *testing.T) {
endpointRegistry := defaultEndpointRegistry()

balanceFactorStr := ""
if balanceFactor {
balanceFactorStr = ` -> consistentHashBalanceFactor(1.25)`
}
doc := fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") %s -> <%s, "%s", "%s", "%s">`,
balanceFactorStr, algorithm, services[0].URL, services[1].URL, services[2].URL)

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*float64(nRequests), failedReqs, 0.1*float64(nRequests))
})
}
}
}

0 comments on commit 298a529

Please sign in to comment.