Skip to content

Commit

Permalink
Use vegeta with all algorithms except consistentHash as it\'s a bit…
Browse files Browse the repository at this point in the history
… tricky to set different headers for attacks

Signed-off-by: Mustafa Abdelrahman <[email protected]>
  • Loading branch information
MustafaSaber committed May 6, 2024
1 parent 4d4f898 commit 3b66e9c
Showing 1 changed file with 102 additions and 39 deletions.
141 changes: 102 additions & 39 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package proxy

import (
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -10,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zalando/skipper/metrics/metricstest"
zhttptest "github.com/zalando/skipper/net/httptest"
"github.com/zalando/skipper/routing"
)

Expand All @@ -22,8 +25,8 @@ func defaultEndpointRegistry() *routing.EndpointRegistry {
return routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: period,
MinRequests: 10,
MaxHealthCheckDropProbability: 1.0,
MinRequests: 2,
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
}
Expand All @@ -49,6 +52,13 @@ func sendGetRequests(t *testing.T, ps *httptest.Server) (failed int) {
return
}

func fireVegeta(t *testing.T, ps *httptest.Server, ferq int, per time.Duration, timeout time.Duration) *zhttptest.VegetaAttacker {
t.Helper()
va := zhttptest.NewVegetaAttacker(ps.URL, ferq, per, timeout)
va.Attack(io.Discard, 5*time.Second, t.Name())
return va
}

func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.Server) {
return setupProxyWithCustomEndpointRegisty(t, doc, defaultEndpointRegistry())
}
Expand All @@ -71,21 +81,21 @@ func setupProxyWithCustomEndpointRegisty(t *testing.T, doc string, endpointRegis
return m, ps
}

func setupServices(t *testing.T, healthy, unhealthy int) []*httptest.Server {
services := []*httptest.Server{}
func setupServices(t *testing.T, healthy, unhealthy int) []string {
services := []string{}
for i := 0; i < healthy; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
services = append(services, service.URL)
t.Cleanup(service.Close)
}
for i := 0; i < unhealthy; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
services = append(services, service.URL)
t.Cleanup(service.Close)
}
return services
Expand All @@ -97,7 +107,7 @@ func TestPHCWithoutRequests(t *testing.T) {
for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
algorithm, services[0], services[1], services[2]))
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand All @@ -109,7 +119,7 @@ func TestPHCWithoutRequests(t *testing.T) {

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
services[0], services[1], services[2]))
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand All @@ -123,7 +133,7 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) {
service := setupServices(t, 1, 0)[0]
endpointRegistry := defaultEndpointRegistry()

doc := fmt.Sprintf(`* -> "%s"`, service.URL)
doc := fmt.Sprintf(`* -> "%s"`, service)
tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
Expand All @@ -146,76 +156,129 @@ func TestPHCForMultipleHealthyEndpoints(t *testing.T) {
for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
algorithm, services[0], services[1], services[2]))
va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)
count200, ok := va.CountStatus(200)
assert.True(t, ok)
assert.Equal(t, count200, int(va.TotalRequests()))
})
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
services[0], services[1], services[2]))
va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)
count200, ok := va.CountStatus(200)
assert.True(t, ok)
assert.Equal(t, count200, int(va.TotalRequests()))
})
}

func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) {
services := setupServices(t, 2, 1)
for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
for _, algorithm := range []string{"random", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
algorithm, services[0], services[1], services[2]))

va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)

count200, ok := va.CountStatus(200)
assert.True(t, ok)

count504, ok := va.CountStatus(504)
assert.True(t, ok)

failedRequests := math.Abs(float64(va.TotalRequests())) - float64(count200)
t.Logf("total requests: %d, count200: %d, count504: %d, failedRequests: %f", va.TotalRequests(), count200, count504, failedRequests)

assert.Equal(t, float64(count504), failedRequests)
assert.InDelta(t, 0, float64(failedRequests), 0.1*float64(va.TotalRequests()))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
assert.InDelta(t, float64(va.TotalRequests()), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(va.TotalRequests())) // allow 60% error
})
})
}

t.Run("consistentHash", func(t *testing.T) {
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: 1 * time.Second,
MinRequests: 1, // with 2 test case fails on github actions
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <consistentHash, "%s", "%s", "%s">`,
services[0], services[1], services[2]), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(nRequests)) // allow 60% error
})
})

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
services[0], services[1], services[2]))
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
})
}

func TestPHCForMultipleHealthyAndMultipleUnhealthyEndpoints(t *testing.T) {
services := setupServices(t, 2, 2)
for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
for _, algorithm := range []string{"random", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s", "%s">`,
algorithm, services[0], services[1], services[2], services[3]))

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: period,
MinRequests: 1, // with 3 test case fails
MaxHealthCheckDropProbability: 1.0,
MinHealthCheckDropProbability: 0.01,
})
va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)

mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL, services[3].URL), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
count200, ok := va.CountStatus(200)
assert.True(t, ok)

count504, ok := va.CountStatus(504)
assert.True(t, ok)

failedRequests := math.Abs(float64(va.TotalRequests())) - float64(count200)
t.Logf("total requests: %d, count200: %d, count504: %d, failedRequests: %f", va.TotalRequests(), count200, count504, failedRequests)

assert.Equal(t, float64(count504), failedRequests)
assert.InDelta(t, 0, float64(failedRequests), 0.3*float64(va.TotalRequests()))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests)*2.0, float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
assert.InDelta(t, 2*float64(va.TotalRequests()), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(va.TotalRequests()))
})
})
}

t.Run("consistentHash", func(t *testing.T) {
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: 1 * time.Second,
MinRequests: 1, // with 2 test case fails on github actions and -race
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <consistentHash, "%s", "%s", "%s", "%s">`,
services[0], services[1], services[2], services[3]), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.2*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, 2*float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(nRequests))
})
})

t.Run("consistent hash with balance factor", func(t *testing.T) {
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: period,
MinRequests: 1, // with 3 test case fails
MaxHealthCheckDropProbability: 1.0,
StatsResetPeriod: 1 * time.Second,
MinRequests: 1, // with 2 test case fails on github actions and -race
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
_, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL, services[3].URL), endpointRegistry)
services[0], services[1], services[2], services[3]), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
assert.InDelta(t, 0, failedReqs, 0.2*float64(nRequests))
})
}

0 comments on commit 3b66e9c

Please sign in to comment.