Skip to content

Commit

Permalink
Store target origin in request context (#659)
Browse files Browse the repository at this point in the history
Origin ID stored under key "styx.originid" as type com.hotels.styx.api.Id
  • Loading branch information
chrisgresty authored May 14, 2020
1 parent 53b7053 commit 13dbb2a
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ private Collection<RemoteHost> pools(OriginState state) {
return origins.values().stream()
.filter(origin -> origin.state().equals(state))
.map(origin -> {
HttpHandler hostClient = (request, context) -> new Eventual<>(origin.hostClient.sendRequest(request));
HttpHandler hostClient = (request, context) -> new Eventual<>(origin.hostClient.sendRequest(request, context));
return remoteHost(origin.origin, hostClient, origin.hostClient);
})
.collect(toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2019 Expedia Inc.
Copyright (C) 2013-2020 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
*/
package com.hotels.styx.client;

import com.hotels.styx.api.HttpInterceptor.Context;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.ResponseEventListener;
Expand All @@ -30,6 +31,8 @@
* A Styx HTTP Client for proxying to an individual origin host.
*/
public class StyxHostHttpClient implements LoadBalancingMetricSupplier {
public static final String ORIGINID_CONTEXT_KEY = "styx.originid";

private final ConnectionPool pool;

StyxHostHttpClient(ConnectionPool pool) {
Expand All @@ -40,7 +43,10 @@ public static StyxHostHttpClient create(ConnectionPool pool) {
return new StyxHostHttpClient(pool);
}

public Publisher<LiveHttpResponse> sendRequest(LiveHttpRequest request) {
public Publisher<LiveHttpResponse> sendRequest(LiveHttpRequest request, Context context) {
if (context != null) {
context.add(ORIGINID_CONTEXT_KEY, pool.getOrigin().id());
}
return Flux.from(pool.borrowConnection())
.flatMap(connection -> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.net.HostAndPort;
import com.hotels.styx.api.Eventual;
import com.hotels.styx.api.HttpHandler;
import com.hotels.styx.api.HttpInterceptor.Context;
import com.hotels.styx.api.Id;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
Expand Down Expand Up @@ -112,7 +113,7 @@ public void sendsRequestToHostChosenByLoadBalancer() {
LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block();

assertThat(response.status(), is(OK));
verify(hostClient).sendRequest(eq(SOME_REQ));
verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class));
}

@Test
Expand Down Expand Up @@ -188,8 +189,8 @@ public void retriesWhenRetryPolicyTellsToRetry() {
assertThat(response.status(), is(OK));

InOrder ordered = inOrder(firstClient, secondClient);
ordered.verify(firstClient).sendRequest(eq(SOME_REQ));
ordered.verify(secondClient).sendRequest(eq(SOME_REQ));
ordered.verify(firstClient).sendRequest(eq(SOME_REQ), any(Context.class));
ordered.verify(secondClient).sendRequest(eq(SOME_REQ), any(Context.class));
}

@Test
Expand All @@ -213,9 +214,9 @@ public void stopsRetriesWhenRetryPolicyTellsToStop() {
.verifyError(OriginUnreachableException.class);

InOrder ordered = inOrder(firstClient, secondClient, thirdClient);
ordered.verify(firstClient).sendRequest(eq(SOME_REQ));
ordered.verify(secondClient).sendRequest(eq(SOME_REQ));
ordered.verify(thirdClient, never()).sendRequest(eq(SOME_REQ));
ordered.verify(firstClient).sendRequest(eq(SOME_REQ), any(Context.class));
ordered.verify(secondClient).sendRequest(eq(SOME_REQ), any(Context.class));
ordered.verify(thirdClient, never()).sendRequest(eq(SOME_REQ), any(Context.class));
}

@Test
Expand All @@ -242,10 +243,10 @@ public void retriesAtMost3Times() {
.verifyError(NoAvailableHostsException.class);

InOrder ordered = inOrder(firstClient, secondClient, thirdClient, fourthClient);
ordered.verify(firstClient).sendRequest(eq(SOME_REQ));
ordered.verify(secondClient).sendRequest(eq(SOME_REQ));
ordered.verify(thirdClient).sendRequest(eq(SOME_REQ));
ordered.verify(fourthClient, never()).sendRequest(any(LiveHttpRequest.class));
ordered.verify(firstClient).sendRequest(eq(SOME_REQ), any(Context.class));
ordered.verify(secondClient).sendRequest(eq(SOME_REQ), any(Context.class));
ordered.verify(thirdClient).sendRequest(eq(SOME_REQ), any(Context.class));
ordered.verify(fourthClient, never()).sendRequest(any(LiveHttpRequest.class), any(Context.class));
}


Expand All @@ -262,7 +263,7 @@ public void incrementsResponseStatusMetricsForBadResponse() {
LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block();

assertThat(response.status(), is(BAD_REQUEST));
verify(hostClient).sendRequest(eq(SOME_REQ));
verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class));
assertThat(metricRegistry.counter("origins.response.status.400").getCount(), is(1L));
}

Expand All @@ -280,7 +281,7 @@ public void incrementsResponseStatusMetricsFor401() {
LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block();

assertThat(response.status(), is(UNAUTHORIZED));
verify(hostClient).sendRequest(eq(SOME_REQ));
verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class));
assertThat(metricRegistry.counter("origins.response.status.401").getCount(), is(1L));
}

Expand All @@ -298,7 +299,7 @@ public void incrementsResponseStatusMetricsFor500() {
LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block();

assertThat(response.status(), is(INTERNAL_SERVER_ERROR));
verify(hostClient).sendRequest(eq(SOME_REQ));
verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class));
assertThat(metricRegistry.counter("origins.response.status.500").getCount(), is(1L));
}

