Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kn: implement SdkBuffer #1214

Merged
merged 18 commits into from
Jan 10, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ package aws.smithy.kotlin.runtime.crt

import aws.sdk.kotlin.crt.io.MutableBuffer
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.read

internal actual fun transferRequestBody(outgoing: SdkBuffer, dest: MutableBuffer) = outgoing.read(dest.buffer)
30 changes: 15 additions & 15 deletions runtime/runtime-core/api/runtime-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,12 @@ public final class aws/smithy/kotlin/runtime/io/ClosedWriteChannelException : ja
public synthetic fun <init> (Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public class aws/smithy/kotlin/runtime/io/EOFException : java/io/EOFException {
public fun <init> ()V
public fun <init> (Ljava/lang/String;)V
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
}

public final class aws/smithy/kotlin/runtime/io/GzipByteReadChannel : aws/smithy/kotlin/runtime/io/SdkByteReadChannel {
public fun <init> (Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;)V
public fun cancel (Ljava/lang/Throwable;)Z
Expand Down Expand Up @@ -805,6 +811,10 @@ public final class aws/smithy/kotlin/runtime/io/HashingSource : aws/smithy/kotli
}

public final class aws/smithy/kotlin/runtime/io/JavaIOKt {
public static final fun inputStream (Laws/smithy/kotlin/runtime/io/SdkBuffer;)Ljava/io/InputStream;
public static final fun isOpen (Laws/smithy/kotlin/runtime/io/SdkBuffer;)Z
public static final fun outputStream (Laws/smithy/kotlin/runtime/io/SdkBuffer;)Ljava/io/OutputStream;
public static final fun read (Laws/smithy/kotlin/runtime/io/SdkBuffer;Ljava/nio/ByteBuffer;)I
public static final fun sink (Ljava/io/File;)Laws/smithy/kotlin/runtime/io/SdkSink;
public static final fun sink (Ljava/io/OutputStream;)Laws/smithy/kotlin/runtime/io/SdkSink;
public static final fun sink (Ljava/nio/file/Path;)Laws/smithy/kotlin/runtime/io/SdkSink;
Expand All @@ -815,6 +825,7 @@ public final class aws/smithy/kotlin/runtime/io/JavaIOKt {
public static final fun source (Ljava/nio/file/Path;Lkotlin/ranges/LongRange;)Laws/smithy/kotlin/runtime/io/SdkSource;
public static synthetic fun source$default (Ljava/io/File;JJILjava/lang/Object;)Laws/smithy/kotlin/runtime/io/SdkSource;
public static synthetic fun source$default (Ljava/nio/file/Path;JJILjava/lang/Object;)Laws/smithy/kotlin/runtime/io/SdkSource;
public static final fun write (Laws/smithy/kotlin/runtime/io/SdkBuffer;Ljava/nio/ByteBuffer;)I
}

public final class aws/smithy/kotlin/runtime/io/SdkBuffer : aws/smithy/kotlin/runtime/io/SdkBufferedSink, aws/smithy/kotlin/runtime/io/SdkBufferedSource {
Expand All @@ -827,12 +838,8 @@ public final class aws/smithy/kotlin/runtime/io/SdkBuffer : aws/smithy/kotlin/ru
public fun getBuffer ()Laws/smithy/kotlin/runtime/io/SdkBuffer;
public final fun getSize ()J
public fun hashCode ()I
public fun inputStream ()Ljava/io/InputStream;
public fun isOpen ()Z
public fun outputStream ()Ljava/io/OutputStream;
public fun peek ()Laws/smithy/kotlin/runtime/io/SdkBufferedSource;
public fun read (Laws/smithy/kotlin/runtime/io/SdkBuffer;J)J
public fun read (Ljava/nio/ByteBuffer;)I
public fun read ([BII)I
public fun readAll (Laws/smithy/kotlin/runtime/io/SdkSink;)J
public fun readByte ()B
Expand All @@ -852,7 +859,6 @@ public final class aws/smithy/kotlin/runtime/io/SdkBuffer : aws/smithy/kotlin/ru
public fun toString ()Ljava/lang/String;
public fun write (Laws/smithy/kotlin/runtime/io/SdkBuffer;J)V
public fun write (Laws/smithy/kotlin/runtime/io/SdkSource;J)V
public fun write (Ljava/nio/ByteBuffer;)I
public fun write ([BII)V
public fun writeAll (Laws/smithy/kotlin/runtime/io/SdkSource;)J
public fun writeByte (B)V
Expand All @@ -865,11 +871,9 @@ public final class aws/smithy/kotlin/runtime/io/SdkBuffer : aws/smithy/kotlin/ru
public fun writeUtf8 (Ljava/lang/String;II)V
}

public abstract interface class aws/smithy/kotlin/runtime/io/SdkBufferedSink : aws/smithy/kotlin/runtime/io/SdkSink, java/nio/channels/WritableByteChannel {
public abstract interface class aws/smithy/kotlin/runtime/io/SdkBufferedSink : aws/smithy/kotlin/runtime/io/SdkSink {
public abstract fun emit ()V
public abstract fun flush ()V
public abstract fun getBuffer ()Laws/smithy/kotlin/runtime/io/SdkBuffer;
public abstract fun outputStream ()Ljava/io/OutputStream;
public abstract fun write (Laws/smithy/kotlin/runtime/io/SdkSource;J)V
public abstract fun write ([BII)V
public abstract fun writeAll (Laws/smithy/kotlin/runtime/io/SdkSource;)J
Expand All @@ -888,10 +892,9 @@ public final class aws/smithy/kotlin/runtime/io/SdkBufferedSink$DefaultImpls {
public static synthetic fun writeUtf8$default (Laws/smithy/kotlin/runtime/io/SdkBufferedSink;Ljava/lang/String;IIILjava/lang/Object;)V
}

public abstract interface class aws/smithy/kotlin/runtime/io/SdkBufferedSource : aws/smithy/kotlin/runtime/io/SdkSource, java/nio/channels/ReadableByteChannel {
public abstract interface class aws/smithy/kotlin/runtime/io/SdkBufferedSource : aws/smithy/kotlin/runtime/io/SdkSource {
public abstract fun exhausted ()Z
public abstract fun getBuffer ()Laws/smithy/kotlin/runtime/io/SdkBuffer;
public abstract fun inputStream ()Ljava/io/InputStream;
public abstract fun peek ()Laws/smithy/kotlin/runtime/io/SdkBufferedSource;
public abstract fun read ([BII)I
public abstract fun readAll (Laws/smithy/kotlin/runtime/io/SdkSink;)J
Expand Down Expand Up @@ -1023,16 +1026,13 @@ public abstract interface class aws/smithy/kotlin/runtime/io/SdkSource : java/io
public abstract fun read (Laws/smithy/kotlin/runtime/io/SdkBuffer;J)J
}

public final class aws/smithy/kotlin/runtime/io/SdkSourceJVMKt {
public final class aws/smithy/kotlin/runtime/io/SdkSourceKt {
public static final fun readFully (Laws/smithy/kotlin/runtime/io/SdkSource;Laws/smithy/kotlin/runtime/io/SdkBuffer;J)V
public static final fun readToByteArray (Laws/smithy/kotlin/runtime/io/SdkSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun toSdkByteReadChannel (Laws/smithy/kotlin/runtime/io/SdkSource;Lkotlinx/coroutines/CoroutineScope;)Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;
public static synthetic fun toSdkByteReadChannel$default (Laws/smithy/kotlin/runtime/io/SdkSource;Lkotlinx/coroutines/CoroutineScope;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;
}

public final class aws/smithy/kotlin/runtime/io/SdkSourceKt {
public static final fun readFully (Laws/smithy/kotlin/runtime/io/SdkSource;Laws/smithy/kotlin/runtime/io/SdkBuffer;J)V
}

public final class aws/smithy/kotlin/runtime/io/internal/ConvertKt {
public static final fun toOkio (Laws/smithy/kotlin/runtime/io/SdkBuffer;)Lokio/Buffer;
public static final fun toOkio (Laws/smithy/kotlin/runtime/io/SdkSink;)Lokio/Sink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package aws.smithy.kotlin.runtime.content
import aws.smithy.kotlin.runtime.io.*
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal expect class BufferedSinkAdapter(sink: okio.BufferedSink) : SdkBuffered
// base class that fills in most of the common implementation, platforms just need to implement the platform specific
// part of the interface
internal abstract class AbstractBufferedSinkAdapter(
protected val delegate: okio.BufferedSink,
internal val delegate: okio.BufferedSink,
) : SdkBufferedSink {
override fun toString(): String = delegate.toString()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,53 +32,76 @@ internal expect class BufferedSourceAdapter(source: okio.BufferedSource) : SdkBu
override fun close()
}

/**
* Used to wrap calls to Okio, catching Okio exceptions (e.g. okio.EOFException) and throwing our own (e.g. aws.smithy.kotlin.runtime.io.EOFException).
*/
internal inline fun <T> SdkBufferedSource.wrapOkio(block: SdkBufferedSource.() -> T): T = try {
block()
} catch (e: okio.EOFException) {
throw EOFException("Okio operation failed: ${e.message}", e)
} catch (e: okio.IOException) {
throw IOException("Okio operation failed: ${e.message}", e)
}

// base class that fills in most of the common implementation, platforms just need to implement the platform specific
// part of the interface
internal abstract class AbstractBufferedSourceAdapter(
protected val delegate: okio.BufferedSource,
internal val delegate: okio.BufferedSource,
) : SdkBufferedSource {
override val buffer: SdkBuffer
get() = delegate.buffer.toSdk()

override fun skip(byteCount: Long): Unit = delegate.skip(byteCount)
override fun skip(byteCount: Long): Unit = wrapOkio { delegate.skip(byteCount) }

override fun readByte(): Byte = delegate.readByte()
override fun readByte(): Byte = wrapOkio { delegate.readByte() }

override fun readShort(): Short = delegate.readShort()
override fun readShort(): Short = wrapOkio { delegate.readShort() }

override fun readShortLe(): Short = delegate.readShortLe()
override fun readShortLe(): Short = wrapOkio { delegate.readShortLe() }

override fun readLong(): Long = delegate.readLong()
override fun readLong(): Long = wrapOkio { delegate.readLong() }

override fun readLongLe(): Long = delegate.readLongLe()
override fun readLongLe(): Long = wrapOkio { delegate.readLongLe() }

override fun readInt(): Int = delegate.readInt()
override fun readInt(): Int = wrapOkio { delegate.readInt() }

override fun readIntLe(): Int = delegate.readIntLe()
override fun readIntLe(): Int = wrapOkio { delegate.readIntLe() }

override fun readAll(sink: SdkSink): Long =
override fun readAll(sink: SdkSink): Long = wrapOkio {
delegate.readAll(sink.toOkio())
}

override fun read(sink: ByteArray, offset: Int, limit: Int): Int =
override fun read(sink: ByteArray, offset: Int, limit: Int): Int = wrapOkio {
delegate.read(sink, offset, limit)
}

override fun read(sink: SdkBuffer, limit: Long): Long =
override fun read(sink: SdkBuffer, limit: Long): Long = wrapOkio {
delegate.read(sink.toOkio(), limit)
}

override fun readByteArray(): ByteArray = delegate.readByteArray()
override fun readByteArray(): ByteArray = wrapOkio { delegate.readByteArray() }

override fun readByteArray(byteCount: Long): ByteArray = delegate.readByteArray(byteCount)
override fun readByteArray(byteCount: Long): ByteArray = wrapOkio {
delegate.readByteArray(byteCount)
}

override fun readUtf8(): String = delegate.readUtf8()
override fun readUtf8(): String = wrapOkio { delegate.readUtf8() }

override fun readUtf8(byteCount: Long): String = delegate.readUtf8(byteCount)
override fun readUtf8(byteCount: Long): String = wrapOkio {
delegate.readUtf8(byteCount)
}

override fun peek(): SdkBufferedSource =
override fun peek(): SdkBufferedSource = wrapOkio {
delegate.peek().toSdk().buffer()
override fun exhausted(): Boolean = delegate.exhausted()
override fun request(byteCount: Long): Boolean = delegate.request(byteCount)
}

override fun exhausted(): Boolean = wrapOkio { delegate.exhausted() }

override fun request(byteCount: Long): Boolean = wrapOkio {
delegate.request(byteCount)
}

override fun require(byteCount: Long): Unit = delegate.require(byteCount)
override fun require(byteCount: Long): Unit = wrapOkio { delegate.require(byteCount) }

override fun close() = delegate.close()
override fun close() = wrapOkio { delegate.close() }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ public expect open class IOException(message: String?, cause: Throwable?) : Exce
public constructor(message: String?)
}

public expect open class EOFException(message: String?) : IOException {
public expect open class EOFException(message: String?, cause: Throwable?) : IOException {
public constructor()
public constructor(message: String?)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package aws.smithy.kotlin.runtime.io

import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.withContext

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@
package aws.smithy.kotlin.runtime.io

import aws.smithy.kotlin.runtime.InternalApi
import aws.smithy.kotlin.runtime.io.internal.JobChannel
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

/**
* A source for reading a stream of bytes (e.g. from file, network, or in-memory buffer). Sources may
Expand Down Expand Up @@ -43,16 +51,42 @@ public interface SdkSource : Closeable {
* Consume the [SdkSource] and pull the entire contents into memory as a [ByteArray].
*/
@InternalApi
public expect suspend fun SdkSource.readToByteArray(): ByteArray
public suspend fun SdkSource.readToByteArray(): ByteArray = withContext(SdkDispatchers.IO) {
use { it.buffer().readByteArray() }
}
Comment on lines +54 to +56
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: the only reason I could see these were expect/actual is because of Dispatchers.IO, but we already have an expect/actual version of that called SdkDispatchers, so I've moved these to common and deleted the JVM/Native implementations


/**
* Convert the [SdkSource] to an [SdkByteReadChannel]. Content is read from the source and forwarded
* to the channel.
* @param coroutineScope the coroutine scope to use to launch a background reader channel responsible for propagating data
* between source and the returned channel
*/
@OptIn(DelicateCoroutinesApi::class)
@InternalApi
public expect fun SdkSource.toSdkByteReadChannel(coroutineScope: CoroutineScope? = null): SdkByteReadChannel
public fun SdkSource.toSdkByteReadChannel(coroutineScope: CoroutineScope? = null): SdkByteReadChannel {
val source = this
val ch = JobChannel()
val scope = coroutineScope ?: GlobalScope
val job = scope.launch(SdkDispatchers.IO + CoroutineName("sdk-source-reader")) {
val buffer = SdkBuffer()
val result = runCatching {
source.use {
while (true) {
ensureActive()
val rc = source.read(buffer, DEFAULT_BYTE_CHANNEL_MAX_BUFFER_SIZE.toLong())
if (rc == -1L) break
ch.write(buffer)
}
}
}

ch.close(result.exceptionOrNull())
}

ch.attachJob(job)

return ch
}

/**
* Remove exactly [byteCount] bytes from this source and appends them to [sink].
Expand Down
Loading
Loading