-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: add support for idle connection monitoring (opt-in for now) #1171
Changes from 6 commits
3ce4ae7
12b0ce9
2216508
f9523b6
ed8dd3e
9e10451
e2638d7
21aa46d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
{ | ||
"id": "386353e6-e3cc-4561-bfd6-9763d3ac033b", | ||
"type": "bugfix", | ||
"description": "Add support for connection idle monitoring for OkHttp via the engine config parameter `connectionIdlePollingInterval`. Monitoring is disabled by default to match previous behavior. This monitoring will switch to enabled by default in an upcoming minor version release.", | ||
"issues": [ | ||
"awslabs/aws-sdk-kotlin#1214" | ||
] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package aws.smithy.kotlin.runtime.http.engine.okhttp | ||
|
||
import aws.smithy.kotlin.runtime.telemetry.logging.logger | ||
import kotlinx.coroutines.CoroutineName | ||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.Job | ||
import kotlinx.coroutines.cancelAndJoin | ||
import kotlinx.coroutines.isActive | ||
import kotlinx.coroutines.launch | ||
import kotlinx.coroutines.runBlocking | ||
import okhttp3.Call | ||
import okhttp3.Connection | ||
import okhttp3.ConnectionListener | ||
import okhttp3.ExperimentalOkHttpApi | ||
import okhttp3.internal.closeQuietly | ||
import okio.EOFException | ||
import okio.buffer | ||
import okio.source | ||
import java.net.SocketException | ||
import java.net.SocketTimeoutException | ||
import java.util.concurrent.ConcurrentHashMap | ||
import kotlin.coroutines.coroutineContext | ||
import kotlin.time.Duration | ||
import kotlin.time.Duration.Companion.seconds | ||
import kotlin.time.measureTime | ||
|
||
@OptIn(ExperimentalOkHttpApi::class) | ||
internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() { | ||
private val monitors = ConcurrentHashMap<Connection, Job>() | ||
|
||
private fun Call.callContext() = | ||
request() | ||
.tag(SdkRequestTag::class.java) | ||
?.callContext | ||
?: Dispatchers.IO | ||
|
||
override fun connectionAcquired(connection: Connection, call: Call) { | ||
// Non-locking map access is okay here because this code will only execute synchronously as part of a | ||
// `connectionAcquired` event and will be complete before any future `connectionReleased` event could fire for | ||
// the same connection. | ||
monitors.remove(connection)?.let { monitor -> | ||
5.seconds | ||
val context = call.callContext() | ||
val logger = context.logger<ConnectionIdleMonitor>() | ||
logger.trace { "Cancel monitoring for $connection" } | ||
|
||
// Use `runBlocking` because this _must_ finish before OkHttp goes to use the connection | ||
val cancelTime = measureTime { | ||
runBlocking(context) { monitor.cancelAndJoin() } | ||
} | ||
Comment on lines
+50
to
+53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is measuring the cancel time important here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The time spent waiting to cancel is blocking and delays the usage of the connection. Measuring that time and emitting it in a subsequent log message can help users tune their polling interval. |
||
|
||
logger.trace { "Monitoring canceled for $connection in $cancelTime" } | ||
} | ||
} | ||
|
||
override fun connectionReleased(connection: Connection, call: Call) { | ||
val connId = System.identityHashCode(connection) | ||
val context = call.callContext() | ||
val scope = CoroutineScope(context) | ||
val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) { | ||
doMonitor(connection) | ||
} | ||
context.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" } | ||
|
||
// Non-locking map access is okay here because this code will only execute synchronously as part of a | ||
// `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for | ||
// the same connection. | ||
monitors[connection] = monitor | ||
} | ||
|
||
private suspend fun doMonitor(conn: Connection) { | ||
val logger = coroutineContext.logger<ConnectionIdleMonitor>() | ||
|
||
val socket = conn.socket() | ||
val source = try { | ||
socket.source() | ||
} catch (_: SocketException) { | ||
logger.trace { "Socket for $conn closed before monitoring started. Skipping polling loop." } | ||
return | ||
}.buffer().peek() | ||
|
||
logger.trace { "Commence socket monitoring for $conn" } | ||
var resetTimeout = true | ||
val oldTimeout = socket.soTimeout | ||
|
||
try { | ||
socket.soTimeout = pollInterval.inWholeMilliseconds.toInt() | ||
|
||
while (coroutineContext.isActive) { | ||
try { | ||
logger.trace { "Polling socket for $conn" } | ||
source.readByte() // Blocking read; will take up to READ_TIMEOUT_MS to complete | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
question/clarification request: I don't see a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, leftover from when the polling interval was a fixed constant instead of a configurable parameter. Will update. |
||
} catch (_: SocketTimeoutException) { | ||
logger.trace { "Socket still alive for $conn" } | ||
// Socket is still alive | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This comment seems unnecessary ( |
||
} catch (_: EOFException) { | ||
logger.trace { "Socket closed remotely for $conn" } | ||
socket.closeQuietly() | ||
resetTimeout = false | ||
return | ||
} | ||
} | ||
|
||
logger.trace { "Monitoring coroutine has been cancelled. Ending polling loop." } | ||
} catch (e: Throwable) { | ||
logger.warn(e) { "Failed to poll $conn. Ending polling loop. Connection may be unstable now." } | ||
} finally { | ||
if (resetTimeout) { | ||
logger.trace { "Attempting to reset soTimeout..." } | ||
try { | ||
conn.socket().soTimeout = oldTimeout | ||
Comment on lines
+110
to
+113
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: Is it possible for a monitored connection to be pulled and used by OkHttp before we reset the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sort of. We start monitoring when a connection is released after a call and we cancel monitoring when a connection is acquired in preparation for a call. At the point we get the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have measurements for how long those health checks last and how long we take to reset the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nvm this comment, I was just misunderstanding something |
||
} catch (e: Throwable) { | ||
logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." } | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package aws.smithy.kotlin.runtime.http.test.suite | ||
|
||
import io.ktor.server.application.Application | ||
import io.ktor.server.response.respondText | ||
import io.ktor.server.routing.post | ||
import io.ktor.server.routing.route | ||
import io.ktor.server.routing.routing | ||
|
||
internal fun Application.connectionTests() { | ||
routing { | ||
route("connectionDrop") { | ||
post { | ||
call.respondText("Bar") | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unused value