diff --git a/.changes/386353e6-e3cc-4561-bfd6-9763d3ac033b.json b/.changes/386353e6-e3cc-4561-bfd6-9763d3ac033b.json new file mode 100644 index 000000000..81e306639 --- /dev/null +++ b/.changes/386353e6-e3cc-4561-bfd6-9763d3ac033b.json @@ -0,0 +1,8 @@ +{ + "id": "386353e6-e3cc-4561-bfd6-9763d3ac033b", + "type": "bugfix", + "description": "Add support for connection idle monitoring for OkHttp via the engine config parameter `connectionIdlePollingInterval`. Monitoring is disabled by default to match previous behavior. This monitoring will switch to enabled by default in an upcoming minor version release.", + "issues": [ + "awslabs/aws-sdk-kotlin#1214" + ] +} \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index dfc7be76b..eb5170e4c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -92,7 +92,7 @@ ktor-http-cio = { module = "io.ktor:ktor-http-cio", version.ref = "ktor-version" ktor-utils = { module = "io.ktor:ktor-utils", version.ref = "ktor-version" } ktor-io = { module = "io.ktor:ktor-io", version.ref = "ktor-version" } ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor-version" } -ktor-server-jetty = { module = "io.ktor:ktor-server-jetty", version.ref = "ktor-version" } +ktor-server-jetty-jakarta = { module = "io.ktor:ktor-server-jetty-jakarta", version.ref = "ktor-version" } ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor-version" } ktor-network-tls-certificates = { module = "io.ktor:ktor-network-tls-certificates", version.ref = "ktor-version" } diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/api/http-client-engine-okhttp.api b/runtime/protocol/http-client-engines/http-client-engine-okhttp/api/http-client-engine-okhttp.api index 3ab63989e..25c233955 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/api/http-client-engine-okhttp.api +++ 
b/runtime/protocol/http-client-engines/http-client-engine-okhttp/api/http-client-engine-okhttp.api @@ -66,14 +66,17 @@ public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine$Com public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl { public static final field Companion Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Companion; public synthetic fun (Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder;Lkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun getConnectionIdlePollingInterval-FghU774 ()Lkotlin/time/Duration; public final fun getMaxConcurrencyPerHost-pVg5ArA ()I public fun toBuilderApplicator ()Lkotlin/jvm/functions/Function1; } public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl$BuilderImpl { public fun ()V + public final fun getConnectionIdlePollingInterval-FghU774 ()Lkotlin/time/Duration; public final fun getMaxConcurrencyPerHost-0hXNFcg ()Lkotlin/UInt; public fun getTelemetryProvider ()Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider; + public final fun setConnectionIdlePollingInterval-BwNAW2A (Lkotlin/time/Duration;)V public final fun setMaxConcurrencyPerHost-ExVfyTY (Lkotlin/UInt;)V public fun setTelemetryProvider (Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;)V } diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt new file mode 100644 index 000000000..5d845c0c4 --- /dev/null +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt @@ 
-0,0 +1,158 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package aws.smithy.kotlin.runtime.http.engine.okhttp
+
+import aws.smithy.kotlin.runtime.telemetry.logging.logger
+import kotlinx.coroutines.CoroutineName
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import okhttp3.Call
+import okhttp3.Connection
+import okhttp3.ConnectionListener
+import okhttp3.ExperimentalOkHttpApi
+import okhttp3.internal.closeQuietly
+import okio.EOFException
+import okio.buffer
+import okio.source
+import java.net.SocketException
+import java.net.SocketTimeoutException
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.coroutines.coroutineContext
+import kotlin.time.Duration
+import kotlin.time.measureTime
+
+/**
+ * A [ConnectionListener] that polls idle connections so that sockets closed by the remote peer are detected.
+ *
+ * A monitoring coroutine is launched when a call releases a connection and is cancelled when the connection is
+ * acquired again or closed. While monitoring, the socket's read timeout is set from [pollInterval] and blocking
+ * reads are issued in a loop; reaching end-of-stream closes the socket.
+ *
+ * @param pollInterval the read timeout applied to each blocking poll of an idle socket
+ */
 @OptIn(ExperimentalOkHttpApi::class)
+internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() {
+    /**
+     * [pollInterval] as a socket timeout in milliseconds, clamped to `1..Int.MAX_VALUE`. A socket timeout of `0`
+     * means "block indefinitely", which would leave the polling loop unable to observe cancellation; negative values
+     * are rejected by the socket and very large values would overflow when narrowed to [Int].
+     */
+    private val pollTimeoutMillis: Int =
+        pollInterval.inWholeMilliseconds.coerceIn(1L, Int.MAX_VALUE.toLong()).toInt()
+
+    /** Monitoring jobs for idle connections, keyed by the connection being watched. */
+    private val monitors = ConcurrentHashMap<Connection, Job>()
+
+    /** Coroutine context from the request's [SdkRequestTag], or [Dispatchers.IO] when none is available. */
+    private fun Call.callContext() =
+        request()
+            .tag(SdkRequestTag::class.java)
+            ?.callContext
+            ?: Dispatchers.IO
+
+    /**
+     * Stops monitoring [connection] (if it is being monitored) before [call] starts using it. Blocks the calling
+     * thread until the monitoring coroutine has finished.
+     */
+    override fun connectionAcquired(connection: Connection, call: Call) {
+        // Non-locking map access is okay here because this code will only execute synchronously as part of a
+        // `connectionAcquired` event and will be complete before any future `connectionReleased` event could fire for
+        // the same connection.
+        monitors.remove(connection)?.let { monitor ->
+            val context = call.callContext()
+            val logger = context.logger()
+            logger.trace { "Cancel monitoring for $connection" }
+
+            // Use `runBlocking` because this _must_ finish before OkHttp goes to use the connection
+            val cancelTime = measureTime {
+                runBlocking(context) { monitor.cancelAndJoin() }
+            }
+
+            logger.trace { "Monitoring canceled for $connection in $cancelTime" }
+        }
+    }
+
+    /**
+     * Stops monitoring [connection] when OkHttp reports it as closed, so that the monitoring job and its map entry
+     * do not outlive the connection.
+     */
+    override fun connectionClosed(connection: Connection) {
+        // Cancellation is not awaited; the polling loop ends at its next `isActive` check or socket error
+        monitors.remove(connection)?.cancel()
+    }
+
+    /** Starts a monitoring coroutine for [connection] in the context of the [call] that just released it. */
+    override fun connectionReleased(connection: Connection, call: Call) {
+        val connId = System.identityHashCode(connection)
+        val context = call.callContext()
+        val scope = CoroutineScope(context)
+        val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
+            doMonitor(connection)
+        }
+        context.logger().trace { "Launched coroutine $monitor to monitor $connection" }
+
+        // Non-locking map access is okay here because this code will only execute synchronously as part of a
+        // `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for
+        // the same connection.
+        monitors[connection] = monitor
+    }
+
+    /**
+     * Polls the socket of [conn] with blocking reads until the coroutine is cancelled, the remote end closes the
+     * socket (in which case the socket is closed locally too), or polling fails. The socket's original read timeout
+     * is restored on exit unless the socket was closed by this method.
+     */
+    private suspend fun doMonitor(conn: Connection) {
+        val logger = coroutineContext.logger()
+
+        val socket = conn.socket()
+        val source = try {
+            socket.source()
+        } catch (_: SocketException) {
+            logger.trace { "Socket for $conn closed before monitoring started. Skipping polling loop." }
+            return
+        }.buffer().peek()
+
+        logger.trace { "Commence socket monitoring for $conn" }
+        var resetTimeout = true
+        val oldTimeout = socket.soTimeout
+
+        try {
+            socket.soTimeout = pollTimeoutMillis
+
+            while (coroutineContext.isActive) {
+                try {
+                    logger.trace { "Polling socket for $conn" }
+                    source.readByte() // Blocking read; will take up to `pollInterval` time to complete
+                } catch (_: SocketTimeoutException) {
+                    logger.trace { "Socket still alive for $conn" }
+                } catch (_: EOFException) {
+                    logger.trace { "Socket closed remotely for $conn" }
+                    socket.closeQuietly()
+                    resetTimeout = false
+                    return
+                }
+            }
+
+            logger.trace { "Monitoring coroutine has been cancelled. Ending polling loop." }
+        } catch (e: Throwable) {
+            logger.warn(e) { "Failed to poll $conn. Ending polling loop. Connection may be unstable now." }
+        } finally {
+            if (resetTimeout) {
+                logger.trace { "Attempting to reset soTimeout..." }
+                try {
+                    conn.socket().soTimeout = oldTimeout
+                } catch (e: Throwable) {
+                    logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." }
+                }
+            }
+        }
+    }
+}
diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt
index ba50e7c51..2818aaac7 100644
--- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt
+++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt
@@ -99,14 +99,20 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
         readTimeout(config.socketReadTimeout.toJavaDuration())
         writeTimeout(config.socketWriteTimeout.toJavaDuration())
 
