Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

phc: test multiple unhealthy endpoints #3058

Merged
merged 10 commits into from
May 7, 2024
111 changes: 69 additions & 42 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func sendGetRequests(t *testing.T, ps *httptest.Server) (failed int) {
return
}

func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.Server) {
endpointRegistry := defaultEndpointRegistry()
func setupProxyWithCustomEndpointRegisty(t *testing.T, doc string, endpointRegistry *routing.EndpointRegistry) (*metricstest.MockMetrics, *httptest.Server) {
m := &metricstest.MockMetrics{}

tp, err := newTestProxyWithParams(doc, Params{
Expand All @@ -68,20 +67,33 @@ func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.S
return m, ps
}

func TestPHCWithoutRequests(t *testing.T) {
func setupServices(t *testing.T, healthy, unhealthy int) []*httptest.Server {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
for i := 0; i < healthy; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
t.Cleanup(service.Close)
}
for i := 0; i < unhealthy; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
t.Cleanup(service.Close)
}
return services
MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
}

func TestPHCWithoutRequests(t *testing.T) {
services := setupServices(t, 3, 0)

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
_, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL), defaultEndpointRegistry())
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand All @@ -92,8 +104,8 @@ func TestPHCWithoutRequests(t *testing.T) {
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
_, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL), defaultEndpointRegistry())
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand All @@ -104,10 +116,7 @@ func TestPHCWithoutRequests(t *testing.T) {
}

func TestPHCForSingleHealthyEndpoint(t *testing.T) {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer service.Close()
service := setupServices(t, 1, 0)[0]
endpointRegistry := defaultEndpointRegistry()

doc := fmt.Sprintf(`* -> "%s"`, service.URL)
Expand All @@ -128,62 +137,80 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) {
}

func TestPHCForMultipleHealthyEndpoints(t *testing.T) {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
}
services := setupServices(t, 3, 0)

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
_, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL), defaultEndpointRegistry())
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
})
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
_, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL), defaultEndpointRegistry())
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
})
}

func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
serviceNum := i
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if serviceNum == 0 {
// emulating unhealthy endpoint
time.Sleep(100 * time.Millisecond)
}
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
services := setupServices(t, 2, 1)
for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL), defaultEndpointRegistry())
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
})
})
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL), defaultEndpointRegistry())
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
})
}

func TestPHCForMultipleHealthyAndMultipleUnhealthyEndpoints(t *testing.T) {
services := setupServices(t, 2, 2)
for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))

MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: period,
MinRequests: 2, // with 3 test case fails
MaxHealthCheckDropProbability: 1.0,
MinHealthCheckDropProbability: 0.01,
})

mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL, services[3].URL), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
assert.InDelta(t, float64(nRequests)*2.0, float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
})
})
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
_, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved
PassiveHealthCheckEnabled: true,
StatsResetPeriod: period,
MinRequests: 2, // with 3 test case fails
MaxHealthCheckDropProbability: 1.0,
MinHealthCheckDropProbability: 0.01,
})
_, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL, services[3].URL), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
})
Expand Down
Loading