Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

phc: test multiple unhealthy endpoints #3058

Merged
merged 10 commits into from
May 7, 2024
201 changes: 151 additions & 50 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package proxy

import (
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -10,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zalando/skipper/metrics/metricstest"
zhttptest "github.com/zalando/skipper/net/httptest"
"github.com/zalando/skipper/routing"
)

Expand All @@ -22,8 +25,8 @@ func defaultEndpointRegistry() *routing.EndpointRegistry {
return routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: period,
MinRequests: 10,
MaxHealthCheckDropProbability: 1.0,
MinRequests: 2,
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
}
Expand All @@ -49,8 +52,18 @@ func sendGetRequests(t *testing.T, ps *httptest.Server) (failed int) {
return
}

func fireVegeta(t *testing.T, ps *httptest.Server, freq int, per time.Duration, timeout time.Duration) *zhttptest.VegetaAttacker {
t.Helper()
va := zhttptest.NewVegetaAttacker(ps.URL, freq, per, timeout)
va.Attack(io.Discard, 5*time.Second, t.Name())
return va
}

func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.Server) {
endpointRegistry := defaultEndpointRegistry()
return setupProxyWithCustomEndpointRegisty(t, doc, defaultEndpointRegistry())
}

func setupProxyWithCustomEndpointRegisty(t *testing.T, doc string, endpointRegistry *routing.EndpointRegistry) (*metricstest.MockMetrics, *httptest.Server) {
m := &metricstest.MockMetrics{}

tp, err := newTestProxyWithParams(doc, Params{
Expand All @@ -68,20 +81,33 @@ func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.S
return m, ps
}

func TestPHCWithoutRequests(t *testing.T) {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
func setupServices(t *testing.T, healthy, unhealthy int) []string {
services := []string{}
for i := 0; i < healthy; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
services = append(services, service.URL)
t.Cleanup(service.Close)
}
for i := 0; i < unhealthy; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
w.WriteHeader(http.StatusOK)
}))
services = append(services, service.URL)
t.Cleanup(service.Close)
}
return services
MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
}

func TestPHCWithoutRequests(t *testing.T) {
services := setupServices(t, 3, 0)

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
algorithm, services[0], services[1], services[2]))
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand All @@ -93,7 +119,7 @@ func TestPHCWithoutRequests(t *testing.T) {

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
services[0], services[1], services[2]))
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand All @@ -104,13 +130,10 @@ func TestPHCWithoutRequests(t *testing.T) {
}

func TestPHCForSingleHealthyEndpoint(t *testing.T) {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer service.Close()
service := setupServices(t, 1, 0)[0]
endpointRegistry := defaultEndpointRegistry()

doc := fmt.Sprintf(`* -> "%s"`, service.URL)
doc := fmt.Sprintf(`* -> "%s"`, service)
tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
Expand All @@ -128,63 +151,141 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) {
}

func TestPHCForMultipleHealthyEndpoints(t *testing.T) {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
}
services := setupServices(t, 3, 0)

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
algorithm, services[0], services[1], services[2]))
va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)
MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
count200, ok := va.CountStatus(200)
assert.True(t, ok)
assert.InDelta(t, count200, 15000, 50) // 3000*5s, the delta is for CI
assert.Equal(t, count200, int(va.TotalRequests()))
MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
})
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
services[0], services[1], services[2]))
va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)
count200, ok := va.CountStatus(200)
assert.True(t, ok)
assert.Equal(t, count200, int(va.TotalRequests()))
})
}

func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
serviceNum := i
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if serviceNum == 0 {
// emulating unhealthy endpoint
time.Sleep(100 * time.Millisecond)
}
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
}

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
services := setupServices(t, 2, 1)
for _, algorithm := range []string{"random", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
algorithm, services[0], services[1], services[2]))

va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)

count200, ok := va.CountStatus(200)
assert.True(t, ok)

count504, ok := va.CountStatus(504)
assert.True(t, ok)

failedRequests := math.Abs(float64(va.TotalRequests())) - float64(count200)
t.Logf("total requests: %d, count200: %d, count504: %d, failedRequests: %f", va.TotalRequests(), count200, count504, failedRequests)

assert.InDelta(t, float64(count504), failedRequests, 5)
assert.InDelta(t, 0, float64(failedRequests), 0.1*float64(va.TotalRequests()))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
assert.InDelta(t, float64(va.TotalRequests()), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(va.TotalRequests())) // allow 30% error
})
})
}

t.Run("consistentHash", func(t *testing.T) {
Copy link
Member Author

@MustafaSaber MustafaSaber May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsistentHash was separated and using old approach because it's a little bit tricky to set headers for different set of requests with Vegeta, we use the hash to map the request to endpoint.

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: 1 * time.Second,
MinRequests: 1, // with 2 test case fails on github actions
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <consistentHash, "%s", "%s", "%s">`,
services[0], services[1], services[2]), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
})
})

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0], services[1], services[2]))
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
})
})
}

func TestPHCForMultipleHealthyAndMultipleUnhealthyEndpoints(t *testing.T) {
services := setupServices(t, 2, 2)
for _, algorithm := range []string{"random", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s", "%s">`,
algorithm, services[0], services[1], services[2], services[3]))

MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)

count200, ok := va.CountStatus(200)
assert.True(t, ok)

count504, ok := va.CountStatus(504)
assert.True(t, ok)

failedRequests := math.Abs(float64(va.TotalRequests())) - float64(count200)
t.Logf("total requests: %d, count200: %d, count504: %d, failedRequests: %f", va.TotalRequests(), count200, count504, failedRequests)

assert.InDelta(t, float64(count504), failedRequests, 5)
assert.InDelta(t, 0, float64(failedRequests), 0.3*float64(va.TotalRequests()))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, 2*float64(va.TotalRequests()), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(va.TotalRequests()))
})
})
}

t.Run("consistentHash", func(t *testing.T) {
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: 1 * time.Second,
MinRequests: 1, // with 2 test case fails on github actions and -race
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <consistentHash, "%s", "%s", "%s", "%s">`,
services[0], services[1], services[2], services[3]), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.2*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, 2*float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(nRequests))
})
})

t.Run("consistent hash with balance factor", func(t *testing.T) {
MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
PassiveHealthCheckEnabled: true,
StatsResetPeriod: 1 * time.Second,
MinRequests: 1, // with 2 test case fails on github actions and -race
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s", "%s">`,
services[0], services[1], services[2], services[3]), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.2*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, 2*float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(nRequests))
})
})
}
Loading