Skip to content

Commit

Permalink
Enhance support for replayable instances of [InputStream]
Browse files Browse the repository at this point in the history
  • Loading branch information
lauzadis committed Dec 17, 2024
1 parent 7085c8a commit 3c3d092
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,25 @@ public fun ByteStream.Companion.fromInputStream(
* @param contentLength If specified, indicates how many bytes remain in this stream. Defaults to `null`.
*/
public fun InputStream.asByteStream(contentLength: Long? = null): ByteStream.SourceStream {
val source = source()
if (markSupported() && contentLength != null) {
mark(contentLength.toInt())
}

return object : ByteStream.SourceStream() {
override val contentLength: Long? = contentLength
override val isOneShot: Boolean = !markSupported()
override fun readFrom(): SdkSource = source
override fun readFrom(): SdkSource {
if (markSupported() && contentLength != null) {
reset()
mark(contentLength.toInt())
return object : SdkSource by source() {
// no-op close
override fun close() { }
}
}

return source()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

package aws.smithy.kotlin.runtime.content

import aws.smithy.kotlin.runtime.io.readToByteArray
import aws.smithy.kotlin.runtime.testing.RandomTempFile
import kotlinx.coroutines.test.runTest
import java.io.BufferedInputStream
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
Expand Down Expand Up @@ -228,6 +231,31 @@ class ByteStreamJVMTest {
assertFalse(sos.closed)
}

// https://github.com/awslabs/aws-sdk-kotlin/issues/1473
@Test
fun testReplayableInputStreamAsByteStream() = runTest {
val content = "Hello, Bytes!".encodeToByteArray()
val byteArrayIns = ByteArrayInputStream(content)
val nonReplayableIns = NonReplayableInputStream(byteArrayIns)

// buffer the non-replayable stream, making it replayable...
val bufferedIns = BufferedInputStream(nonReplayableIns)

val byteStream = bufferedIns.asByteStream(content.size.toLong())

// Test that it can be read at least twice (once for hashing the body, once for transmitting the body)
assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() })
assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() })
}

private class NonReplayableInputStream(val inputStream: InputStream) : InputStream() {
override fun markSupported(): Boolean = false // not replayable

override fun read(): Int = inputStream.read()
override fun mark(readlimit: Int)= inputStream.mark(readlimit)
override fun reset() = inputStream.reset()
}

private class StatusTrackingOutputStream(val os: OutputStream) : OutputStream() {
var closed: Boolean = false

Expand Down

0 comments on commit 3c3d092

Please sign in to comment.