From 3c3d092a8baf2cb09adb942bcf77a1141e5dbc87 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Tue, 17 Dec 2024 10:43:01 -0500 Subject: [PATCH 1/6] Enhance support for replayable instances of [InputStream] --- .../kotlin/runtime/content/ByteStreamJVM.kt | 18 ++++++++++-- .../runtime/content/ByteStreamJVMTest.kt | 28 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt index 1d6124357..10d7e7a77 100644 --- a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt +++ b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt @@ -114,11 +114,25 @@ public fun ByteStream.Companion.fromInputStream( * @param contentLength If specified, indicates how many bytes remain in this stream. Defaults to `null`. */ public fun InputStream.asByteStream(contentLength: Long? = null): ByteStream.SourceStream { - val source = source() + if (markSupported() && contentLength != null) { + mark(contentLength.toInt()) + } + return object : ByteStream.SourceStream() { override val contentLength: Long? = contentLength override val isOneShot: Boolean = !markSupported() - override fun readFrom(): SdkSource = source + override fun readFrom(): SdkSource { + if (markSupported() && contentLength != null) { + reset() + mark(contentLength.toInt()) + return object : SdkSource by source() { + // no-op close + override fun close() { } + } + } + + return source() + } } } diff --git a/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt b/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt index 054387b2e..3925addc2 100644 --- a/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt +++ b/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt @@ -5,8 +5,11 @@ package aws.smithy.kotlin.runtime.content +import aws.smithy.kotlin.runtime.io.readToByteArray import aws.smithy.kotlin.runtime.testing.RandomTempFile import kotlinx.coroutines.test.runTest +import java.io.BufferedInputStream +import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream import java.io.InputStream import java.io.OutputStream @@ -228,6 +231,31 @@ class ByteStreamJVMTest { assertFalse(sos.closed) } + // https://github.com/awslabs/aws-sdk-kotlin/issues/1473 + @Test + fun testReplayableInputStreamAsByteStream() = runTest { + val content = "Hello, Bytes!".encodeToByteArray() + val byteArrayIns = ByteArrayInputStream(content) + val nonReplayableIns = NonReplayableInputStream(byteArrayIns) + + // buffer the non-replayable stream, making it replayable... + val bufferedIns = BufferedInputStream(nonReplayableIns) + + val byteStream = bufferedIns.asByteStream(content.size.toLong()) + + // Test that it can be read at least twice (once for hashing the body, once for transmitting the body) + assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() }) + assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() }) + } + + private class NonReplayableInputStream(val inputStream: InputStream) : InputStream() { + override fun markSupported(): Boolean = false // not replayable + + override fun read(): Int = inputStream.read() + override fun mark(readlimit: Int)= inputStream.mark(readlimit) + override fun reset() = inputStream.reset() + } + private class StatusTrackingOutputStream(val os: OutputStream) : OutputStream() { var closed: Boolean = false From d76cab08e1efcb6bc1a28870ecf966b67e8e69be Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Tue, 17 Dec 2024 10:44:10 -0500 Subject: [PATCH 2/6] docs --- .../test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt b/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt index 3925addc2..e52732b90 100644 --- a/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt +++ b/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt @@ -243,7 +243,7 @@ class ByteStreamJVMTest { val byteStream = bufferedIns.asByteStream(content.size.toLong()) - // Test that it can be read at least twice (once for hashing the body, once for transmitting the body) + // Test that it can be read at least twice (e.g. once for hashing the body, once for transmitting the body) assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() }) assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() }) } From 89630f9b8832e7738f2ea81e1dc093e7499693cc Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Tue, 17 Dec 2024 10:44:25 -0500 Subject: [PATCH 3/6] ktlint --- .../test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt b/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt index e52732b90..e8324fb11 100644 --- a/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt +++ b/runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt @@ -252,7 +252,7 @@ class ByteStreamJVMTest { override fun markSupported(): Boolean = false // not replayable override fun read(): Int = inputStream.read() - override fun mark(readlimit: Int)= inputStream.mark(readlimit) + override fun mark(readlimit: Int) = inputStream.mark(readlimit) override fun reset() = inputStream.reset() } From 81ee3d7b87bd781777b566b5bedb1458e5b0bfff Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Tue, 17 Dec 2024 10:45:00 -0500 Subject: [PATCH 4/6] Changelog --- .changes/2de8162f-e5a0-4618-b00d-8da3cfdbc6a2.json | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changes/2de8162f-e5a0-4618-b00d-8da3cfdbc6a2.json diff --git a/.changes/2de8162f-e5a0-4618-b00d-8da3cfdbc6a2.json b/.changes/2de8162f-e5a0-4618-b00d-8da3cfdbc6a2.json new file mode 100644 index 000000000..fb16fa584 --- /dev/null +++ b/.changes/2de8162f-e5a0-4618-b00d-8da3cfdbc6a2.json @@ -0,0 +1,8 @@ +{ + "id": "2de8162f-e5a0-4618-b00d-8da3cfdbc6a2", + "type": "feature", + "description": "Enhance support for replayable instances of `InputStream`", + "issues": [ + "https://github.com/awslabs/aws-sdk-kotlin/issues/1473" + ] +} \ No newline at end of file From 845852eaa5aaee7167b549d845350263905c3a14 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Tue, 17 Dec 2024 12:43:50 -0500 Subject: [PATCH 5/6] Add more detail to comment about no-op close --- .../src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt index 10d7e7a77..6a7c2a8be 100644 --- a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt +++ b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt @@ -126,7 +126,12 @@ public fun InputStream.asByteStream(contentLength: Long? = null): ByteStream.Sou reset() mark(contentLength.toInt()) return object : SdkSource by source() { - // no-op close + /** + * This is a no-op close to prevent body hashing from closing the underlying InputStream, which causes + * `IOException: Stream closed` on subsequent reads. Consider making [ByteStream.ChannelStream]/[ByteStream.SourceStream] + * (or possibly even [ByteStream] itself) implement [Closeable] to better handle closing streams. + * This should allow us to clean up our usage of [ByteStream.cancel()]. + */ override fun close() { } } } From 1f418c07bd46018e20db05e91f803e92ce2eb6e9 Mon Sep 17 00:00:00 2001 From: Matas Lauzadis Date: Tue, 17 Dec 2024 12:48:11 -0500 Subject: [PATCH 6/6] Not KDocs --- .../jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt index 6a7c2a8be..5647ac15a 100644 --- a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt +++ b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt @@ -126,7 +126,7 @@ public fun InputStream.asByteStream(contentLength: Long? = null): ByteStream.Sou reset() mark(contentLength.toInt()) return object : SdkSource by source() { - /** + /* * This is a no-op close to prevent body hashing from closing the underlying InputStream, which causes * `IOException: Stream closed` on subsequent reads. Consider making [ByteStream.ChannelStream]/[ByteStream.SourceStream] * (or possibly even [ByteStream] itself) implement [Closeable] to better handle closing streams.