diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt index 5d845c0c4..f64a6b5f8 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/ConnectionIdleMonitor.kt @@ -5,14 +5,7 @@ package aws.smithy.kotlin.runtime.http.engine.okhttp import aws.smithy.kotlin.runtime.telemetry.logging.logger -import kotlinx.coroutines.CoroutineName -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* import okhttp3.Call import okhttp3.Connection import okhttp3.ConnectionListener @@ -24,14 +17,23 @@ import okio.source import java.net.SocketException import java.net.SocketTimeoutException import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext import kotlin.time.Duration import kotlin.time.measureTime @OptIn(ExperimentalOkHttpApi::class) internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() { + private val monitorScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val monitors = ConcurrentHashMap() + fun close(): Unit = runBlocking { + val monitorJob = requireNotNull(monitorScope.coroutineContext[Job]) { + "Connection idle monitor scope cannot be cancelled because it does not have a job: $this" + } + monitorJob.cancelAndJoin() + } + private fun Call.callContext() = request() .tag(SdkRequestTag::class.java) @@ -58,12 +60,11 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis override fun connectionReleased(connection: Connection, call: Call) { val connId = System.identityHashCode(connection) - val context = call.callContext() - val scope = CoroutineScope(context) - val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) { - doMonitor(connection) + val callContext = call.callContext() + val monitor = monitorScope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) { + doMonitor(connection, callContext) } - context.logger().trace { "Launched coroutine $monitor to monitor $connection" } + callContext.logger().trace { "Launched coroutine $monitor to monitor $connection" } // Non-locking map access is okay here because this code will only execute synchronously as part of a // `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for @@ -71,8 +72,8 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis monitors[connection] = monitor } - private suspend fun doMonitor(conn: Connection) { - val logger = coroutineContext.logger() + private suspend fun doMonitor(conn: Connection, callContext: CoroutineContext) { + val logger = callContext.logger() val socket = conn.socket() val source = try { @@ -111,6 +112,7 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis logger.trace { "Attempting to reset soTimeout..." } try { conn.socket().soTimeout = oldTimeout + logger.trace { "soTimeout reset." } } catch (e: Throwable) { logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." } } diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt index 2818aaac7..061c22a13 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt @@ -15,9 +15,9 @@ import aws.smithy.kotlin.runtime.net.TlsVersion import aws.smithy.kotlin.runtime.operation.ExecutionContext import aws.smithy.kotlin.runtime.time.Instant import aws.smithy.kotlin.runtime.time.fromEpochMilliseconds -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.job import okhttp3.* +import okhttp3.ConnectionPool import okhttp3.coroutines.executeAsync import java.util.concurrent.TimeUnit import kotlin.time.toJavaDuration @@ -44,7 +44,8 @@ public class OkHttpEngine( } private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider) - private val client = config.buildClient(metrics) + private val connectionIdleMonitor = config.connectionIdlePollingInterval?.let { ConnectionIdleMonitor(it) } + private val client = config.buildClientWithConnectionListener(metrics, connectionIdleMonitor) override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall { val callContext = callContext() @@ -52,7 +53,6 @@ public class OkHttpEngine( val engineRequest = request.toOkHttpRequest(context, callContext, metrics) val engineCall = client.newCall(engineRequest) - @OptIn(ExperimentalCoroutinesApi::class) val engineResponse = mapOkHttpExceptions { engineCall.executeAsync() } val response = engineResponse.toSdkResponse() @@ -71,17 +71,17 @@ public class OkHttpEngine( } override fun shutdown() { + connectionIdleMonitor?.close() client.connectionPool.evictAll() client.dispatcher.executorService.shutdown() metrics.close() } } -/** - * Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client - */ -@InternalApi -public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient { +private fun OkHttpEngineConfig.buildClientFromConfig( + metrics: HttpClientMetrics, + poolOverride: ConnectionPool? = null, +): OkHttpClient { val config = this return OkHttpClient.Builder().apply { @@ -99,20 +99,11 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli readTimeout(config.socketReadTimeout.toJavaDuration()) writeTimeout(config.socketWriteTimeout.toJavaDuration()) - @OptIn(ExperimentalOkHttpApi::class) - val connectionListener = if (config.connectionIdlePollingInterval == null) { - ConnectionListener.NONE - } else { - ConnectionIdleMonitor(connectionIdlePollingInterval) - } - // use our own pool configured with the timeout settings taken from config - @OptIn(ExperimentalOkHttpApi::class) - val pool = ConnectionPool( + val pool = poolOverride ?: ConnectionPool( maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds, TimeUnit.MILLISECONDS, - connectionListener = connectionListener, ) connectionPool(pool) @@ -147,6 +138,34 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli }.build() } +/** + * Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client + */ +// Used by OkHttp4Engine - OkHttp4 does NOT have `connectionListener` +// TODO - Refactor in next minor version - Move this to OkHttp4Engine and make it private +@InternalApi +public fun OkHttpEngineConfig.buildClient( + metrics: HttpClientMetrics, +): OkHttpClient = this.buildClientFromConfig(metrics) + +/** + * Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client + */ +// Used by OkHttpEngine - OkHttp5 does have `connectionListener` +@OptIn(ExperimentalOkHttpApi::class) +private fun OkHttpEngineConfig.buildClientWithConnectionListener( + metrics: HttpClientMetrics, + connectionListener: ConnectionIdleMonitor?, +): OkHttpClient = this.buildClientFromConfig( + metrics, + ConnectionPool( + maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor + keepAliveDuration = this.connectionIdleTimeout.inWholeMilliseconds, + timeUnit = TimeUnit.MILLISECONDS, + connectionListener = connectionListener ?: ConnectionListener.NONE, + ), +) + private fun minTlsConnectionSpec(tlsContext: TlsContext): ConnectionSpec { val minVersion = tlsContext.minVersion ?: TlsVersion.TLS_1_2 val okHttpTlsVersions = SdkTlsVersion diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt index 422d3952c..ff5982320 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp4/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp4/OkHttp4Engine.kt @@ -70,6 +70,12 @@ public class OkHttp4Engine( } } } + + override fun shutdown() { + client.connectionPool.evictAll() + client.dispatcher.executorService.shutdown() + metrics.close() + } } // Copied from okhttp3 5.x: