Skip to content

Commit

Permalink
fix: add support for idle connection monitoring (opt-in for now) (#1171)
Browse files Browse the repository at this point in the history
  • Loading branch information
ianbotsf authored Oct 31, 2024
1 parent 792a6f8 commit 5f7c98d
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 5 deletions.
8 changes: 8 additions & 0 deletions .changes/386353e6-e3cc-4561-bfd6-9763d3ac033b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "386353e6-e3cc-4561-bfd6-9763d3ac033b",
"type": "bugfix",
"description": "Add support for connection idle monitoring for OkHttp via the engine config parameter `connectionIdlePollingInterval`. Monitoring is disabled by default to match previous behavior. This monitoring will switch to enabled by default in an upcoming minor version release.",
"issues": [
"awslabs/aws-sdk-kotlin#1214"
]
}
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ ktor-http-cio = { module = "io.ktor:ktor-http-cio", version.ref = "ktor-version"
ktor-utils = { module = "io.ktor:ktor-utils", version.ref = "ktor-version" }
ktor-io = { module = "io.ktor:ktor-io", version.ref = "ktor-version" }
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor-version" }
ktor-server-jetty = { module = "io.ktor:ktor-server-jetty", version.ref = "ktor-version" }
ktor-server-jetty-jakarta = { module = "io.ktor:ktor-server-jetty-jakarta", version.ref = "ktor-version" }
ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor-version" }
ktor-network-tls-certificates = { module = "io.ktor:ktor-network-tls-certificates", version.ref = "ktor-version" }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,17 @@ public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine$Com
public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl {
public static final field Companion Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Companion;
public synthetic fun <init> (Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getConnectionIdlePollingInterval-FghU774 ()Lkotlin/time/Duration;
public final fun getMaxConcurrencyPerHost-pVg5ArA ()I
public fun toBuilderApplicator ()Lkotlin/jvm/functions/Function1;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl$BuilderImpl {
public fun <init> ()V
public final fun getConnectionIdlePollingInterval-FghU774 ()Lkotlin/time/Duration;
public final fun getMaxConcurrencyPerHost-0hXNFcg ()Lkotlin/UInt;
public fun getTelemetryProvider ()Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;
public final fun setConnectionIdlePollingInterval-BwNAW2A (Lkotlin/time/Duration;)V
public final fun setMaxConcurrencyPerHost-ExVfyTY (Lkotlin/UInt;)V
public fun setTelemetryProvider (Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;)V
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.telemetry.logging.logger
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import okhttp3.Call
import okhttp3.Connection
import okhttp3.ConnectionListener
import okhttp3.ExperimentalOkHttpApi
import okhttp3.internal.closeQuietly
import okio.EOFException
import okio.buffer
import okio.source
import java.net.SocketException
import java.net.SocketTimeoutException
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlin.time.measureTime

/**
 * An OkHttp [ConnectionListener] that polls idle connections to detect remote closure.
 *
 * When a connection is released from a call, a coroutine is launched which repeatedly performs blocking reads on the
 * connection's socket with the socket timeout set to [pollInterval]. If a read reaches end-of-stream, the socket is
 * closed locally. Monitoring for a connection is stopped when the connection is acquired again or when it is closed.
 *
 * @param pollInterval The requested duration of each blocking read. The value actually applied as the socket timeout
 * is clamped to the range 1 ms..[Int.MAX_VALUE] ms because a socket timeout of `0` means "block forever".
 */
@OptIn(ExperimentalOkHttpApi::class)
internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() {
    /**
     * Active monitoring jobs, keyed by the connection they are polling.
     */
    private val monitors = ConcurrentHashMap<Connection, Job>()

    /**
     * The socket timeout (in milliseconds) used while polling. A raw conversion of [pollInterval] could yield `0` for
     * sub-millisecond intervals (which `Socket.setSoTimeout` treats as an infinite timeout, making the polling read
     * uninterruptible) or overflow `Int` for very large intervals, so the value is clamped.
     */
    private val pollTimeoutMillis: Int =
        pollInterval.inWholeMilliseconds.coerceIn(1L, Int.MAX_VALUE.toLong()).toInt()

    /**
     * Returns the coroutine context stored in this call's [SdkRequestTag], or [Dispatchers.IO] if the request has no
     * such tag.
     */
    private fun Call.callContext() =
        request()
            .tag(SdkRequestTag::class.java)
            ?.callContext
            ?: Dispatchers.IO

    /**
     * Stops monitoring [connection] (if it is being monitored) and waits for the monitor to finish before returning.
     */
    override fun connectionAcquired(connection: Connection, call: Call) {
        // Non-locking map access is okay here because this code will only execute synchronously as part of a
        // `connectionAcquired` event and will be complete before any future `connectionReleased` event could fire for
        // the same connection.
        monitors.remove(connection)?.let { monitor ->
            val context = call.callContext()
            val logger = context.logger<ConnectionIdleMonitor>()
            logger.trace { "Cancel monitoring for $connection" }

            // Use `runBlocking` because this _must_ finish before OkHttp goes to use the connection
            val cancelTime = measureTime {
                runBlocking(context) { monitor.cancelAndJoin() }
            }

            logger.trace { "Monitoring canceled for $connection in $cancelTime" }
        }
    }

    /**
     * Stops tracking [connection] once it has been closed so that its monitor entry does not linger in [monitors].
     */
    override fun connectionClosed(connection: Connection) {
        // Joining is unnecessary here: a closed connection will not be handed to another call, so nothing needs to
        // wait for the polling loop to finish. Cancelling makes the loop exit at its next `isActive` check if it has
        // not already ended.
        monitors.remove(connection)?.cancel()
    }

    /**
     * Launches a monitoring coroutine for [connection] in the releasing call's context and records it in [monitors].
     */
    override fun connectionReleased(connection: Connection, call: Call) {
        val connId = System.identityHashCode(connection)
        val context = call.callContext()
        val scope = CoroutineScope(context)
        val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
            doMonitor(connection)
        }
        context.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }

        // Non-locking map access is okay here because this code will only execute synchronously as part of a
        // `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for
        // the same connection.
        monitors[connection] = monitor
    }

    /**
     * Polls the socket of [conn] until the current coroutine is no longer active, the remote end closes the socket,
     * or polling fails. The socket's original timeout is restored on exit unless the socket was closed by this
     * function.
     */
    private suspend fun doMonitor(conn: Connection) {
        val logger = coroutineContext.logger<ConnectionIdleMonitor>()

        val socket = conn.socket()
        val source = try {
            socket.source()
        } catch (_: SocketException) {
            logger.trace { "Socket for $conn closed before monitoring started. Skipping polling loop." }
            return
        }.buffer().peek()

        logger.trace { "Commence socket monitoring for $conn" }
        var resetTimeout = true
        val oldTimeout = socket.soTimeout

        try {
            // Bound each blocking read so that the loop can observe cancellation between reads
            socket.soTimeout = pollTimeoutMillis

            while (coroutineContext.isActive) {
                try {
                    logger.trace { "Polling socket for $conn" }
                    source.readByte() // Blocking read; will take up to `pollInterval` time to complete
                } catch (_: SocketTimeoutException) {
                    logger.trace { "Socket still alive for $conn" }
                } catch (_: EOFException) {
                    logger.trace { "Socket closed remotely for $conn" }
                    socket.closeQuietly()
                    resetTimeout = false // The socket is closed now; there is no timeout left to restore
                    return
                }
            }

            logger.trace { "Monitoring coroutine has been cancelled. Ending polling loop." }
        } catch (e: Throwable) {
            logger.warn(e) { "Failed to poll $conn. Ending polling loop. Connection may be unstable now." }
        } finally {
            if (resetTimeout) {
                logger.trace { "Attempting to reset soTimeout..." }
                try {
                    conn.socket().soTimeout = oldTimeout
                } catch (e: Throwable) {
                    logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." }
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,20 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
readTimeout(config.socketReadTimeout.toJavaDuration())
writeTimeout(config.socketWriteTimeout.toJavaDuration())

// FIXME - register a [ConnectionListener](https://github.com/square/okhttp/blob/master/okhttp/src/jvmMain/kotlin/okhttp3/ConnectionListener.kt#L27)
// when a new okhttp release is cut that contains this abstraction and wireup connection uptime metrics
@OptIn(ExperimentalOkHttpApi::class)
val connectionListener = if (config.connectionIdlePollingInterval == null) {
ConnectionListener.NONE
} else {
ConnectionIdleMonitor(connectionIdlePollingInterval)
}

// use our own pool configured with the timeout settings taken from config
@OptIn(ExperimentalOkHttpApi::class)
val pool = ConnectionPool(
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds,
TimeUnit.MILLISECONDS,
connectionListener = connectionListener,
)
connectionPool(pool)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfigImpl
import aws.smithy.kotlin.runtime.telemetry.Global
import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider
import kotlin.time.Duration

/**
* The configuration parameters for an OkHttp HTTP client engine.
Expand All @@ -28,6 +29,22 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie
public val Default: OkHttpEngineConfig = OkHttpEngineConfig(Builder())
}

/**
* The interval at which to poll idle connections for remote closure, or `null` to disable monitoring of idle
* connections. The default value is `null`.
*
* When this value is non-`null`, polling is enabled on connections which are released from an engine call and
* enter the connection pool. Polling consists of a loop that performs blocking reads with the socket timeout
* set to [connectionIdlePollingInterval]. Polling is cancelled for a connection when the engine acquires it
* from the pool or when the connection is evicted from the pool and closed. Because the polling loop uses
* blocking reads, an engine call to acquire or close a connection may be delayed by as much as
* [connectionIdlePollingInterval].
*
* When this value is `null`, polling is disabled. Idle connections in the pool which are closed remotely may
* encounter errors when they are acquired for a subsequent call.
*/
public val connectionIdlePollingInterval: Duration? = builder.connectionIdlePollingInterval

/**
* The maximum number of requests to execute concurrently for a single host.
*/
Expand All @@ -37,6 +54,7 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie
super.toBuilderApplicator()()

if (this is Builder) {
connectionIdlePollingInterval = this@OkHttpEngineConfig.connectionIdlePollingInterval
maxConcurrencyPerHost = this@OkHttpEngineConfig.maxConcurrencyPerHost
}
}
Expand All @@ -45,6 +63,22 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie
* A builder for [OkHttpEngineConfig]
*/
public class Builder : BuilderImpl() {
/**
* The interval at which to poll idle connections for remote closure, or `null` to disable monitoring of idle
* connections. The default value is `null`.
*
* When this value is non-`null`, polling is enabled on connections which are released from an engine call and
* enter the connection pool. Polling consists of a loop that performs blocking reads with the socket timeout
* set to [connectionIdlePollingInterval]. Polling is cancelled for a connection when the engine acquires it
* from the pool or when the connection is evicted from the pool and closed. Because the polling loop uses
* blocking reads, an engine call to acquire or close a connection may be delayed by as much as
* [connectionIdlePollingInterval].
*
* When this value is `null`, polling is disabled. Idle connections in the pool which are closed remotely may
* encounter errors when they are acquired for a subsequent call.
*/
public var connectionIdlePollingInterval: Duration? = null

/**
* The maximum number of requests to execute concurrently for a single host. Defaults to [maxConcurrency].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ kotlin {

jvmMain {
dependencies {
implementation(libs.ktor.server.jetty)
implementation(libs.ktor.server.jetty.jakarta)
implementation(libs.ktor.network.tls.certificates)

implementation(project(":runtime:protocol:http-client-engines:http-client-engine-default"))
Expand Down Expand Up @@ -52,6 +52,8 @@ kotlin {
implementation("org.bouncycastle:bcpkix-jdk18on:1.78") // https://github.com/docker-java/docker-java/pull/2326

implementation(libs.docker.transport.zerodep)

implementation(project(":runtime:observability:telemetry-defaults"))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.test.suite

import io.ktor.server.application.Application
import io.ktor.server.response.respondText
import io.ktor.server.routing.post
import io.ktor.server.routing.route
import io.ktor.server.routing.routing

/**
 * Installs the routes used by the connection tests: a `POST` to `connectionDrop` is answered with the text `Bar`.
 */
internal fun Application.connectionTests() {
    routing {
        route("connectionDrop") {
            post { call.respondText(text = "Bar") }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import aws.smithy.kotlin.runtime.http.test.suite.uploadTests
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.jetty.*
import io.ktor.server.jetty.jakarta.Jetty
import io.ktor.server.jetty.jakarta.JettyApplicationEngineBase
import redirectTests
import java.io.Closeable
import java.nio.file.Paths
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.seconds

private data class TestServer(
val port: Int,
Expand Down Expand Up @@ -94,7 +97,7 @@ private fun tlsServer(instance: TestServer, sslConfig: SslConfig): EmbeddedServe
val rootConfig = serverConfig {
module(instance.initializer)
}
val engineConfig: ApplicationEngine.Configuration.() -> Unit = {
val engineConfig: JettyApplicationEngineBase.Configuration.() -> Unit = {
when (instance.type) {
ConnectorType.HTTP -> connector { port = instance.port }

Expand All @@ -109,6 +112,8 @@ private fun tlsServer(instance: TestServer, sslConfig: SslConfig): EmbeddedServe
enabledProtocols = instance.protocolName?.let(::listOf)
}
}

idleTimeout = 3.seconds // Required for ConnectionTest.testShortLivedConnections
}

return try {
Expand All @@ -126,6 +131,7 @@ internal fun Application.testRoutes() {
uploadTests()
concurrentTests()
headerTests()
connectionTests()
}

// configure SSL-only routes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ package aws.smithy.kotlin.runtime.http.test

import aws.smithy.kotlin.runtime.content.decodeToString
import aws.smithy.kotlin.runtime.http.*
import aws.smithy.kotlin.runtime.http.engine.okhttp.OkHttpEngineConfig
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.http.request.url
import aws.smithy.kotlin.runtime.http.test.util.*
import aws.smithy.kotlin.runtime.http.test.util.testServers
import aws.smithy.kotlin.runtime.net.TlsVersion
import kotlinx.coroutines.delay
import java.nio.file.Paths
import kotlin.test.*
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

class ConnectionTest : AbstractEngineTest() {
private fun testMinTlsVersion(version: TlsVersion, serverType: ServerType) {
Expand Down Expand Up @@ -79,4 +83,45 @@ class ConnectionTest : AbstractEngineTest() {

@Test
fun testMinTls1_3() = testMinTlsVersion(TlsVersion.TLS_1_3, ServerType.TLS_1_3)

// See https://github.com/awslabs/aws-sdk-kotlin/issues/1214
@Test
fun testShortLivedConnections() = testEngines(
    // Only run this test on OkHttp
    skipEngines = setOf("CrtHttpEngine", "OkHttp4Engine"),
) {
    engineConfig {
        this as OkHttpEngineConfig.Builder
        connectionIdlePollingInterval = 200.milliseconds
        connectionIdleTimeout = 10.seconds // Longer than the server-side timeout
    }

    test { _, client ->
        // Builds a fresh POST to /connectionDrop, sends it, and returns the decoded response body
        suspend fun postAndReadBody(): String? {
            val request = HttpRequest {
                testSetup()
                method = HttpMethod.POST
                url {
                    path.decoded = "/connectionDrop"
                }
                body = "Foo".toHttpBody()
            }
            return client.call(request).response.body.toByteStream()?.decodeToString()
        }

        // The first call establishes a connection which then goes idle
        assertEquals("Bar", postAndReadBody())

        delay(5.seconds) // Longer than the service side timeout, shorter than the client-side timeout

        // The second call must still succeed after the idle period
        assertEquals("Bar", postAndReadBody())
    }
}
}

0 comments on commit 5f7c98d

Please sign in to comment.