-        // FIXME - register a [ConnectionListener](https://github.com/square/okhttp/blob/master/okhttp/src/jvmMain/kotlin/okhttp3/ConnectionListener.kt#L27)
-        // when a new okhttp release is cut that contains this abstraction and wireup connection uptime metrics
+        @OptIn(ExperimentalOkHttpApi::class)
+        val connectionListener = if (config.connectionIdlePollingInterval == null) {
+            ConnectionListener.NONE
+        } else {
+            ConnectionIdleMonitor(connectionIdlePollingInterval)
+        }
 
         // use our own pool configured with the timeout settings taken from config
+        @OptIn(ExperimentalOkHttpApi::class)
         val pool = ConnectionPool(
             maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
             keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds,
             TimeUnit.MILLISECONDS,
+            connectionListener = connectionListener,
         )
         connectionPool(pool)
 
diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig.kt
index 8466ff083..9c852f421
100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig.kt @@ -9,6 +9,7 @@ import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfigImpl import aws.smithy.kotlin.runtime.telemetry.Global import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider +import kotlin.time.Duration /** * The configuration parameters for an OkHttp HTTP client engine. @@ -28,6 +29,22 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie public val Default: OkHttpEngineConfig = OkHttpEngineConfig(Builder()) } + /** + * The interval in which to poll idle connections for remote closure or `null` to disable monitoring of idle + * connections. The default value is `null`. + * + * When this value is non-`null`, polling is enabled on connections which are released from an engine call and + * enter the connection pool. Polling consists of a loop that performs blocking reads with the socket timeout + * set to [connectionIdlePollingInterval]. Polling is cancelled for a connection when the engine acquires it + * from the pool or when the connection is evicted from the pool and closed. Because the polling loop uses + * blocking reads, an engine call to acquire or close a connection may be delayed by as much as + * [connectionIdlePollingInterval]. + * + * When this value is `null`, polling is disabled. Idle connections in the pool which are closed remotely may + * encounter errors when they are acquired for a subsequent call. + */ + public val connectionIdlePollingInterval: Duration? = builder.connectionIdlePollingInterval + /** * The maximum number of requests to execute concurrently for a single host. 
*/ @@ -37,6 +54,7 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie super.toBuilderApplicator()() if (this is Builder) { + connectionIdlePollingInterval = this@OkHttpEngineConfig.connectionIdlePollingInterval maxConcurrencyPerHost = this@OkHttpEngineConfig.maxConcurrencyPerHost } } @@ -45,6 +63,22 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie * A builder for [OkHttpEngineConfig] */ public class Builder : BuilderImpl() { + /** + * The interval in which to poll idle connections for remote closure or `null` to disable monitoring of idle + * connections. The default value is `null`. + * + * When this value is non-`null`, polling is enabled on connections which are released from an engine call and + * enter the connection pool. Polling consists of a loop that performs blocking reads with the socket timeout + * set to [connectionIdlePollingInterval]. Polling is cancelled for a connection when the engine acquires it + * from the pool or when the connection is evicted from the pool and closed. Because the polling loop uses + * blocking reads, an engine call to acquire or close a connection may be delayed by as much as + * [connectionIdlePollingInterval]. + * + * When this value is `null`, polling is disabled. Idle connections in the pool which are closed remotely may + * encounter errors when they are acquired for a subsequent call. + */ + public var connectionIdlePollingInterval: Duration? = null + /** * The maximum number of requests to execute concurrently for a single host. Defaults to [maxConcurrency]. 
*/
diff --git a/runtime/protocol/http-client-engines/test-suite/build.gradle.kts b/runtime/protocol/http-client-engines/test-suite/build.gradle.kts
index 10edd6851..ece9d6e10 100644
--- a/runtime/protocol/http-client-engines/test-suite/build.gradle.kts
+++ b/runtime/protocol/http-client-engines/test-suite/build.gradle.kts
@@ -24,7 +24,7 @@ kotlin {
 
         jvmMain {
             dependencies {
-                implementation(libs.ktor.server.jetty)
+                implementation(libs.ktor.server.jetty.jakarta)
                 implementation(libs.ktor.network.tls.certificates)
 
                 implementation(project(":runtime:protocol:http-client-engines:http-client-engine-default"))
@@ -52,6 +52,8 @@ kotlin {
 
                 implementation("org.bouncycastle:bcpkix-jdk18on:1.78") // https://github.com/docker-java/docker-java/pull/2326
                 implementation(libs.docker.transport.zerodep)
+
+                implementation(project(":runtime:observability:telemetry-defaults"))
             }
         }
diff --git a/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/suite/Connections.kt b/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/suite/Connections.kt
new file mode 100644
index 000000000..704ab31bc
--- /dev/null
+++ b/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/suite/Connections.kt
@@ -0,0 +1,24 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package aws.smithy.kotlin.runtime.http.test.suite
+
+import io.ktor.server.application.Application
+import io.ktor.server.response.respondText
+import io.ktor.server.routing.post
+import io.ktor.server.routing.route
+import io.ktor.server.routing.routing
+
+/**
+ * Installs the routes exercised by the connection tests.
+ *
+ * A `POST` to `connectionDrop` is answered with the text `Bar`.
+ */
+internal fun Application.connectionTests() {
+    routing {
+        route("connectionDrop") {
+            post { call.respondText("Bar") }
+        }
+    }
+}
diff --git a/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/util/TestServers.kt b/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/util/TestServers.kt
index 2afff67c2..1bc9d6e9e 100644
--- a/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/util/TestServers.kt
+++ b/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/util/TestServers.kt
@@ -13,10 +13,13 @@ import aws.smithy.kotlin.runtime.http.test.suite.uploadTests
 import io.ktor.server.application.*
 import io.ktor.server.engine.*
 import io.ktor.server.jetty.*
+import io.ktor.server.jetty.jakarta.Jetty
+import io.ktor.server.jetty.jakarta.JettyApplicationEngineBase
 import redirectTests
 import java.io.Closeable
 import java.nio.file.Paths
 import java.util.concurrent.TimeUnit
+import kotlin.time.Duration.Companion.seconds
 
 private data class TestServer(
     val port: Int,
@@ -94,7 +97,7 @@ private fun tlsServer(instance: TestServer, sslConfig: SslConfig): EmbeddedServe
     val rootConfig = serverConfig {
         module(instance.initializer)
     }
-    val engineConfig: ApplicationEngine.Configuration.() -> Unit = {
+    val engineConfig: JettyApplicationEngineBase.Configuration.() -> Unit = {
         when (instance.type) {
             ConnectorType.HTTP -> connector { port = instance.port }
 
@@ -109,6 +112,8 @@ private fun tlsServer(instance: TestServer, sslConfig: SslConfig): EmbeddedServe
                 enabledProtocols = instance.protocolName?.let(::listOf)
             }
         }
+
+        idleTimeout =
3.seconds // Required for ConnectionTest.testShortLivedConnections
     }
 
     return try {
@@ -126,6 +131,7 @@ internal fun Application.testRoutes() {
     uploadTests()
     concurrentTests()
     headerTests()
+    connectionTests()
 }
 
 // configure SSL-only routes
diff --git a/runtime/protocol/http-client-engines/test-suite/jvm/test/aws/smithy/kotlin/runtime/http/test/ConnectionTest.kt b/runtime/protocol/http-client-engines/test-suite/jvm/test/aws/smithy/kotlin/runtime/http/test/ConnectionTest.kt
index 809512115..de47ed47d 100644
--- a/runtime/protocol/http-client-engines/test-suite/jvm/test/aws/smithy/kotlin/runtime/http/test/ConnectionTest.kt
+++ b/runtime/protocol/http-client-engines/test-suite/jvm/test/aws/smithy/kotlin/runtime/http/test/ConnectionTest.kt
@@ -6,13 +6,17 @@ package aws.smithy.kotlin.runtime.http.test
 
 import aws.smithy.kotlin.runtime.content.decodeToString
 import aws.smithy.kotlin.runtime.http.*
+import aws.smithy.kotlin.runtime.http.engine.okhttp.OkHttpEngineConfig
 import aws.smithy.kotlin.runtime.http.request.HttpRequest
 import aws.smithy.kotlin.runtime.http.request.url
 import aws.smithy.kotlin.runtime.http.test.util.*
 import aws.smithy.kotlin.runtime.http.test.util.testServers
 import aws.smithy.kotlin.runtime.net.TlsVersion
+import kotlinx.coroutines.delay
 import java.nio.file.Paths
 import kotlin.test.*
+import kotlin.time.Duration.Companion.milliseconds
+import kotlin.time.Duration.Companion.seconds
 
 class ConnectionTest : AbstractEngineTest() {
     private fun testMinTlsVersion(version: TlsVersion, serverType: ServerType) {
@@ -79,4 +83,46 @@ class ConnectionTest : AbstractEngineTest() {
 
     @Test
     fun testMinTls1_3() = testMinTlsVersion(TlsVersion.TLS_1_3, ServerType.TLS_1_3)
+
+    /**
+     * Issues two identical calls separated by a pause that is longer than the server-side idle timeout but shorter
+     * than the client-side one, and expects both to succeed while idle-connection polling is enabled on the
+     * engine.
+     *
+     * See https://github.com/awslabs/aws-sdk-kotlin/issues/1214
+     */
+    @Test
+    fun testShortLivedConnections() = testEngines(
+        // Only run this test on OkHttp
+        skipEngines = setOf("CrtHttpEngine", "OkHttp4Engine"),
+    ) {
+        engineConfig {
+            this as OkHttpEngineConfig.Builder
+            connectionIdleTimeout = 10.seconds // Longer than the server-side timeout
+            connectionIdlePollingInterval = 200.milliseconds
+        }
+
+        test { _, client ->
+            // Sends `POST /connectionDrop` with body "Foo" and returns the decoded response body
+            suspend fun postFoo() = client
+                .call(
+                    HttpRequest {
+                        testSetup()
+                        method = HttpMethod.POST
+                        url { path.decoded = "/connectionDrop" }
+                        body = "Foo".toHttpBody()
+                    },
+                )
+                .response
+                .body
+                .toByteStream()
+                ?.decodeToString()
+
+            assertEquals("Bar", postFoo())
+
+            delay(5.seconds) // Longer than the service side timeout, shorter than the client-side timeout
+
+            assertEquals("Bar", postFoo())
+        }
+    }
 }