Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite Blob to facilitate interop with other libs #1211

Merged
merged 5 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions modules/bootstrapped/test/src/smithy4s/BlobSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package smithy4s
import munit._

import java.nio.ByteBuffer

class BlobSpec() extends FunSuite {

test("sameBytesAs works across data structures") {
Expand All @@ -37,13 +36,6 @@ class BlobSpec() extends FunSuite {
)
}

test("ByteBufferBlob.toArray is idempotent, instantiation-wise") {
val blob = Blob(ByteBuffer.wrap("foo".getBytes))
assert(blob.toArray != null)
assert(blob.toArray.eq(blob.toArray))
assertEquals(Blob(blob.toArray), Blob("foo"))
}

test("ByteArrayBlob.hashcode is consistent") {
def makeBlob(str: String) = Blob(str.getBytes)
val blob1 = makeBlob("foo")
Expand All @@ -62,4 +54,13 @@ class BlobSpec() extends FunSuite {
assertNotEquals(blob1.hashCode, blob3.hashCode)
}

test("Concat works as expected") {
val blob = Blob("foo") ++ Blob("bar")
assertEquals(blob.size, 6)
assertEquals(blob(2), 'o'.toByte)
assertEquals(blob(4), 'a'.toByte)
java.util.Arrays
.equals(blob.toArray, "foo".getBytes ++ "bar".getBytes())
}

}
265 changes: 217 additions & 48 deletions modules/core/src/smithy4s/Blob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,93 @@ package smithy4s

import java.nio.ByteBuffer
import java.util.Base64
import scala.collection.immutable.Queue
import java.nio.charset.StandardCharsets
import java.io.OutputStream

// scalafmt: {maxColumn = 120}
sealed trait Blob {
def toArray: Array[Byte]
def buffer: ByteBuffer
def isEmpty: Boolean

def apply(i: Int): Byte
def size: Int
def sameBytesAs(other: Blob): Boolean
final def toArrayBlob: Blob = Blob(toArray)

def foreach(f: Byte => Unit) = {
var i = 0
while (i < size) { f(apply(i)); i += 1 }
}

def foreachWithIndex(f: (Byte, Int) => Unit) = {
var i = 0
while (i < size) { f(apply(i), i); i += 1 }
}

def toArray: Array[Byte] = {
val result = Array.ofDim[Byte](size)
foreachWithIndex((b, i) => result(i) = b)
result
}

def toArrayUnsafe: Array[Byte] = toArray
Baccata marked this conversation as resolved.
Show resolved Hide resolved
def isEmpty: Boolean

def sameBytesAs(other: Blob): Boolean = {
size == other.size && {
var i = 0
var result = true
while (i < size && result) {
result = this(i) == other(i)
i += 1
}
result
}
}

def asByteBuffer(offset: Int, size: Int): ByteBuffer = {
val arr = new Array[Byte](size)
copyToArray(arr, 0, offset, size)
ByteBuffer.wrap(arr)
}
def asByteBuffer: ByteBuffer = asByteBuffer(0, size)
def asByteBufferUnsafe(offset: Int, size: Int): ByteBuffer = asByteBuffer(offset, size)
def asByteBufferUnsafe: ByteBuffer = asByteBuffer(0, size)
daddykotex marked this conversation as resolved.
Show resolved Hide resolved

def copyToArray(xs: Array[Byte], start: Int, offset: Int, size: Int): Unit = {
var i = 0
while (i < size) {
xs(start + i) = apply(offset + i)
i += 1
}
}

def copyToBuffer(buffer: ByteBuffer, offset: Int, size: Int): Int = {
var i = 0
while (i < size && buffer.remaining > 0) {
buffer.put(apply(offset + i))
i += 1
}
i
}

def copyToStream(s: OutputStream, offset: Int, size: Int): Unit = {
var i = 0
while (i < size) {
s.write(apply(offset + i).toInt)
i += 1
}
}

final def toBase64String: String = Base64.getEncoder().encodeToString(toArray)
final def toUTF8String: String = new String(toArray, StandardCharsets.UTF_8)

def concat(other: Blob) =
if (this.isEmpty) other
else
this match {
case qb: Blob.QueueBlob => new Blob.QueueBlob(qb.blobs :+ other, qb.size + other.size)
case b => new Blob.QueueBlob(Queue(b, other), this.size + other.size)
}

final def ++(other: Blob) = concat(other)
}

object Blob {
Expand All @@ -41,78 +117,171 @@ object Blob {
string.getBytes(StandardCharsets.UTF_8)
)

trait Encoder[A] {
def encode(a: A): Blob
}
final class ByteArrayBlob(val arr: Array[Byte]) extends Blob {
Baccata marked this conversation as resolved.
Show resolved Hide resolved

trait Decoder[E, A] {
def decode(blob: Blob): Either[E, A]
}
def apply(i: Int) = arr(i.toInt)

private final class ByteArrayBlob(private val bytes: Array[Byte])
extends Blob {
override def toArray: Array[Byte] = bytes
override def isEmpty: Boolean = bytes.isEmpty
override def size: Int = bytes.length
override def buffer: ByteBuffer =
ByteBuffer.wrap(java.util.Arrays.copyOf(bytes, bytes.length))
override def asByteBuffer(start: Int, size: Int): ByteBuffer =
asByteBufferUnsafe(start, size).asReadOnlyBuffer()

override def toString = {
s"ByteArrayBlob[${Base64.getEncoder().encodeToString(bytes)}]"
override def asByteBufferUnsafe(start: Int, size: Int): ByteBuffer = {
val b = ByteBuffer.wrap(arr, start.toInt, size)
if (start == 0 && size == arr.length) b
else b.slice()
}

override def copyToArray(xs: Array[Byte], start: Int, offset: Int, size: Int): Unit =
System.arraycopy(arr, offset.toInt, xs, start, size)

override def copyToStream(s: OutputStream, offset: Int, size: Int): Unit =
s.write(arr, offset.toInt, size.toInt)

override def copyToBuffer(buffer: ByteBuffer, offset: Int, size: Int): Int = {
val toCopy = buffer.remaining.min(size)
buffer.put(arr, offset.toInt, toCopy)
toCopy
}

override def sameBytesAs(other: Blob): Boolean = other match {
case otherBlob: ByteArrayBlob =>
java.util.Arrays.equals(bytes, otherBlob.bytes)
case otherBlob: ByteBufferBlob =>
ByteBuffer.wrap(bytes).compareTo(otherBlob.buffer) == 0
override def toArray: Array[Byte] = {
val newArray = Array.ofDim[Byte](arr.length)
arr.copyToArray(newArray, 0, arr.length)
newArray
}
override def toArrayUnsafe: Array[Byte] = arr

override def isEmpty: Boolean = arr.isEmpty
override def size: Int = arr.length

override def toString = {
s"ByteArrayBlob[${Base64.getEncoder().encodeToString(arr)}]"
}

override def equals(other: Any): Boolean = {
other.isInstanceOf[ByteArrayBlob] &&
java.util.Arrays.equals(bytes, other.asInstanceOf[ByteArrayBlob].bytes)
java.util.Arrays.equals(arr, other.asInstanceOf[ByteArrayBlob].arr)
}

override def hashCode(): Int = {
var hashCode = 0
var i = 0
while (i < bytes.length) {
hashCode += bytes(i).hashCode()
while (i < arr.length) {
hashCode += arr(i).hashCode()
i += 1
}
hashCode
}
}
private final class ByteBufferBlob(val buffer: ByteBuffer) extends Blob {
override def toString = s"ByteBufferBlob[${buffer.toString()}]"
override def isEmpty: Boolean = !buffer.hasRemaining()
override def size: Int = buffer.remaining()
override def hashCode = buffer.hashCode()
override def sameBytesAs(other: Blob): Boolean = other match {
case otherBlob: ByteBufferBlob =>
buffer.compareTo(otherBlob.buffer) == 0
case otherBlob: ByteArrayBlob =>
buffer.compareTo(ByteBuffer.wrap(otherBlob.toArray)) == 0
final class ByteBufferBlob(val buf: ByteBuffer) extends Blob {
def apply(i: Int) = buf.get(i.toInt)

override def copyToArray(xs: Array[Byte], start: Int, offset: Int, size: Int): Unit = {
val n = buf.duplicate()
n.position(offset.toInt)
n.get(xs, start, size)
()
}

override def toArray: Array[Byte] = {
val arr = Array.ofDim[Byte](buf.remaining())
copyToArray(arr, 0, 0, size)
arr
}

Baccata marked this conversation as resolved.
Show resolved Hide resolved
override def asByteBuffer(offset: Int, size: Int): ByteBuffer =
asByteBufferUnsafe(offset, size).asReadOnlyBuffer()

override def asByteBufferUnsafe(offset: Int, size: Int): ByteBuffer = {
val b = buf
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteBuffers are really annoying and hard to work correctly with. This is why I'm skeptical if its worth to give them a designated position in the ADT.

Suggested change
val b = buf
val b = buf.duplicate()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm 😅 I now realize this is possibly my own mistake in scodec-bits. I need to think about this.

https://github.com/scodec/scodec-bits/blob/ad81e86d47dc1859e48e453df5da402bc8f13abb/core/shared/src/main/scala/scodec/bits/ByteVector.scala#L1466-L1474

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, this was pretty much copied verbatim from scodecs-bits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in aabf826

if (offset == 0 && b.position() == 0 && size == b.remaining()) b
else {
b.position(offset.toInt)
b.limit(offset.toInt + size)
b.slice()
}
}

override def asByteBufferUnsafe: ByteBuffer = buf

override def copyToBuffer(buffer: ByteBuffer, offset: Int, size: Int): Int = {
val toCopy = buffer.remaining.min(size)
buffer.put(asByteBuffer(offset, toCopy))
toCopy
}

override def toString = s"ByteBufferBlob[${buf.toString()}]"
override def isEmpty: Boolean = !buf.hasRemaining()
override def size: Int = buf.remaining()
override def hashCode = buf.hashCode()

override def equals(other: Any): Boolean = {
other.isInstanceOf[ByteBufferBlob] &&
buffer.compareTo(other.asInstanceOf[ByteBufferBlob].buffer) == 0
buf.compareTo(other.asInstanceOf[ByteBufferBlob].buf) == 0
}
}

final class ArraySliceBlob(val arr: Array[Byte], val offset: Int, val length: Int) extends Blob {

require(
offset >= 0 && offset <= arr.size && length >= 0 && length <= arr.size && offset + length <= arr.size
)
def apply(i: Int): Byte =
if (i >= length) throw new IndexOutOfBoundsException()
else arr((offset + i).toInt)

def size: Int = length
def isEmpty: Boolean = (length == 0)

override def toArray: Array[Byte] = {
val result = Array.ofDim[Byte](length)
arr.copyToArray(arr, offset, length)
result
}

override def hashCode(): Int = {
import util.hashing.MurmurHash3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well that's nice

var h = MurmurHash3.stringHash("ArraySliceBlob")
h = MurmurHash3.mix(h, MurmurHash3.arrayHash(arr))
h = MurmurHash3.mix(h, offset)
MurmurHash3.mixLast(h, length)
}

private var arr: Array[Byte] = null
override def equals(other: Any): Boolean = {
other.isInstanceOf[ArraySliceBlob] && {
val o = other.asInstanceOf[ArraySliceBlob]
offset == o.offset &&
length == o.length &&
java.util.Arrays.equals(arr, o.arr)
}
}

}

def toArray: Array[Byte] = {
if (arr == null) {
this.synchronized {
if (arr == null) {
arr = Array.ofDim[Byte](buffer.remaining())
val _ = buffer.get(arr)
}
final class QueueBlob private[smithy4s] (val blobs: Queue[Blob], val size: Int) extends Blob {
def apply(i: Int): Byte = {
if (i > size) throw new IndexOutOfBoundsException()
Baccata marked this conversation as resolved.
Show resolved Hide resolved
else {
var localIndex = i
var (currentHead, currentTail) = blobs.dequeue
while (localIndex > currentHead.size) {
localIndex = localIndex - currentHead.size
val dq = currentTail.dequeue
currentHead = dq._1
currentTail = dq._2
}
currentHead(localIndex)
}
}
override def foreach(f: Byte => Unit): Unit =
blobs.foreach(
_.foreach(f)
)
override def foreachWithIndex(f: (Byte, Int) => Unit): Unit = {
var i = 0
blobs.foreach { blob =>
blob.foreach { byte => f(byte, i); i = i + 1 }
}
arr
}
def isEmpty: Boolean = size == 0
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,18 @@ private[json] case class JsonPayloadCodecCompilerImpl(
private class JsonPayloadDecoder[A](jcodec: JsonCodec[A])
extends PayloadDecoder[A] {
def decode(blob: Blob): Either[PayloadError, A] = {
val nonEmpty =
if (blob.isEmpty) "{}".getBytes
else blob.toArray
try {
Right {
readFromArray(nonEmpty, jsoniterReaderConfig)(jcodec)
if (blob.isEmpty) readFromString("{}", jsoniterReaderConfig)(jcodec)
else
blob match {
case b: Blob.ByteArrayBlob =>
readFromArray(b.arr, jsoniterReaderConfig)(jcodec)
case b: Blob.ByteBufferBlob =>
readFromByteBuffer(b.buf, jsoniterReaderConfig)(jcodec)
case other =>
readFromArray(other.toArray, jsoniterReaderConfig)(jcodec)
}
}
} catch {
case e: PayloadError => Left(e)
Expand Down