From ac87eab7fecc314f61b8dacce55f068924ce43b0 Mon Sep 17 00:00:00 2001 From: 0marperez Date: Mon, 11 Nov 2024 17:03:55 -0500 Subject: [PATCH 1/5] fix: http call waiting for idle connection monitor to finish --- .../build.gradle.kts | 1 + .../engine/okhttp/ConnectionIdleMonitor.kt | 32 ++++++----- .../http/engine/okhttp/OkHttpEngine.kt | 54 ++++++++++++------- .../http/engine/okhttp4/OkHttp4Engine.kt | 6 +++ 4 files changed, 58 insertions(+), 35 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts b/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts index 3e5d13938..7d3ad7fcd 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts @@ -36,6 +36,7 @@ kotlin { all { languageSettings.optIn("aws.smithy.kotlin.runtime.InternalApi") + languageSettings.optIn("okhttp3.ExperimentalOkHttpApi") } } } diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt index 5d845c0c4..8f521c29c 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt @@ -5,18 +5,10 @@ package aws.smithy.kotlin.runtime.http.engine.okhttp import aws.smithy.kotlin.runtime.telemetry.logging.logger -import kotlinx.coroutines.CoroutineName -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* import okhttp3.Call import okhttp3.Connection import okhttp3.ConnectionListener -import okhttp3.ExperimentalOkHttpApi import okhttp3.internal.closeQuietly import okio.EOFException import okio.buffer @@ -24,14 +16,20 @@ import okio.source import java.net.SocketException import java.net.SocketTimeoutException import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext import kotlin.time.Duration import kotlin.time.measureTime -@OptIn(ExperimentalOkHttpApi::class) internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() { + private val monitorScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val monitors = ConcurrentHashMap() + fun close(): Unit = runBlocking { + monitors.values.forEach { it.cancelAndJoin() } + monitorScope.cancel() + } + private fun Call.callContext() = request() .tag(SdkRequestTag::class.java) @@ -58,12 +56,11 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis override fun connectionReleased(connection: Connection, call: Call) { val connId = System.identityHashCode(connection) - val context = call.callContext() - val scope = CoroutineScope(context) - val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) { - doMonitor(connection) + val callContext = call.callContext() + val monitor = monitorScope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) { + doMonitor(connection, callContext) } - context.logger().trace { "Launched coroutine $monitor to monitor $connection" } + callContext.logger().trace { "Launched coroutine $monitor to monitor $connection" } // Non-locking map access is okay here because this code will only execute synchronously as part of a // `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for @@ -71,8 +68,8 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis monitors[connection] = monitor } - private suspend fun doMonitor(conn: Connection) { - val logger = coroutineContext.logger() + private suspend fun doMonitor(conn: Connection, callContext: CoroutineContext) { + val logger = callContext.logger() val socket = conn.socket() val source = try { @@ -111,6 +108,7 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis logger.trace { "Attempting to reset soTimeout..." } try { conn.socket().soTimeout = oldTimeout + logger.trace { "SoTimeout reset." } } catch (e: Throwable) { logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." } } diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt index 2818aaac7..790efe288 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt @@ -15,9 +15,9 @@ import aws.smithy.kotlin.runtime.net.TlsVersion import aws.smithy.kotlin.runtime.operation.ExecutionContext import aws.smithy.kotlin.runtime.time.Instant import aws.smithy.kotlin.runtime.time.fromEpochMilliseconds -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.job import okhttp3.* +import okhttp3.ConnectionPool import okhttp3.coroutines.executeAsync import java.util.concurrent.TimeUnit import kotlin.time.toJavaDuration @@ -44,7 +44,8 @@ public class OkHttpEngine( } private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider) - private val client = config.buildClient(metrics) + private val connectionIdleMonitor = if (config.connectionIdlePollingInterval != null) ConnectionIdleMonitor(config.connectionIdlePollingInterval) else null + private val client = config.buildClientWithConnectionListener(metrics, connectionIdleMonitor) override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall { val callContext = callContext() @@ -52,7 +53,6 @@ public class OkHttpEngine( val engineRequest = request.toOkHttpRequest(context, callContext, metrics) val engineCall = client.newCall(engineRequest) - @OptIn(ExperimentalCoroutinesApi::class) val engineResponse = mapOkHttpExceptions { engineCall.executeAsync() } val response = engineResponse.toSdkResponse() @@ -71,17 +71,17 @@ public class OkHttpEngine( } override fun shutdown() { + connectionIdleMonitor?.close() client.connectionPool.evictAll() client.dispatcher.executorService.shutdown() metrics.close() } } -/** - * Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client - */ -@InternalApi -public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient { +private fun OkHttpEngineConfig.buildClientFromConfig( + metrics: HttpClientMetrics, + poolOverride: ConnectionPool? = null +): OkHttpClient { val config = this return OkHttpClient.Builder().apply { @@ -99,22 +99,13 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli readTimeout(config.socketReadTimeout.toJavaDuration()) writeTimeout(config.socketWriteTimeout.toJavaDuration()) - @OptIn(ExperimentalOkHttpApi::class) - val connectionListener = if (config.connectionIdlePollingInterval == null) { - ConnectionListener.NONE - } else { - ConnectionIdleMonitor(connectionIdlePollingInterval) - } - // use our own pool configured with the timeout settings taken from config - @OptIn(ExperimentalOkHttpApi::class) val pool = ConnectionPool( maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds, TimeUnit.MILLISECONDS, - connectionListener = connectionListener, ) - connectionPool(pool) + connectionPool(poolOverride ?: pool) val dispatcher = Dispatcher().apply { maxRequests = config.maxConcurrency.toInt() @@ -147,6 +138,33 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli }.build() } +/** + * Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client + */ +// Used by OkHttp4Engine - OkHttp4 does NOT have `connectionListener` +// TODO - Refactor in next minor version - Move this to OkHttp4Engine and make it private +@InternalApi +public fun OkHttpEngineConfig.buildClient( + metrics: HttpClientMetrics +): OkHttpClient = this.buildClientFromConfig(metrics) + +/** + * Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client + */ +// Used by OkHttpEngine - OkHttp5 does have `connectionListener` +private fun OkHttpEngineConfig.buildClientWithConnectionListener( + metrics: HttpClientMetrics, + connectionListener: ConnectionIdleMonitor? +): OkHttpClient = this.buildClientFromConfig( + metrics, + ConnectionPool( + maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor + keepAliveDuration = this.connectionIdleTimeout.inWholeMilliseconds, + timeUnit = TimeUnit.MILLISECONDS, + connectionListener = connectionListener ?: ConnectionListener.NONE, + ) +) + private fun minTlsConnectionSpec(tlsContext: TlsContext): ConnectionSpec { val minVersion = tlsContext.minVersion ?: TlsVersion.TLS_1_2 val okHttpTlsVersions = SdkTlsVersion diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt index 422d3952c..ff5982320 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt @@ -70,6 +70,12 @@ public class OkHttp4Engine( } } } + + override fun shutdown() { + client.connectionPool.evictAll() + client.dispatcher.executorService.shutdown() + metrics.close() + } } // Copied from okhttp3 5.x: From 330353e4f34a96088163ad22778e196a1faeb49e Mon Sep 17 00:00:00 2001 From: 0marperez Date: Mon, 11 Nov 2024 17:14:25 -0500 Subject: [PATCH 2/5] lint --- .../kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt index 790efe288..7622603ff 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt @@ -80,7 +80,7 @@ public class OkHttpEngine( private fun OkHttpEngineConfig.buildClientFromConfig( metrics: HttpClientMetrics, - poolOverride: ConnectionPool? = null + poolOverride: ConnectionPool? = null, ): OkHttpClient { val config = this @@ -145,7 +145,7 @@ private fun OkHttpEngineConfig.buildClientFromConfig( // TODO - Refactor in next minor version - Move this to OkHttp4Engine and make it private @InternalApi public fun OkHttpEngineConfig.buildClient( - metrics: HttpClientMetrics + metrics: HttpClientMetrics, ): OkHttpClient = this.buildClientFromConfig(metrics) /** @@ -154,7 +154,7 @@ public fun OkHttpEngineConfig.buildClient( // Used by OkHttpEngine - OkHttp5 does have `connectionListener` private fun OkHttpEngineConfig.buildClientWithConnectionListener( metrics: HttpClientMetrics, - connectionListener: ConnectionIdleMonitor? + connectionListener: ConnectionIdleMonitor?, ): OkHttpClient = this.buildClientFromConfig( metrics, ConnectionPool( @@ -162,7 +162,7 @@ private fun OkHttpEngineConfig.buildClientWithConnectionListener( keepAliveDuration = this.connectionIdleTimeout.inWholeMilliseconds, timeUnit = TimeUnit.MILLISECONDS, connectionListener = connectionListener ?: ConnectionListener.NONE, - ) + ), ) private fun minTlsConnectionSpec(tlsContext: TlsContext): ConnectionSpec { From 38e3cf2732167c742e5477180ed9148590bd572e Mon Sep 17 00:00:00 2001 From: 0marperez Date: Wed, 13 Nov 2024 10:48:50 -0500 Subject: [PATCH 3/5] PR feedback --- .../http-client-engine-okhttp/build.gradle.kts | 1 - .../runtime/http/engine/okhttp/ConnectionIdleMonitor.kt | 3 ++- .../kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt | 7 ++++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts b/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts index 7d3ad7fcd..3e5d13938 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts @@ -36,7 +36,6 @@ kotlin { all { languageSettings.optIn("aws.smithy.kotlin.runtime.InternalApi") - languageSettings.optIn("okhttp3.ExperimentalOkHttpApi") } } } diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt index 8f521c29c..4756d1cc8 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.* import okhttp3.Call import okhttp3.Connection import okhttp3.ConnectionListener +import okhttp3.ExperimentalOkHttpApi import okhttp3.internal.closeQuietly import okio.EOFException import okio.buffer @@ -21,13 +22,13 @@ import kotlin.coroutines.coroutineContext import kotlin.time.Duration import kotlin.time.measureTime +@OptIn(ExperimentalOkHttpApi::class) internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() { private val monitorScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val monitors = ConcurrentHashMap() fun close(): Unit = runBlocking { monitors.values.forEach { it.cancelAndJoin() } - monitorScope.cancel() } private fun Call.callContext() = diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt index 7622603ff..061c22a13 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt @@ -44,7 +44,7 @@ public class OkHttpEngine( } private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider) - private val connectionIdleMonitor = if (config.connectionIdlePollingInterval != null) ConnectionIdleMonitor(config.connectionIdlePollingInterval) else null + private val connectionIdleMonitor = config.connectionIdlePollingInterval?.let { ConnectionIdleMonitor(it) } private val client = config.buildClientWithConnectionListener(metrics, connectionIdleMonitor) override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall { @@ -100,12 +100,12 @@ private fun OkHttpEngineConfig.buildClientFromConfig( writeTimeout(config.socketWriteTimeout.toJavaDuration()) // use our own pool configured with the timeout settings taken from config - val pool = ConnectionPool( + val pool = poolOverride ?: ConnectionPool( maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds, TimeUnit.MILLISECONDS, ) - connectionPool(poolOverride ?: pool) + connectionPool(pool) val dispatcher = Dispatcher().apply { maxRequests = config.maxConcurrency.toInt() @@ -152,6 +152,7 @@ public fun OkHttpEngineConfig.buildClient( * Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client */ // Used by OkHttpEngine - OkHttp5 does have `connectionListener` +@OptIn(ExperimentalOkHttpApi::class) private fun OkHttpEngineConfig.buildClientWithConnectionListener( metrics: HttpClientMetrics, connectionListener: ConnectionIdleMonitor?, From 6ef2a481bbd02a2391ef8bcec9a916548c382b2e Mon Sep 17 00:00:00 2001 From: 0marperez Date: Wed, 13 Nov 2024 12:20:51 -0500 Subject: [PATCH 4/5] SoTimeout -> soTimeout in logs --- .../kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt index 4756d1cc8..27e21bb7c 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt @@ -109,7 +109,7 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis logger.trace { "Attempting to reset soTimeout..." } try { conn.socket().soTimeout = oldTimeout - logger.trace { "SoTimeout reset." } + logger.trace { "soTimeout reset." } } catch (e: Throwable) { logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." } } From 3b23c4959b3f1dc256e5b9b3b158b6cf6d8c1443 Mon Sep 17 00:00:00 2001 From: 0marperez Date: Thu, 14 Nov 2024 10:59:55 -0500 Subject: [PATCH 5/5] Cancel and join monitor job --- .../runtime/http/engine/okhttp/ConnectionIdleMonitor.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt index 27e21bb7c..f64a6b5f8 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt @@ -28,7 +28,10 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis private val monitors = ConcurrentHashMap() fun close(): Unit = runBlocking { - monitors.values.forEach { it.cancelAndJoin() } + val monitorJob = requireNotNull(monitorScope.coroutineContext[Job]) { + "Connection idle monitor scope cannot be cancelled because it does not have a job: $this" + } + monitorJob.cancelAndJoin() } private fun Call.callContext() =