Expand All @@ -315,7 +316,7 @@ public void incrementsResponseStatusMetricsFor501() {

LiveHttpResponse response = Mono.from(styxHttpClient.sendRequest(SOME_REQ, requestContext())).block();
assertThat(response.status(), is(NOT_IMPLEMENTED));
verify(hostClient).sendRequest(SOME_REQ);
verify(hostClient).sendRequest(eq(SOME_REQ), any(Context.class));
assertThat(metricRegistry.counter("origins.response.status.501").getCount(), is(1L));
}

Expand Down Expand Up @@ -452,7 +453,7 @@ public void prefersRestrictedOriginsOverStickyOriginsWhenBothAreConfigured() {
}

private HttpHandler toHandler(StyxHostHttpClient hostClient) {
return (request, ctx) -> new Eventual<>(hostClient.sendRequest(request));
return (request, ctx) -> new Eventual<>(hostClient.sendRequest(request, ctx));
}

private RetryPolicy mockRetryPolicy(Boolean first, Boolean... outcomes) {
Expand Down Expand Up @@ -485,7 +486,7 @@ private LoadBalancer mockLoadBalancer(Optional<RemoteHost> first, Optional<Remot

private StyxHostHttpClient mockHostClient(Publisher<LiveHttpResponse> responsePublisher) {
StyxHostHttpClient secondClient = mock(StyxHostHttpClient.class);
when(secondClient.sendRequest(any(LiveHttpRequest.class))).thenReturn(responsePublisher);
when(secondClient.sendRequest(any(LiveHttpRequest.class), any(Context.class))).thenReturn(responsePublisher);
return secondClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

import com.hotels.styx.api.Buffer;
import com.hotels.styx.api.ByteStream;
import com.hotels.styx.api.HttpInterceptor;
import com.hotels.styx.api.HttpInterceptor.Context;
import com.hotels.styx.api.HttpRequest;
import com.hotels.styx.api.HttpResponse;
import com.hotels.styx.api.Id;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.extension.Origin;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import com.hotels.styx.server.HttpInterceptorContext;
import com.hotels.styx.support.Support;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
Expand All @@ -34,6 +40,8 @@
import java.util.concurrent.atomic.AtomicReference;

import static com.hotels.styx.api.HttpResponseStatus.OK;
import static com.hotels.styx.client.StyxHostHttpClient.ORIGINID_CONTEXT_KEY;
import static com.hotels.styx.support.Support.requestContext;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -62,17 +70,19 @@ public void setUp() {
public void returnsConnectionBackToPool() {
Connection connection = mockConnection(just(response));
ConnectionPool pool = mockPool(connection);
Context context = mockContext();

StyxHostHttpClient hostClient = new StyxHostHttpClient(pool);

StepVerifier.create(hostClient.sendRequest(request))
StepVerifier.create(hostClient.sendRequest(request, context))
.consumeNextWith(response -> response.consume())
.expectComplete()
.verify();

verify(pool).borrowConnection();
verify(connection).write(any(LiveHttpRequest.class));
verify(pool).returnConnection(any(Connection.class));
verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin"));
}

@Test
Expand All @@ -82,13 +92,14 @@ public void ignoresCancelledHeaders() {
// been published.
Connection connection = mockConnection(just(response));
ConnectionPool pool = mockPool(connection);
Context context = mockContext();
AtomicReference<LiveHttpResponse> transformedResponse = new AtomicReference<>();

StyxHostHttpClient hostClient = new StyxHostHttpClient(pool);

// The StepVerifier consumes the response event and then unsubscribes
// from the response observable.
StepVerifier.create(hostClient.sendRequest(request))
StepVerifier.create(hostClient.sendRequest(request, context))
.consumeNextWith(transformedResponse::set)
.verifyComplete();

Expand All @@ -101,17 +112,20 @@ public void ignoresCancelledHeaders() {

// Finally, the connection is returned after the response body is fully consumed:
verify(pool).returnConnection(any(Connection.class));

verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin"));
}

@Test
public void releasesIfRequestIsCancelledBeforeHeaders() {
Connection connection = mockConnection(EmitterProcessor.create());
ConnectionPool pool = mockPool(connection);
Context context = mockContext();

StyxHostHttpClient hostClient = new StyxHostHttpClient(pool);
AtomicReference<Subscription> subscription = new AtomicReference<>();

Flux.from(hostClient.sendRequest(request))
Flux.from(hostClient.sendRequest(request, context))
.subscribe(new BaseSubscriber<LiveHttpResponse>() {
@Override
protected void hookOnSubscribe(Subscription s) {
Expand All @@ -123,17 +137,19 @@ protected void hookOnSubscribe(Subscription s) {

subscription.get().cancel();
verify(pool).closeConnection(any(Connection.class));
verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin"));
}

@Test
public void ignoresResponseObservableErrorsAfterHeaders() {
Connection connection = mockConnection(responseProvider);
ConnectionPool pool = mockPool(connection);
Context context = mockContext();
AtomicReference<LiveHttpResponse> newResponse = new AtomicReference<>();

StyxHostHttpClient hostClient = new StyxHostHttpClient(pool);

StepVerifier.create(hostClient.sendRequest(request))
StepVerifier.create(hostClient.sendRequest(request, context))
.then(() -> {
responseProvider.onNext(response);
responseProvider.onError(new RuntimeException("oh dear ..."));
Expand All @@ -145,50 +161,56 @@ public void ignoresResponseObservableErrorsAfterHeaders() {
newResponse.get().consume();

verify(pool).returnConnection(any(Connection.class));
verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin"));
}

@Test
public void terminatesConnectionWhenResponseObservableCompletesWithoutHeaders() {
// A connection that yields no response:
Connection connection = mockConnection(Flux.empty());
ConnectionPool pool = mockPool(connection);
Context context = mockContext();

StyxHostHttpClient hostClient = new StyxHostHttpClient(pool);

StepVerifier.create(hostClient.sendRequest(request))
StepVerifier.create(hostClient.sendRequest(request, context))
.expectNextCount(0)
.expectComplete()
.log()
.verify();

verify(pool).closeConnection(any(Connection.class));
verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin"));
}

@Test
public void releasesConnectionWhenResponseFailsBeforeHeaders() {
Connection connection = mockConnection(Flux.error(new RuntimeException()));
ConnectionPool pool = mockPool(connection);
Context context = mockContext();

StyxHostHttpClient hostClient = new StyxHostHttpClient(pool);

StepVerifier.create(hostClient.sendRequest(request))
StepVerifier.create(hostClient.sendRequest(request, context))
.expectNextCount(0)
.expectError()
.verify();

verify(pool).closeConnection(any(Connection.class));
verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin"));
}

@Test
public void terminatesConnectionDueToUnsubscribedBody() {
TestPublisher<Buffer> testPublisher = TestPublisher.create();
Connection connection = mockConnection(just(LiveHttpResponse.response(OK).body(new ByteStream(testPublisher)).build()));
ConnectionPool pool = mockPool(connection);
Context context = mockContext();
AtomicReference<LiveHttpResponse> receivedResponse = new AtomicReference<>();

StyxHostHttpClient hostClient = new StyxHostHttpClient(pool);

StepVerifier.create(hostClient.sendRequest(request))
StepVerifier.create(hostClient.sendRequest(request, context))
.consumeNextWith(receivedResponse::set)
.expectComplete()
.verify();
Expand All @@ -198,6 +220,7 @@ public void terminatesConnectionDueToUnsubscribedBody() {
.verify();

verify(pool).closeConnection(any(Connection.class));
verify(context).add(ORIGINID_CONTEXT_KEY, Id.id("mockorigin"));
}

@Test
Expand All @@ -219,6 +242,18 @@ Connection mockConnection(Flux<LiveHttpResponse> responseObservable) {
ConnectionPool mockPool(Connection connection) {
ConnectionPool pool = mock(ConnectionPool.class);
when(pool.borrowConnection()).thenReturn(Flux.just(connection));
Origin origin = mockOrigin("mockorigin");
when(pool.getOrigin()).thenReturn(origin);
return pool;
}

Origin mockOrigin(String id) {
Origin origin = mock(Origin.class);
when(origin.id()).thenReturn(Id.id(id));
return origin;
}

HttpInterceptor.Context mockContext() {
return mock(HttpInterceptor.Context.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public HostProxy(String host, int port, StyxHostHttpClient client, OriginMetrics
public Eventual<LiveHttpResponse> handle(LiveHttpRequest request, HttpInterceptor.Context context) {
if (active) {
return new Eventual<>(
ResponseEventListener.from(client.sendRequest(request))
ResponseEventListener.from(client.sendRequest(request, context))
.whenCancelled(originMetrics::requestCancelled)
.apply());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.hotels.styx.Environment;
import com.hotels.styx.StyxConfig;
import com.hotels.styx.api.HttpInterceptor.Context;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.configuration.Configuration.MapBackedConfiguration;
Expand Down Expand Up @@ -186,7 +187,7 @@ public void usesTheOriginSpecifiedInTheOriginsRestrictionCookie() {

private StyxHostHttpClient hostClient(LiveHttpResponse response) {
StyxHostHttpClient mockClient = mock(StyxHostHttpClient.class);
when(mockClient.sendRequest(any(LiveHttpRequest.class))).thenReturn(Flux.just(response));
when(mockClient.sendRequest(any(LiveHttpRequest.class), any(Context.class))).thenReturn(Flux.just(response));
when(mockClient.loadBalancingMetric()).thenReturn(new LoadBalancingMetric(1));
return mockClient;
}
Expand Down
Loading

0 comments on commit 13dbb2a

Please sign in to comment.