diff --git a/.changes/289fe06b-6d65-47da-a007-360398c39244.json b/.changes/289fe06b-6d65-47da-a007-360398c39244.json new file mode 100644 index 000000000..52cb88512 --- /dev/null +++ b/.changes/289fe06b-6d65-47da-a007-360398c39244.json @@ -0,0 +1,8 @@ +{ + "id": "289fe06b-6d65-47da-a007-360398c39244", + "type": "bugfix", + "description": "Correctly handle async cancellation of call context in OkHttp engine", + "issues": [ + "awslabs/smithy-kotlin#1061" + ] +} \ No newline at end of file diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt index ea8c8773d..aa65f0aba 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt @@ -50,15 +50,19 @@ public class OkHttpEngine( val engineCall = client.newCall(engineRequest) val engineResponse = mapOkHttpExceptions { engineCall.executeAsync() } - callContext.job.invokeOnCompletion { - engineResponse.body.close() - } - val response = engineResponse.toSdkResponse() val requestTime = Instant.fromEpochMilliseconds(engineResponse.sentRequestAtMillis) val responseTime = Instant.fromEpochMilliseconds(engineResponse.receivedResponseAtMillis) - return OkHttpCall(request, response, requestTime, responseTime, callContext, engineCall) + return OkHttpCall(request, response, requestTime, responseTime, callContext, engineCall).also { call -> + callContext.job.invokeOnCompletion { cause -> + // If cause is non-null that means the job was cancelled (CancellationException) or failed (anything + // else). In both cases we need to ensure that the engine-side resources are cleaned up completely + // since they wouldn't otherwise be. https://github.com/smithy-lang/smithy-kotlin/issues/1061 + if (cause != null) call.cancelInFlight() + engineResponse.body.close() + } + } } override fun shutdown() { diff --git a/runtime/protocol/http-client-engines/test-suite/common/test/aws/smithy/kotlin/runtime/http/test/AsyncStressTest.kt b/runtime/protocol/http-client-engines/test-suite/common/test/aws/smithy/kotlin/runtime/http/test/AsyncStressTest.kt index ca0a3b0e8..63faeb120 100644 --- a/runtime/protocol/http-client-engines/test-suite/common/test/aws/smithy/kotlin/runtime/http/test/AsyncStressTest.kt +++ b/runtime/protocol/http-client-engines/test-suite/common/test/aws/smithy/kotlin/runtime/http/test/AsyncStressTest.kt @@ -87,4 +87,45 @@ class AsyncStressTest : AbstractEngineTest() { assertEquals(engineJobsBefore.size, engineJobsAfter.size, message) } } + + @Test + fun testJobCancellation() = testEngines { + // https://github.com/smithy-lang/smithy-kotlin/issues/1061 + + test { _, client -> + val req = HttpRequest { + testSetup() + url.path.decoded = "slow" + } + + // Expect CancellationException because we're cancelling + assertFailsWith { + coroutineScope { + val parentScope = this + val call = client.call(req) + + val bytes = async { + delay(100.milliseconds) + + try { + // The server side is feeding us bytes very slowly. This shouldn't complete before the + // parentScope cancellation happens below. + call.response.body.readAll() + } catch (e: Throwable) { + // "IllegalStateException: Unbalanced enter/exit" will be thrown if body closed improperly + assertIsNot(e) + null + } + } + + val cancellation = async { + delay(400.milliseconds) + parentScope.cancel("Cancelling!") + } + + awaitAll(bytes, cancellation) + } + } + } + } } diff --git a/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/suite/Concurrency.kt b/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/suite/Concurrency.kt index 42de270cd..fad3e7fec 100644 --- a/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/suite/Concurrency.kt +++ b/runtime/protocol/http-client-engines/test-suite/jvm/src/aws/smithy/kotlin/runtime/http/test/suite/Concurrency.kt @@ -8,6 +8,8 @@ package aws.smithy.kotlin.runtime.http.test.suite import io.ktor.server.application.* import io.ktor.server.response.* import io.ktor.server.routing.* +import kotlinx.coroutines.delay +import kotlin.time.Duration.Companion.milliseconds internal fun Application.concurrentTests() { routing { @@ -18,5 +20,17 @@ internal fun Application.concurrentTests() { call.respondText(text.repeat(respSize / text.length)) } } + + route("slow") { + get { + val chunk = ByteArray(256) { it.toByte() } + call.respondOutputStream { + repeat(10) { + delay(200.milliseconds) + write(chunk) + } + } + } + } } }