Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Fabric8 Kubernetes client integration #5167

Merged
merged 19 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions .github/workflows/e2e-chaos-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
name: E2E Tests

on:
push:
branches:
- main
tags-ignore:
# The release versions will be verified by 'publish-release.yml'
- armeria-*
pull_request:

concurrency:
group: ci-e2e-chaos-tests-${{ github.event.pull_request.number || github.sha }}
cancel-in-progress: true

env:
CHAOS_MESH_VERSION: 2.6.2
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GRADLE_ENTERPRISE_ACCESS_KEY }}

jobs:
chaos-tests:
name: Kubernetes Chaos test
runs-on: ubuntu-latest
timeout-minutes: 120
if: github.repository == 'line/armeria'
steps:
- uses: actions/checkout@v4

- id: setup-jdk-19
name: Setup Java 19
uses: actions/setup-java@v4
with:
distribution: "temurin"
java-version: 19

- name: Setup Minikube
id: minikube
uses: medyagh/setup-minikube@latest

- name: Install Chaos Mesh
run: |
curl -sSL https://mirrors.chaos-mesh.org/v${CHAOS_MESH_VERSION}/install.sh | bash
kubectl wait --for=condition=Ready pods --all-namespaces --all --timeout=600s
shell: bash

- name: Setup Gradle
uses: gradle/gradle-build-action@v2

- name: Build Chaos test images
run: |
# The images should be built in the minikube docker environment
eval $(minikube -p minikube docker-env)
./gradlew --no-daemon --stacktrace :it:kubernetes-chaos-tests:k8sBuild
shell: bash

- name: Run Chaos Tests - network-delay.yaml
env:
CHAOS_TEST: network-delay.yaml
run: |
./gradlew --no-daemon --stacktrace :it:kubernetes-chaos-tests:test
shell: bash

- name: Run Chaos Tests - network-loss.yaml
env:
CHAOS_TEST: network-loss.yaml
run: |
# --rerun-tasks is required to run the tests because only the environment variable is changed
./gradlew --no-daemon --stacktrace :it:kubernetes-chaos-tests:test --rerun-tasks
shell: bash

- name: Run Chaos Tests - network-duplicate.yaml
env:
CHAOS_TEST: network-duplicate.yaml
run: |
./gradlew --no-daemon --stacktrace :it:kubernetes-chaos-tests:test --rerun-tasks
shell: bash
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.linecorp.armeria.client;

import static com.linecorp.armeria.common.SessionProtocol.httpAndHttpsValues;
import static com.linecorp.armeria.internal.common.ArmeriaHttpUtil.toNettyHttp1ClientHeaders;

import java.lang.reflect.Array;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -64,6 +65,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.ProxyConnectException;
import io.netty.handler.proxy.ProxyHandler;
Expand Down Expand Up @@ -131,10 +133,11 @@
final ConnectProxyConfig connectProxyConfig = (ConnectProxyConfig) proxyConfig;
final String username = connectProxyConfig.username();
final String password = connectProxyConfig.password();
final HttpHeaders proxyHeaders = toNettyHttp1ClientHeaders(connectProxyConfig.headers());
if (username == null || password == null) {
proxyHandler = new HttpProxyHandler(proxyAddress);
proxyHandler = new HttpProxyHandler(proxyAddress, proxyHeaders);
} else {
proxyHandler = new HttpProxyHandler(proxyAddress, username, password);
proxyHandler = new HttpProxyHandler(proxyAddress, username, password, proxyHeaders);

Check warning on line 140 in core/src/main/java/com/linecorp/armeria/client/HttpChannelPool.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/linecorp/armeria/client/HttpChannelPool.java#L140

Added line #L140 was not covered by tests
}
break;
case HAPROXY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.annotation.Nullable;

/**
Expand All @@ -36,13 +37,16 @@ public final class ConnectProxyConfig extends ProxyConfig {
@Nullable
private final String password;

private final HttpHeaders headers;

private final boolean useTls;

ConnectProxyConfig(InetSocketAddress proxyAddress, @Nullable String username,
@Nullable String password, boolean useTls) {
@Nullable String password, HttpHeaders headers, boolean useTls) {
this.proxyAddress = proxyAddress;
this.username = username;
this.password = password;
this.headers = headers;
this.useTls = useTls;
}

Expand All @@ -67,6 +71,13 @@ public String password() {
return password;
}

/**
* Returns the configured {@link HttpHeaders}.
*/
public HttpHeaders headers() {
return headers;
}

