Skip to content

Commit

Permalink
fix:add support for idle connection monitoring (opt-in for now)
Browse files Browse the repository at this point in the history
  • Loading branch information
ianbotsf committed Oct 30, 2024
1 parent b5f5e91 commit 3ce4ae7
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 2 deletions.
8 changes: 8 additions & 0 deletions .changes/386353e6-e3cc-4561-bfd6-9763d3ac033b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "386353e6-e3cc-4561-bfd6-9763d3ac033b",
"type": "bugfix",
"description": "Add support for connection idle monitoring for OkHttp via the engine config parameter `connectionIdlePollingInterval`. Monitoring is disabled by default to match previous behavior. This monitoring will switch to enabled by default in an upcoming minor version release.",
"issues": [
"awslabs/aws-sdk-kotlin#1214"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,17 @@ public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine$Com
public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl {
public static final field Companion Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Companion;
public synthetic fun <init> (Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getConnectionIdlePollingInterval-FghU774 ()Lkotlin/time/Duration;
public final fun getMaxConcurrencyPerHost-pVg5ArA ()I
public fun toBuilderApplicator ()Lkotlin/jvm/functions/Function1;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl$BuilderImpl {
public fun <init> ()V
public final fun getConnectionIdlePollingInterval-FghU774 ()Lkotlin/time/Duration;
public final fun getMaxConcurrencyPerHost-0hXNFcg ()Lkotlin/UInt;
public fun getTelemetryProvider ()Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;
public final fun setConnectionIdlePollingInterval-BwNAW2A (Lkotlin/time/Duration;)V
public final fun setMaxConcurrencyPerHost-ExVfyTY (Lkotlin/UInt;)V
public fun setTelemetryProvider (Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;)V
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.telemetry.logging.Logger
import aws.smithy.kotlin.runtime.telemetry.logging.logger
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import okhttp3.Call
import okhttp3.Connection
import okhttp3.ConnectionListener
import okhttp3.ExperimentalOkHttpApi
import okhttp3.internal.closeQuietly
import okio.EOFException
import okio.buffer
import okio.source
import java.net.SocketException
import java.net.SocketTimeoutException
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.measureTime

@OptIn(ExperimentalOkHttpApi::class)
internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() {
private val monitors = ConcurrentHashMap<Connection, Job>()

private fun Call.callContext() =
request()
.tag(SdkRequestTag::class.java)
?.callContext
?: Dispatchers.IO

override fun connectionAcquired(connection: Connection, call: Call) {
// Non-locking map access is okay here because this code will only execute synchronously as part of a
// `connectionAcquired` event and will be complete before any future `connectionReleased` event could fire for
// the same connection.
monitors.remove(connection)?.let { monitor ->
5.seconds
val context = call.callContext()
val logger = context.logger<ConnectionIdleMonitor>()
logger.trace { "Cancel monitoring for $connection" }

// Use `runBlocking` because this _must_ finish before OkHttp goes to use the connection
val cancelTime = measureTime {
runBlocking(context) { monitor.cancelAndJoin() }
}

logger.trace { "Monitoring canceled for $connection in $cancelTime" }
}
}

override fun connectionReleased(connection: Connection, call: Call) {
val connId = System.identityHashCode(connection)
val context = call.callContext()
val scope = CoroutineScope(context)
val logger = context.logger<ConnectionIdleMonitor>()
val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
doMonitor(connection, logger)
}
logger.trace { "Launched coroutine $monitor to monitor $connection" }

// Non-locking map access is okay here because this code will only execute synchronously as part of a
// `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for
// the same connection.
monitors[connection] = monitor
}

private suspend fun doMonitor(conn: Connection, logger: Logger) {
val socket = conn.socket()
val source = try {
socket.source()
} catch (_: SocketException) {
logger.trace { "Socket for $conn closed before monitoring started. Skipping polling loop." }
return
}.buffer().peek()

logger.trace { "Commence socket monitoring for $conn" }
var resetTimeout = true
val oldTimeout = socket.soTimeout

try {
socket.soTimeout = pollInterval.inWholeMilliseconds.toInt()

while (coroutineContext.isActive) {
try {
source.readByte() // Blocking read; will take up to READ_TIMEOUT_MS to complete
} catch (_: SocketTimeoutException) {
// Socket is still alive
} catch (_: EOFException) {
logger.trace { "Socket for $conn was closed remotely" }
socket.closeQuietly()
resetTimeout = false
return
}
}
} catch (e: Throwable) {
logger.warn(e) { "Failed to poll $conn. Ending polling loop. Connection may be unstable now." }
} finally {
if (resetTimeout) {
logger.trace { "Attempting to reset soTimeout..." }
try {
conn.socket().soTimeout = oldTimeout
} catch (e: Throwable) {
logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." }
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,20 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
readTimeout(config.socketReadTimeout.toJavaDuration())
writeTimeout(config.socketWriteTimeout.toJavaDuration())

// FIXME - register a [ConnectionListener](https://github.com/square/okhttp/blob/master/okhttp/src/jvmMain/kotlin/okhttp3/ConnectionListener.kt#L27)
// when a new okhttp release is cut that contains this abstraction and wireup connection uptime metrics
@OptIn(ExperimentalOkHttpApi::class)
val connectionListener = if (config.connectionIdlePollingInterval == null) {
ConnectionListener.NONE
} else {
ConnectionIdleMonitor(connectionIdlePollingInterval)
}

// use our own pool configured with the timeout settings taken from config
@OptIn(ExperimentalOkHttpApi::class)
val pool = ConnectionPool(
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds,
TimeUnit.MILLISECONDS,
connectionListener = connectionListener,
)
connectionPool(pool)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfigImpl
import aws.smithy.kotlin.runtime.telemetry.Global
import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider
import kotlin.time.Duration

/**
* The configuration parameters for an OkHttp HTTP client engine.
Expand All @@ -28,6 +29,22 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie
public val Default: OkHttpEngineConfig = OkHttpEngineConfig(Builder())
}

/**
* The interval in which to poll idle connections for remote closure or `null` to disable monitoring of idle
* connections. The default value is `null`.
*
* When this value is non-`null`, polling is enabled on connections which are released from an engine call and
* enter the connection pool. Polling consists of a loop that performs blocking reads with the socket timeout
* set to [connectionIdlePollingInterval]. Polling is cancelled for a connection when the engine acquires it
* from the pool or when the connection is evicted from the pool and closed. Because the polling loop uses
* blocking reads, an engine call to acquire or close a connection may be delayed by as much as
* [connectionIdlePollingInterval].
*
* When this value is `null`, polling is disabled. Idle connections in the pool which are closed remotely may
* encounter errors when they are acquired for a subsequent call.
*/
public val connectionIdlePollingInterval: Duration? = builder.connectionIdlePollingInterval

/**
* The maximum number of requests to execute concurrently for a single host.
*/
Expand All @@ -37,6 +54,7 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie
super.toBuilderApplicator()()

if (this is Builder) {
connectionIdlePollingInterval = this@OkHttpEngineConfig.connectionIdlePollingInterval
maxConcurrencyPerHost = this@OkHttpEngineConfig.maxConcurrencyPerHost
}
}
Expand All @@ -45,6 +63,22 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie
* A builder for [OkHttpEngineConfig]
*/
public class Builder : BuilderImpl() {
/**
* The interval in which to poll idle connections for remote closure or `null` to disable monitoring of idle
* connections. The default value is `null`.
*
* When this value is non-`null`, polling is enabled on connections which are released from an engine call and
* enter the connection pool. Polling consists of a loop that performs blocking reads with the socket timeout
* set to [connectionIdlePollingInterval]. Polling is cancelled for a connection when the engine acquires it
* from the pool or when the connection is evicted from the pool and closed. Because the polling loop uses
* blocking reads, an engine call to acquire or close a connection may be delayed by as much as
* [connectionIdlePollingInterval].
*
* When this value is `null`, polling is disabled. Idle connections in the pool which are closed remotely may
* encounter errors when they are acquired for a subsequent call.
*/
public var connectionIdlePollingInterval: Duration? = null

/**
* The maximum number of requests to execute concurrently for a single host. Defaults to [maxConcurrency].
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.test.suite

import io.ktor.server.application.Application
import io.ktor.server.application.call
import io.ktor.server.jetty.JettyApplicationCall
import io.ktor.server.response.respondText
import io.ktor.server.routing.RoutingApplicationCall
import io.ktor.server.routing.post
import io.ktor.server.routing.route
import io.ktor.server.routing.routing
import io.ktor.util.InternalAPI
import io.ktor.utils.io.close
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlin.time.Duration.Companion.seconds

internal fun Application.connectionTests() {
routing {
route("connectionDrop") {
@OptIn(InternalAPI::class)
post {
val routingCall = call as RoutingApplicationCall
val jettyCall = routingCall.engineCall as JettyApplicationCall

launch {
delay(4.seconds)
jettyCall.response.responseChannel().close()
}

jettyCall.respondText("Bar")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ internal fun Application.testRoutes() {
uploadTests()
concurrentTests()
headerTests()
connectionTests()
}

// configure SSL-only routes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ package aws.smithy.kotlin.runtime.http.test

import aws.smithy.kotlin.runtime.content.decodeToString
import aws.smithy.kotlin.runtime.http.*
import aws.smithy.kotlin.runtime.http.engine.okhttp.OkHttpEngineConfig
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.http.request.url
import aws.smithy.kotlin.runtime.http.test.util.*
import aws.smithy.kotlin.runtime.http.test.util.testServers
import aws.smithy.kotlin.runtime.net.TlsVersion
import kotlinx.coroutines.delay
import java.nio.file.Paths
import kotlin.test.*
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

class ConnectionTest : AbstractEngineTest() {
private fun testMinTlsVersion(version: TlsVersion, serverType: ServerType) {
Expand Down Expand Up @@ -79,4 +83,44 @@ class ConnectionTest : AbstractEngineTest() {

@Test
fun testMinTls1_3() = testMinTlsVersion(TlsVersion.TLS_1_3, ServerType.TLS_1_3)

@Test
fun testShortLivedConnections() = testEngines(
// Only run this test on OkHttp
skipEngines = setOf("CrtHttpEngine", "OkHttp4Engine"),
) {
engineConfig {
this as OkHttpEngineConfig.Builder
connectionIdlePollingInterval = 200.milliseconds
connectionIdleTimeout = 10.seconds // Longer than the server-side timeout
}

test { _, client ->
val initialReq = HttpRequest {
testSetup()
method = HttpMethod.POST
url {
path.decoded = "/connectionDrop"
}
body = "Foo".toHttpBody()
}
val initialCall = client.call(initialReq)
val initialResp = initialCall.response.body.toByteStream()?.decodeToString()
assertEquals("Bar", initialResp)

delay(5.seconds) // Shorter than the client-side timeout

val subsequentReq = HttpRequest {
testSetup()
method = HttpMethod.POST
url {
path.decoded = "/connectionDrop"
}
body = "Foo".toHttpBody()
}
val subsequentCall = client.call(subsequentReq)
val subsequentResp = subsequentCall.response.body.toByteStream()?.decodeToString()
assertEquals("Bar", subsequentResp)
}
}
}

0 comments on commit 3ce4ae7

Please sign in to comment.