Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: http call waiting for idle connection monitor to finish #1178

Merged
merged 5 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ kotlin {

all {
languageSettings.optIn("aws.smithy.kotlin.runtime.InternalApi")
languageSettings.optIn("okhttp3.ExperimentalOkHttpApi")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: We should generally not opt into experimental/internal annotations we don't own across entire modules. Our opt-ins should be narrowly scoped and intentional. Please remove this and add @OptIn(ExperimentalOkHttpApi::class) on the specific methods or lines that require the opt-in.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,31 @@
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.telemetry.logging.logger
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.*
import okhttp3.Call
import okhttp3.Connection
import okhttp3.ConnectionListener
import okhttp3.ExperimentalOkHttpApi
import okhttp3.internal.closeQuietly
import okio.EOFException
import okio.buffer
import okio.source
import java.net.SocketException
import java.net.SocketTimeoutException
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlin.time.measureTime

@OptIn(ExperimentalOkHttpApi::class)
internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() {
private val monitorScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: HTTP engines have their own coroutine scope. Could we reuse that as a parent scope instead of creating a new one from scratch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, let me test this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm seeing some issues trying to use the HTTP engine scope. We try to wait for the HTTP engine job to complete before calling shutdown. And that make us wait on the connection idle monitor to just fail it seems like, since it's running in a while loop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's unfortunate. We could probably structure engine closure a bit better to allow for child scopes/jobs but that's a bigger and riskier change. We can stick with this for now.

private val monitors = ConcurrentHashMap<Connection, Job>()

fun close(): Unit = runBlocking {
monitors.values.forEach { it.cancelAndJoin() }
monitorScope.cancel()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Cancelling/joining child jobs sequentially in a loop is time-consuming. Moreover, the docs say that cancelling a parent job (even a SupervisorJob) should automatically cancel all child jobs. Does this code accomplish the same thing?

fun close(): Unit = runBlocking {
    monitorScope.cancel()
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference is that we're not waiting for the child jobs to actually finish cancelling (join). So we could be leaving some resources behind.

I think it should improve performance if we just cancel and don't wait for a join. It shouldn't be an issue unless someone is using a lot of clients.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested this and time to cancel the connection idle monitor(s) went from ~200ms to ~600us. Now that I think of it, if someone is opening and closing clients very often and with a lot of connections per client then not waiting for the connection monitors to join could be a bigger issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, you're right that could be an issue. So then how about:

fun close(): Unit = runBlocking {
    monitorScope.cancelAndJoin()
}

Copy link
Contributor Author

@0marperez 0marperez Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately there's no cancelAndJoin for CoroutineScope, only Job.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh jeez you're right. Well the CoroutineScope.cancel() method implementation pretty much just verifies the scope contains a job and then calls cancel on that job. We can do something similar here:

val monitorJob = requireNotNull(monitorScope.coroutineContext[Job]) { "<some error msg>" }
monitorJob.cancelAndJoin()

private fun Call.callContext() =
request()
.tag(SdkRequestTag::class.java)
Expand All @@ -58,21 +56,20 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis

override fun connectionReleased(connection: Connection, call: Call) {
val connId = System.identityHashCode(connection)
val context = call.callContext()
val scope = CoroutineScope(context)
val monitor = scope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
doMonitor(connection)
val callContext = call.callContext()
val monitor = monitorScope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
doMonitor(connection, callContext)
}
context.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }
callContext.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }

// Non-locking map access is okay here because this code will only execute synchronously as part of a
// `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for
// the same connection.
monitors[connection] = monitor
}

private suspend fun doMonitor(conn: Connection) {
val logger = coroutineContext.logger<ConnectionIdleMonitor>()
private suspend fun doMonitor(conn: Connection, callContext: CoroutineContext) {
val logger = callContext.logger<ConnectionIdleMonitor>()

val socket = conn.socket()
val source = try {
Expand Down Expand Up @@ -111,6 +108,7 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
logger.trace { "Attempting to reset soTimeout..." }
try {
conn.socket().soTimeout = oldTimeout
logger.trace { "SoTimeout reset." }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: soTimeout

} catch (e: Throwable) {
logger.warn(e) { "Failed to reset socket timeout on $conn. Connection may be unstable now." }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import aws.smithy.kotlin.runtime.net.TlsVersion
import aws.smithy.kotlin.runtime.operation.ExecutionContext
import aws.smithy.kotlin.runtime.time.Instant
import aws.smithy.kotlin.runtime.time.fromEpochMilliseconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.job
import okhttp3.*
import okhttp3.ConnectionPool
import okhttp3.coroutines.executeAsync
import java.util.concurrent.TimeUnit
import kotlin.time.toJavaDuration
Expand All @@ -44,15 +44,15 @@ public class OkHttpEngine(
}

private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider)
private val client = config.buildClient(metrics)
private val connectionIdleMonitor = if (config.connectionIdlePollingInterval != null) ConnectionIdleMonitor(config.connectionIdlePollingInterval) else null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

private val connectionIdleMonitor = config.connectionIdlePollingInterval?.let { ConnectionIdleMonitor(it) }

private val client = config.buildClientWithConnectionListener(metrics, connectionIdleMonitor)

override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall {
val callContext = callContext()

val engineRequest = request.toOkHttpRequest(context, callContext, metrics)
val engineCall = client.newCall(engineRequest)

@OptIn(ExperimentalCoroutinesApi::class)
val engineResponse = mapOkHttpExceptions { engineCall.executeAsync() }

val response = engineResponse.toSdkResponse()
Expand All @@ -71,17 +71,17 @@ public class OkHttpEngine(
}

override fun shutdown() {
connectionIdleMonitor?.close()
client.connectionPool.evictAll()
client.dispatcher.executorService.shutdown()
metrics.close()
}
}

/**
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
*/
@InternalApi
public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient {
private fun OkHttpEngineConfig.buildClientFromConfig(
metrics: HttpClientMetrics,
poolOverride: ConnectionPool? = null,
): OkHttpClient {
val config = this

return OkHttpClient.Builder().apply {
Expand All @@ -99,22 +99,13 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
readTimeout(config.socketReadTimeout.toJavaDuration())
writeTimeout(config.socketWriteTimeout.toJavaDuration())

@OptIn(ExperimentalOkHttpApi::class)
val connectionListener = if (config.connectionIdlePollingInterval == null) {
ConnectionListener.NONE
} else {
ConnectionIdleMonitor(connectionIdlePollingInterval)
}

// use our own pool configured with the timeout settings taken from config
@OptIn(ExperimentalOkHttpApi::class)
val pool = ConnectionPool(
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds,
TimeUnit.MILLISECONDS,
connectionListener = connectionListener,
)
connectionPool(pool)
connectionPool(poolOverride ?: pool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: With this change we're now constructing a ConnectionPool we'll just throw away if poolOverride is set. We should only construct the OkHttp4-safe pool if poolOverride is null.


val dispatcher = Dispatcher().apply {
maxRequests = config.maxConcurrency.toInt()
Expand Down Expand Up @@ -147,6 +138,33 @@ public fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCli
}.build()
}

/**
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
*/
// Used by OkHttp4Engine - OkHttp4 does NOT have `connectionListener`
// TODO - Refactor in next minor version - Move this to OkHttp4Engine and make it private
@InternalApi
public fun OkHttpEngineConfig.buildClient(
metrics: HttpClientMetrics,
): OkHttpClient = this.buildClientFromConfig(metrics)

/**
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
*/
// Used by OkHttpEngine - OkHttp5 does have `connectionListener`
private fun OkHttpEngineConfig.buildClientWithConnectionListener(
metrics: HttpClientMetrics,
connectionListener: ConnectionIdleMonitor?,
): OkHttpClient = this.buildClientFromConfig(
metrics,
ConnectionPool(
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
keepAliveDuration = this.connectionIdleTimeout.inWholeMilliseconds,
timeUnit = TimeUnit.MILLISECONDS,
connectionListener = connectionListener ?: ConnectionListener.NONE,
),
)

private fun minTlsConnectionSpec(tlsContext: TlsContext): ConnectionSpec {
val minVersion = tlsContext.minVersion ?: TlsVersion.TLS_1_2
val okHttpTlsVersions = SdkTlsVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public class OkHttp4Engine(
}
}
}

override fun shutdown() {
client.connectionPool.evictAll()
client.dispatcher.executorService.shutdown()
metrics.close()
}
Comment on lines +74 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch

}

// Copied from okhttp3 5.x:
Expand Down
Loading