Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add support for idle connection monitoring (opt-in for now) #1171

Merged
merged 8 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/386353e6-e3cc-4561-bfd6-9763d3ac033b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "386353e6-e3cc-4561-bfd6-9763d3ac033b",
"type": "bugfix",
"description": "Add support for connection idle monitoring for OkHttp via the engine config parameter `connectionIdlePollingInterval`. Monitoring is disabled by default to match previous behavior. This monitoring will switch to enabled by default in an upcoming minor version release.",
"issues": [
"awslabs/aws-sdk-kotlin#1214"
]
}
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ ktor-http-cio = { module = "io.ktor:ktor-http-cio", version.ref = "ktor-version"
ktor-utils = { module = "io.ktor:ktor-utils", version.ref = "ktor-version" }
ktor-io = { module = "io.ktor:ktor-io", version.ref = "ktor-version" }
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor-version" }
ktor-server-jetty = { module = "io.ktor:ktor-server-jetty", version.ref = "ktor-version" }
ktor-server-jetty-jakarta = { module = "io.ktor:ktor-server-jetty-jakarta", version.ref = "ktor-version" }
ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor-version" }
ktor-network-tls-certificates = { module = "io.ktor:ktor-network-tls-certificates", version.ref = "ktor-version" }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,17 @@ public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine$Com
public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl {
public static final field Companion Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Companion;
public synthetic fun <init> (Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getConnectionIdlePollingInterval-FghU774 ()Lkotlin/time/Duration;
public final fun getMaxConcurrencyPerHost-pVg5ArA ()I
public fun toBuilderApplicator ()Lkotlin/jvm/functions/Function1;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl$BuilderImpl {
public fun <init> ()V
public final fun getConnectionIdlePollingInterval-FghU774 ()Lkotlin/time/Duration;
public final fun getMaxConcurrencyPerHost-0hXNFcg ()Lkotlin/UInt;
public fun getTelemetryProvider ()Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;
public final fun setConnectionIdlePollingInterval-BwNAW2A (Lkotlin/time/Duration;)V
public final fun setMaxConcurrencyPerHost-ExVfyTY (Lkotlin/UInt;)V
public fun setTelemetryProvider (Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;)V
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.telemetry.logging.logger
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import okhttp3.Call
import okhttp3.Connection
import okhttp3.ConnectionListener
import okhttp3.ExperimentalOkHttpApi
import okhttp3.internal.closeQuietly
import okio.EOFException
import okio.buffer
import okio.source
import java.net.SocketException
import java.net.SocketTimeoutException
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlin.time.measureTime

@OptIn(ExperimentalOkHttpApi::class)
internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() {
private val monitors = ConcurrentHashMap<Connection, Job>()

private fun Call.callContext() =
request()
.tag(SdkRequestTag::class.java)
?.callContext
?: Dispatchers.IO

override fun connectionAcquired(connection: Connection, call: Call) {
// Non-locking map access is okay here because this code will only execute synchronously as part of a
// `connectionAcquired` event and will be complete before any future `connectionReleased` event could fire for
// the same connection.
monitors.remove(connection)?.let { monitor ->
val context = call.callContext()
val logger = context.logger<ConnectionIdleMonitor>()
logger.trace { "Cancel monitoring for $connection" }

// Use `runBlocking` because this _must_ finish before OkHttp goes to use the connection
val cancelTime = measureTime {
runBlocking(context) { monitor.cancelAndJoin() }
}
Comment on lines +50 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is measuring the cancel time important here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The time spent waiting to cancel is blocking and delays the usage of the connection. Measuring that time and emitting it in a subsequent log message can help users tune their polling interval.


logger.trace { "Monitoring canceled for $connection in $cancelTime" }
}
}

override fun connectionReleased(connection: Connection, call: Call) {
val connId = System.identityHashCode(connection)
val context = call.callContext()
val scope = CoroutineScope(context)
val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
doMonitor(connection)
}
context.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }

// Non-locking map access is okay here because this code will only execute synchronously as part of a
// `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for
// the same connection.
monitors[connection] = monitor
}

private suspend fun doMonitor(conn: Connection) {
val logger = coroutineContext.logger<ConnectionIdleMonitor>()

val socket = conn.socket()
val source = try {
socket.source()
} catch (_: SocketException) {
logger.trace { "Socket for $conn closed before monitoring started. Skipping polling loop." }
return
}.buffer().peek()

logger.trace { "Commence socket monitoring for $conn" }
var resetTimeout = true
val oldTimeout = socket.soTimeout

try {
socket.soTimeout = pollInterval.inWholeMilliseconds.toInt()

while (coroutineContext.isActive) {
try {
logger.trace { "Polling socket for $conn" }
source.readByte() // Blocking read; will take up to `pollInterval` time to complete
} catch (_: SocketTimeoutException) {
logger.trace { "Socket still alive for $conn" }
} catch (_: EOFException) {
logger.trace { "Socket closed remotely for $conn" }
socket.closeQuietly()
resetTimeout = false
return
}
}

logger.trace { "Monitoring coroutine has been cancelled. Ending polling loop." }
} catch (e: Throwable) {
logger.warn(e) { "Failed to poll $conn. Ending polling loop. Connection may be unstable now." }
} finally {
if (resetTimeout) {
logger.trace { "Attempting to reset soTimeout..." }
try {
conn.socket().soTimeout = oldTimeout
Comment on lines +110 to +113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Is it possible for a monitored connection to be pulled and used by OkHttp before we reset the soTimeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sort of. We start monitoring when a connection is released after a call and we cancel monitoring when a connection is acquired in preparation for a call. At the point we get the connectionAcquired event, OkHttp has already identified the connection it will use from the pool and run some health checks on it. But as far as I can see (and as far as my testing has shown), OkHttp won't actually send/receive data from the socket until after this event is complete.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have measurements for how long those health checks last and how long we take to reset the soTimeout? We might have a lot of overhead or maybe not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvm this comment, I was just misunderstanding something

} catch (e: Throwable) {
logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." }
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,20 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
readTimeout(config.socketReadTimeout.toJavaDuration())
writeTimeout(config.socketWriteTimeout.toJavaDuration())

// FIXME - register a [ConnectionListener](https://github.com/square/okhttp/blob/master/okhttp/src/jvmMain/kotlin/okhttp3/ConnectionListener.kt#L27)
// when a new okhttp release is cut that contains this abstraction and wireup connection uptime metrics
@OptIn(ExperimentalOkHttpApi::class)
val connectionListener = if (config.connectionIdlePollingInterval == null) {
ConnectionListener.NONE
} else {
ConnectionIdleMonitor(connectionIdlePollingInterval)
}

// use our own pool configured with the timeout settings taken from config
@OptIn(ExperimentalOkHttpApi::class)
val pool = ConnectionPool(
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds,
TimeUnit.MILLISECONDS,
connectionListener = connectionListener,
)
connectionPool(pool)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfigImpl
import aws.smithy.kotlin.runtime.telemetry.Global
import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider
import kotlin.time.Duration

/**
* The configuration parameters for an OkHttp HTTP client engine.
Expand All @@ -28,6 +29,22 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie
public val Default: OkHttpEngineConfig = OkHttpEngineConfig(Builder())
}

/**
* The interval in which to poll idle connections for remote closure or `null` to disable monitoring of idle
* connections. The default value is `null`.
*
* When this value is non-`null`, polling is enabled on connections which are released from an engine call and
* enter the connection pool. Polling consists of a loop that performs blocking reads with the socket timeout
* set to [connectionIdlePollingInterval]. Polling is cancelled for a connection when the engine acquires it
* from the pool or when the connection is evicted from the pool and closed. Because the polling loop uses
* blocking reads, an engine call to acquire or close a connection may be delayed by as much as
* [connectionIdlePollingInterval].
*
* When this value is `null`, polling is disabled. Idle connections in the pool which are closed remotely may
* encounter errors when they are acquired for a subsequent call.
*/
public val connectionIdlePollingInterval: Duration? = builder.connectionIdlePollingInterval

/**
* The maximum number of requests to execute concurrently for a single host.
*/
Expand All @@ -37,6 +54,7 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie
super.toBuilderApplicator()()

if (this is Builder) {
connectionIdlePollingInterval = [email protected]
maxConcurrencyPerHost = [email protected]
}
}
Expand All @@ -45,6 +63,22 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie
* A builder for [OkHttpEngineConfig]
*/
public class Builder : BuilderImpl() {
/**
* The interval in which to poll idle connections for remote closure or `null` to disable monitoring of idle
* connections. The default value is `null`.
*
* When this value is non-`null`, polling is enabled on connections which are released from an engine call and
* enter the connection pool. Polling consists of a loop that performs blocking reads with the socket timeout
* set to [connectionIdlePollingInterval]. Polling is cancelled for a connection when the engine acquires it
* from the pool or when the connection is evicted from the pool and closed. Because the polling loop uses
* blocking reads, an engine call to acquire or close a connection may be delayed by as much as
* [connectionIdlePollingInterval].
*
* When this value is `null`, polling is disabled. Idle connections in the pool which are closed remotely may
* encounter errors when they are acquired for a subsequent call.
*/
public var connectionIdlePollingInterval: Duration? = null

/**
* The maximum number of requests to execute concurrently for a single host. Defaults to [maxConcurrency].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ kotlin {

jvmMain {
dependencies {
implementation(libs.ktor.server.jetty)
implementation(libs.ktor.server.jetty.jakarta)
implementation(libs.ktor.network.tls.certificates)

implementation(project(":runtime:protocol:http-client-engines:http-client-engine-default"))
Expand Down Expand Up @@ -52,6 +52,8 @@ kotlin {
implementation("org.bouncycastle:bcpkix-jdk18on:1.78") // https://github.com/docker-java/docker-java/pull/2326

implementation(libs.docker.transport.zerodep)

implementation(project(":runtime:observability:telemetry-defaults"))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.test.suite

import io.ktor.server.application.Application
import io.ktor.server.response.respondText
import io.ktor.server.routing.post
import io.ktor.server.routing.route
import io.ktor.server.routing.routing

internal fun Application.connectionTests() {
routing {
route("connectionDrop") {
post {
call.respondText("Bar")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import aws.smithy.kotlin.runtime.http.test.suite.uploadTests
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.jetty.*
import io.ktor.server.jetty.jakarta.Jetty
import io.ktor.server.jetty.jakarta.JettyApplicationEngineBase
import redirectTests
import java.io.Closeable
import java.nio.file.Paths
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.seconds

private data class TestServer(
val port: Int,
Expand Down Expand Up @@ -94,7 +97,7 @@ private fun tlsServer(instance: TestServer, sslConfig: SslConfig): EmbeddedServe
val rootConfig = serverConfig {
module(instance.initializer)
}
val engineConfig: ApplicationEngine.Configuration.() -> Unit = {
val engineConfig: JettyApplicationEngineBase.Configuration.() -> Unit = {
when (instance.type) {
ConnectorType.HTTP -> connector { port = instance.port }

Expand All @@ -109,6 +112,8 @@ private fun tlsServer(instance: TestServer, sslConfig: SslConfig): EmbeddedServe
enabledProtocols = instance.protocolName?.let(::listOf)
}
}

idleTimeout = 3.seconds // Required for ConnectionTest.testShortLivedConnections
}

return try {
Expand All @@ -126,6 +131,7 @@ internal fun Application.testRoutes() {
uploadTests()
concurrentTests()
headerTests()
connectionTests()
}

// configure SSL-only routes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ package aws.smithy.kotlin.runtime.http.test

import aws.smithy.kotlin.runtime.content.decodeToString
import aws.smithy.kotlin.runtime.http.*
import aws.smithy.kotlin.runtime.http.engine.okhttp.OkHttpEngineConfig
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.http.request.url
import aws.smithy.kotlin.runtime.http.test.util.*
import aws.smithy.kotlin.runtime.http.test.util.testServers
import aws.smithy.kotlin.runtime.net.TlsVersion
import kotlinx.coroutines.delay
import java.nio.file.Paths
import kotlin.test.*
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

class ConnectionTest : AbstractEngineTest() {
private fun testMinTlsVersion(version: TlsVersion, serverType: ServerType) {
Expand Down Expand Up @@ -79,4 +83,45 @@ class ConnectionTest : AbstractEngineTest() {

@Test
fun testMinTls1_3() = testMinTlsVersion(TlsVersion.TLS_1_3, ServerType.TLS_1_3)

// See https://github.com/awslabs/aws-sdk-kotlin/issues/1214
@Test
fun testShortLivedConnections() = testEngines(
// Only run this test on OkHttp
skipEngines = setOf("CrtHttpEngine", "OkHttp4Engine"),
) {
engineConfig {
this as OkHttpEngineConfig.Builder
connectionIdlePollingInterval = 200.milliseconds
connectionIdleTimeout = 10.seconds // Longer than the server-side timeout
}

test { _, client ->
val initialReq = HttpRequest {
testSetup()
method = HttpMethod.POST
url {
path.decoded = "/connectionDrop"
}
body = "Foo".toHttpBody()
}
val initialCall = client.call(initialReq)
val initialResp = initialCall.response.body.toByteStream()?.decodeToString()
assertEquals("Bar", initialResp)

delay(5.seconds) // Longer than the service side timeout, shorter than the client-side timeout

val subsequentReq = HttpRequest {
testSetup()
method = HttpMethod.POST
url {
path.decoded = "/connectionDrop"
}
body = "Foo".toHttpBody()
}
val subsequentCall = client.call(subsequentReq)
val subsequentResp = subsequentCall.response.body.toByteStream()?.decodeToString()
assertEquals("Bar", subsequentResp)
}
}
}
Loading