/**
* Returns whether ssl is enabled.
*/
Expand All @@ -91,12 +102,13 @@ public boolean equals(@Nullable Object o) {
return useTls == that.useTls &&
proxyAddress.equals(that.proxyAddress) &&
Objects.equals(username, that.username) &&
Objects.equals(password, that.password);
Objects.equals(password, that.password) &&
headers.equals(that.headers);
}

@Override
public int hashCode() {
return Objects.hash(proxyAddress, username, password, useTls);
return Objects.hash(proxyAddress, username, password, headers, useTls);
}

@Override
Expand All @@ -106,6 +118,8 @@ public String toString() {
.add("proxyAddress", proxyAddress())
.add("username", username())
.add("password", maskPassword(username(), password()))
// Headers are omitted since they may contain sensitive information such as
// (Proxy-)Authorization.
.add("useTls", useTls())
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.net.InetSocketAddress;

import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.server.ServiceRequestContext;

/**
Expand Down Expand Up @@ -88,7 +90,7 @@ public static Socks5ProxyConfig socks5(
public static ConnectProxyConfig connect(InetSocketAddress proxyAddress) {
requireNonNull(proxyAddress, "proxyAddress");
checkArgument(!proxyAddress.isUnresolved(), "proxyAddress must be resolved");
return new ConnectProxyConfig(proxyAddress, null, null, false);
return new ConnectProxyConfig(proxyAddress, null, null, HttpHeaders.of(), false);
}

/**
Expand All @@ -100,7 +102,7 @@ public static ConnectProxyConfig connect(InetSocketAddress proxyAddress) {
public static ConnectProxyConfig connect(InetSocketAddress proxyAddress, boolean useTls) {
requireNonNull(proxyAddress, "proxyAddress");
checkArgument(!proxyAddress.isUnresolved(), "proxyAddress must be resolved");
return new ConnectProxyConfig(proxyAddress, null, null, useTls);
return new ConnectProxyConfig(proxyAddress, null, null, HttpHeaders.of(), useTls);
}

/**
Expand All @@ -113,10 +115,42 @@ public static ConnectProxyConfig connect(InetSocketAddress proxyAddress, boolean
*/
public static ConnectProxyConfig connect(
InetSocketAddress proxyAddress, String username, String password, boolean useTls) {
return connect(proxyAddress, username, password, HttpHeaders.of(), useTls);
}

/**
* Creates a {@code ProxyConfig} configuration for CONNECT protocol.
*
* @param proxyAddress the proxy address
* @param headers the {@link HttpHeaders} to send to the proxy
* @param useTls whether to use TLS to connect to the proxy
*/
@UnstableApi
public static ConnectProxyConfig connect(
InetSocketAddress proxyAddress, HttpHeaders headers, boolean useTls) {
requireNonNull(proxyAddress, "proxyAddress");
checkArgument(!proxyAddress.isUnresolved(), "proxyAddress must be resolved");
return new ConnectProxyConfig(proxyAddress, null, null, headers, useTls);
}

/**
* Creates a {@code ProxyConfig} configuration for CONNECT protocol.
*
* @param proxyAddress the proxy address
* @param username the username
* @param password the password
* @param headers the {@link HttpHeaders} to send to the proxy
* @param useTls whether to use TLS to connect to the proxy
*/
@UnstableApi
public static ConnectProxyConfig connect(InetSocketAddress proxyAddress, String username, String password,
HttpHeaders headers, boolean useTls) {
requireNonNull(proxyAddress, "proxyAddress");
checkArgument(!proxyAddress.isUnresolved(), "proxyAddress must be resolved");
return new ConnectProxyConfig(proxyAddress, requireNonNull(username, "username"),
requireNonNull(password, "password"), useTls);
requireNonNull(username, "username");
requireNonNull(password, "password");
requireNonNull(headers, "headers");
return new ConnectProxyConfig(proxyAddress, username, password, headers, useTls);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.ClientRequestContextCaptor;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.RequestOptions;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
Expand Down Expand Up @@ -83,16 +85,17 @@ final class DefaultWebSocketClient implements WebSocketClient {
}

@Override
public CompletableFuture<WebSocketSession> connect(String path) {
public CompletableFuture<WebSocketSession> connect(String path, HttpHeaders headers,
RequestOptions requestOptions) {
requireNonNull(path, "path");
final RequestHeaders requestHeaders = webSocketHeaders(path);
final RequestHeaders requestHeaders = webSocketHeaders(path, headers);

final CompletableFuture<StreamMessage<HttpData>> outboundFuture = new CompletableFuture<>();
final HttpRequest request = HttpRequest.of(requestHeaders, StreamMessage.of(outboundFuture));
final HttpResponse response;
final ClientRequestContext ctx;
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
response = webClient.execute(request);
response = webClient.execute(request, requestOptions);
ctx = captor.get();
}
final SplitHttpResponse split =
Expand Down Expand Up @@ -127,21 +130,27 @@ public CompletableFuture<WebSocketSession> connect(String path) {
return result;
}

private RequestHeaders webSocketHeaders(String path) {
final RequestHeadersBuilder builder;
private RequestHeaders webSocketHeaders(String path, HttpHeaders headers) {
final RequestHeadersBuilder builder = RequestHeaders.builder();
if (!headers.isEmpty()) {
headers.forEach((k, v) -> builder.add(k, v));
}

if (scheme().sessionProtocol().isExplicitHttp2()) {
builder = RequestHeaders.builder(HttpMethod.CONNECT, path)
.set(HttpHeaderNames.PROTOCOL, HttpHeaderValues.WEBSOCKET.toString());
builder.method(HttpMethod.CONNECT)
.path(path)
.set(HttpHeaderNames.PROTOCOL, HttpHeaderValues.WEBSOCKET.toString());
} else {
builder = RequestHeaders.builder(HttpMethod.GET, path)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE.toString())
.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET.toString());
final String secWebSocketKey = generateSecWebSocketKey();
builder.set(HttpHeaderNames.SEC_WEBSOCKET_KEY, secWebSocketKey);
builder.method(HttpMethod.GET)
.path(path)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE.toString())
.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET.toString())
.set(HttpHeaderNames.SEC_WEBSOCKET_KEY, secWebSocketKey);
}

builder.set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, "13");
if (!subprotocols.isEmpty()) {
if (!builder.contains(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL) && !subprotocols.isEmpty()) {
builder.set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, joinedSubprotocols);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

package com.linecorp.armeria.client.websocket;

import static com.linecorp.armeria.internal.client.ClientUtil.UNDEFINED_URI;
import static java.util.Objects.requireNonNull;

import java.net.URI;
import java.util.concurrent.CompletableFuture;

import com.linecorp.armeria.client.ClientBuilderParams;
import com.linecorp.armeria.client.ClientOptions;
import com.linecorp.armeria.client.RequestOptions;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.Scheme;
import com.linecorp.armeria.common.SerializationFormat;
import com.linecorp.armeria.common.SessionProtocol;
Expand Down Expand Up @@ -78,11 +81,11 @@ static WebSocketClient of() {
return DefaultWebSocketClient.DEFAULT;
}

/**
* Returns a new {@link WebSocketClient} that connects to the specified {@code uri} using the
* default options.
*/
static WebSocketClient of(String uri) {
/**
* Returns a new {@link WebSocketClient} that connects to the specified {@code uri} using the
* default options.
*/
static WebSocketClient of(String uri) {
return builder(uri).build();
}

Expand Down Expand Up @@ -142,6 +145,13 @@ static WebSocketClient of(SessionProtocol protocol, EndpointGroup endpointGroup,
return builder(protocol, endpointGroup, path).build();
}

/**
* Returns a new {@link WebSocketClientBuilder} without a base URI.
*/
static WebSocketClientBuilder builder() {
return builder(UNDEFINED_URI);
}

/**
* Returns a new {@link WebSocketClientBuilder} created with the specified base {@code uri}.
*/
Expand Down Expand Up @@ -214,8 +224,31 @@ static WebSocketClientBuilder builder(SessionProtocol protocol, EndpointGroup en

/**
* Connects to the specified {@code path}.
*
* <p>Note that the returned {@link CompletableFuture} is exceptionally completes with
* {@link WebSocketClientHandshakeException} if the handshake failed.
*/
default CompletableFuture<WebSocketSession> connect(String path) {
return connect(path, HttpHeaders.of());
}

/**
* Connects to the specified {@code path} with the specified headers.
*
* <p>Note that the returned {@link CompletableFuture} is exceptionally completes with
* {@link WebSocketClientHandshakeException} if the handshake failed.
*/
default CompletableFuture<WebSocketSession> connect(String path, HttpHeaders headers) {
return connect(path, headers, RequestOptions.of());
}

/**
* Connects to the specified {@code path} with the specified {@link HttpHeaders} and {@link RequestOptions}.
*
* <p>Note that the returned {@link CompletableFuture} is exceptionally completes with
* {@link WebSocketClientHandshakeException} if the handshake failed.
*/
CompletableFuture<WebSocketSession> connect(String path);
CompletableFuture<WebSocketSession> connect(String path, HttpHeaders headers, RequestOptions options);

@Override
WebClient unwrap();
Expand Down
Loading
Loading