diff --git a/modules/bootstrapped/test/src/smithy4s/BlobSpec.scala b/modules/bootstrapped/test/src/smithy4s/BlobSpec.scala
index 7971bbca7..87a35d2c9 100644
--- a/modules/bootstrapped/test/src/smithy4s/BlobSpec.scala
+++ b/modules/bootstrapped/test/src/smithy4s/BlobSpec.scala
@@ -19,7 +19,8 @@ package smithy4s
 
 import munit._
 import java.nio.ByteBuffer
-
+import java.io.ByteArrayOutputStream
+import scala.util.Using
 class BlobSpec() extends FunSuite {
 
   test("sameBytesAs works across data structures") {
@@ -37,13 +38,6 @@ class BlobSpec() extends FunSuite {
     )
   }
 
-  test("ByteBufferBlob.toArray is idempotent, instantiation-wise") {
-    val blob = Blob(ByteBuffer.wrap("foo".getBytes))
-    assert(blob.toArray != null)
-    assert(blob.toArray.eq(blob.toArray))
-    assertEquals(Blob(blob.toArray), Blob("foo"))
-  }
-
   test("ByteArrayBlob.hashcode is consistent") {
     def makeBlob(str: String) = Blob(str.getBytes)
     val blob1 = makeBlob("foo")
@@ -62,4 +56,105 @@ class BlobSpec() extends FunSuite {
     assertNotEquals(blob1.hashCode, blob3.hashCode)
   }
 
+  test("Concat works as expected") {
+    val blob = Blob("foo") ++ Blob("bar")
+    assertEquals(blob.size, 6)
+    assertEquals(blob(2), 'o'.toByte)
+    assertEquals(blob(4), 'a'.toByte)
+    assert(
+      java.util.Arrays.equals(blob.toArray, "foo".getBytes ++ "bar".getBytes())
+    )
+  }
+
+  val all = List(
+    "Queue" -> (Blob("foo") ++ Blob("bar")),
+    "Array" -> Blob("foobar"),
+    "Buffer" -> Blob(ByteBuffer.wrap("foobar".getBytes()))
+  )
+
+  for {
+    x <- all
+    y <- all
+  } {
+    test(s"${x._1} and ${y._1} : same bytes") {
+      assert(x._2.sameBytesAs(y._2))
+    }
+  }
+
+  all.foreach { case (name, data) =>
+    test(s"$name: size") {
+      assertEquals(data.size, 6)
+    }
+
+    test(s"$name: index access") {
+      assertEquals(data(2), 'o'.toByte)
+    }
+
+    test(s"$name: out of bounds access") {
+      intercept[IndexOutOfBoundsException] { data(6) }
+    }
+
+    test(s"$name: utf8String") {
+      assertEquals(data.toUTF8String, "foobar")
+    }
+
+    test(s"$name: toArraySliceBlob") {
+      assertEquals[Blob, Blob](
+        data.toArraySliceBlob,
+        Blob.slice("foobar".getBytes(), 0, 6)
+      )
+    }
+
+    test(s"$name: copyToArray") {
+      val target = Array.fill[Byte](6)(0)
+      data.copyToArray(target, 0, 0, data.size)
+      assert(target.sameElements("foobar".getBytes()))
+    }
+
+    test(s"$name: copyToBuffer") {
+      val target = ByteBuffer.wrap(Array.fill[Byte](6)(0))
+      data.copyToBuffer(target, 0, data.size)
+      assert(target.array.sameElements("foobar".getBytes()))
+    }
+
+    test(s"$name: copyToStream") {
+      Using.resource(new ByteArrayOutputStream()) { stream =>
+        data.copyToStream(stream, 0, data.size)
+        assert(stream.toByteArray().sameElements("foobar".getBytes()))
+      }
+    }
+  }
+
+  test("asByteBufferUnsafe") {
+    val arr = "Hello, world!".getBytes
+    assert(arr eq Blob.view(ByteBuffer.wrap(arr)).asByteBufferUnsafe.array)
+    assert(
+      arr eq Blob.view(ByteBuffer.wrap(arr)).asByteBufferUnsafe.array
+    )
+  }
+
+  test("asByteBufferUnsafe has independent position+limit") {
+    val bv = Blob.view(ByteBuffer.wrap("Hello, world!".getBytes))
+    val bb1 = bv.asByteBufferUnsafe
+    assertEquals(bb1.position(), 0)
+    assertEquals(bb1.limit(), 13)
+    val bb2 = bv.asByteBufferUnsafe
+    bb2.position(1)
+    bb2.limit(2)
+    assertEquals(bb1.position(), 0)
+    assertEquals(bb1.limit(), 13)
+  }
+
+  test("Buffer with non-zero position exposes only the remaining bytes") {
+    val buffer = ByteBuffer.wrap("foobar".getBytes())
+    buffer.position(3)
+    assertEquals(Blob(buffer).toUTF8String, "bar")
+  }
+
+  test("Array slice: access") {
+    val slice = Blob.slice("foobar".getBytes(), 1, 4)
+    assert(slice.sameBytesAs(Blob("ooba")))
+    assert(slice.toArray.sameElements("ooba".getBytes()))
+  }
+
 }
diff --git a/modules/core/src/smithy4s/Blob.scala b/modules/core/src/smithy4s/Blob.scala
index 306133cbc..ecdf16c2c 100644
--- a/modules/core/src/smithy4s/Blob.scala
+++ b/modules/core/src/smithy4s/Blob.scala
@@ -18,101 +18,244 @@ package smithy4s
 
 import java.nio.ByteBuffer
 import java.util.Base64
+import scala.collection.immutable.Queue
 import java.nio.charset.StandardCharsets
+import java.io.OutputStream
 
+// scalafmt: {maxColumn = 120}
+/**
+  * A Blob represents an arbitrary piece of binary data that fits in memory.
+  *
+  * Its underlying data structure enables several types of layouts, as well as efficient concatenation.
+  */
 sealed trait Blob {
-  def toArray: Array[Byte]
-  def buffer: ByteBuffer
-  def isEmpty: Boolean
+
+  def apply(i: Int): Byte
   def size: Int
-  def sameBytesAs(other: Blob): Boolean
-  final def toArrayBlob: Blob = Blob(toArray)
+  def isEmpty: Boolean
+
+  def foreach(f: Byte => Unit) = {
+    var i = 0
+    while (i < size) { f(apply(i)); i += 1 }
+  }
+
+  def foreachWithIndex(f: (Byte, Int) => Unit) = {
+    var i = 0
+    while (i < size) { f(apply(i), i); i += 1 }
+  }
+
+  def toArraySliceBlob: Blob.ArraySliceBlob =
+    new Blob.ArraySliceBlob(toArrayUnsafe, 0, size)
+
+  def toArray: Array[Byte] = {
+    val result = Array.ofDim[Byte](size)
+    foreachWithIndex((b, i) => result(i) = b)
+    result
+  }
+
+  def toArrayUnsafe: Array[Byte] = toArray
+
+  def sameBytesAs(other: Blob): Boolean = {
+    size == other.size && {
+      var i = 0
+      var result = true
+      while (i < size && result) {
+        result = this(i) == other(i)
+        i += 1
+      }
+      result
+    }
+  }
+
+  def asByteBuffer(offset: Int, size: Int): ByteBuffer = {
+    val arr = new Array[Byte](size)
+    copyToArray(arr, 0, offset, size)
+    ByteBuffer.wrap(arr)
+  }
+  def asByteBuffer: ByteBuffer = asByteBuffer(0, size)
+  def asByteBufferUnsafe(offset: Int, size: Int): ByteBuffer = asByteBuffer(offset, size)
+  def asByteBufferUnsafe: ByteBuffer = asByteBufferUnsafe(0, size)
+
+  def copyToArray(xs: Array[Byte], start: Int, offset: Int, size: Int): Unit = {
+    var i = 0
+    while (i < size) {
+      xs(start + i) = apply(offset + i)
+      i += 1
+    }
+  }
+
+  def copyToBuffer(buffer: ByteBuffer, offset: Int, size: Int): Int = {
+    var i = 0
+    while (i < size && buffer.remaining > 0) {
+      buffer.put(apply(offset + i))
+      i += 1
+    }
+    i
+  }
+
+  def copyToStream(s: OutputStream, offset: Int, size: Int): Unit = {
+    var i = 0
+    while (i < size) {
+      s.write(apply(offset + i).toInt)
+      i += 1
+    }
+  }
+
   final def toBase64String: String = Base64.getEncoder().encodeToString(toArray)
   final def toUTF8String: String = new String(toArray, StandardCharsets.UTF_8)
+
+  def concat(other: Blob): Blob =
+    if (this.isEmpty) other
+    else if (other.isEmpty) this // nothing to append: avoid growing the queue with empty chunks
+    else
+      this match {
+        case qb: Blob.QueueBlob => new Blob.QueueBlob(qb.blobs :+ other, qb.size + other.size)
+        case b => new Blob.QueueBlob(Queue(b, other), this.size + other.size)
+      }
+
+  final def ++(other: Blob): Blob = concat(other)
 }
 
 object Blob {
 
-  val empty: Blob = new Blob.ByteArrayBlob(Array.emptyByteArray)
+  val empty: Blob = new Blob.ArraySliceBlob(Array.emptyByteArray, 0, 0)
 
-  def apply(bytes: Array[Byte]): Blob = new ByteArrayBlob(bytes)
-  def apply(buffer: ByteBuffer): Blob = new ByteBufferBlob(buffer)
-  def apply(string: String): Blob = new ByteArrayBlob(
-    string.getBytes(StandardCharsets.UTF_8)
-  )
+  def apply(bytes: Array[Byte]): Blob = new ArraySliceBlob(bytes, 0, bytes.length)
+  def apply(string: String): Blob = apply(string.getBytes(StandardCharsets.UTF_8))
+  def apply(buffer: ByteBuffer): Blob = view(buffer.duplicate())
 
-  trait Encoder[A] {
-    def encode(a: A): Blob
-  }
+  def slice(bytes: Array[Byte], offset: Int, size: Int): Blob = new ArraySliceBlob(bytes, offset, size)
+  // `slice()` re-bases the view at the buffer's current position: ByteBufferBlob indexes its buffer from 0
+  def view(buffer: ByteBuffer): Blob = new ByteBufferBlob(buffer.slice())
+  def queue(blobs: Queue[Blob], size: Int) = new QueueBlob(blobs, size)
 
-  trait Decoder[E, A] {
-    def decode(blob: Blob): Either[E, A]
-  }
+  final class ByteBufferBlob private[smithy4s] (val buf: ByteBuffer) extends Blob {
+    def apply(i: Int) = buf.get(i.toInt)
 
-  private final class ByteArrayBlob(private val bytes: Array[Byte])
-      extends Blob {
-    override def toArray: Array[Byte] = bytes
-    override def isEmpty: Boolean = bytes.isEmpty
-    override def size: Int = bytes.length
-    override def buffer: ByteBuffer =
-      ByteBuffer.wrap(java.util.Arrays.copyOf(bytes, bytes.length))
+    override def toArraySliceBlob: ArraySliceBlob = if (buf.hasArray()) {
+      new ArraySliceBlob(buf.array, buf.arrayOffset, size)
+    } else super.toArraySliceBlob
 
-    override def toString = {
-      s"ByteArrayBlob[${Base64.getEncoder().encodeToString(bytes)}]"
+    override def copyToArray(xs: Array[Byte], start: Int, offset: Int, size: Int): Unit = {
+      val n = buf.duplicate()
+      n.position(offset.toInt)
+      n.get(xs, start, size)
+      ()
     }
 
-    override def sameBytesAs(other: Blob): Boolean = other match {
-      case otherBlob: ByteArrayBlob =>
-        java.util.Arrays.equals(bytes, otherBlob.bytes)
-      case otherBlob: ByteBufferBlob =>
-        ByteBuffer.wrap(bytes).compareTo(otherBlob.buffer) == 0
+    override def toArray: Array[Byte] = {
+      val arr = Array.ofDim[Byte](buf.remaining())
+      copyToArray(arr, 0, 0, size)
+      arr
     }
 
-    override def equals(other: Any): Boolean = {
-      other.isInstanceOf[ByteArrayBlob] &&
-      java.util.Arrays.equals(bytes, other.asInstanceOf[ByteArrayBlob].bytes)
-    }
+    override def asByteBuffer(offset: Int, size: Int): ByteBuffer =
+      asByteBufferUnsafe(offset, size).asReadOnlyBuffer()
 
-    override def hashCode(): Int = {
-      var hashCode = 0
-      var i = 0
-      while (i < bytes.length) {
-        hashCode += bytes(i).hashCode()
-        i += 1
+    override def asByteBufferUnsafe(offset: Int, size: Int): ByteBuffer = {
+      val b = buf.duplicate()
+      if (offset == 0 && b.position() == 0 && size == b.remaining()) b
+      else {
+        b.position(offset.toInt)
+        b.limit(offset.toInt + size)
+        b.slice()
       }
-      hashCode
     }
-  }
 
-  private final class ByteBufferBlob(val buffer: ByteBuffer) extends Blob {
-    override def toString = s"ByteBufferBlob[${buffer.toString()}]"
-    override def isEmpty: Boolean = !buffer.hasRemaining()
-    override def size: Int = buffer.remaining()
-    override def hashCode = buffer.hashCode()
-    override def sameBytesAs(other: Blob): Boolean = other match {
-      case otherBlob: ByteBufferBlob =>
-        buffer.compareTo(otherBlob.buffer) == 0
-      case otherBlob: ByteArrayBlob =>
-        buffer.compareTo(ByteBuffer.wrap(otherBlob.toArray)) == 0
+
+    override def asByteBufferUnsafe: ByteBuffer = buf.duplicate()
+
+    override def copyToBuffer(buffer: ByteBuffer, offset: Int, size: Int): Int = {
+      val toCopy = buffer.remaining.min(size)
+      buffer.put(asByteBuffer(offset, toCopy))
+      toCopy
     }
 
+    override def toString = s"ByteBufferBlob(...)"
+    override def isEmpty: Boolean = !buf.hasRemaining()
+    override def size: Int = buf.remaining()
+    override def hashCode = buf.hashCode()
+
     override def equals(other: Any): Boolean = {
       other.isInstanceOf[ByteBufferBlob] &&
-      buffer.compareTo(other.asInstanceOf[ByteBufferBlob].buffer) == 0
+      buf.compareTo(other.asInstanceOf[ByteBufferBlob].buf) == 0
+    }
+  }
+
+  final class ArraySliceBlob private[smithy4s] (val arr: Array[Byte], val offset: Int, val length: Int) extends Blob {
+
+    override def toArraySliceBlob: ArraySliceBlob = this
+
+    require(
+      offset >= 0 && offset <= arr.size && length >= 0 && length <= arr.size && offset + length <= arr.size
+    )
+
+    def apply(i: Int): Byte = {
+      if (i < 0 || i >= length) {
+        throw new IndexOutOfBoundsException()
+      } else arr(offset + i)
+    }
+
+    def size: Int = length
+    def isEmpty: Boolean = (length == 0)
+
+    override def toArray: Array[Byte] = {
+      val result = Array.ofDim[Byte](length)
+      // copy the slice [offset, offset + length) of the backing array to the start of the fresh array
+      System.arraycopy(arr, offset, result, 0, length)
+      result
     }
 
-    private var arr: Array[Byte] = null
+    override def toArrayUnsafe: Array[Byte] = if (arr.length == length && offset == 0) arr else toArray
+
+    override def toString(): String = s"ArraySliceBlob(..., $offset, $length)"
 
-    def toArray: Array[Byte] = {
-      if (arr == null) {
-        this.synchronized {
-          if (arr == null) {
-            arr = Array.ofDim[Byte](buffer.remaining())
-            val _ = buffer.get(arr)
-          }
+    override def hashCode(): Int = {
+      import util.hashing.MurmurHash3
+      var h = MurmurHash3.stringHash("ArraySliceBlob")
+      h = MurmurHash3.mix(h, MurmurHash3.arrayHash(arr))
+      h = MurmurHash3.mix(h, offset)
+      MurmurHash3.mixLast(h, length)
+    }
+
+    override def equals(other: Any): Boolean = {
+      other.isInstanceOf[ArraySliceBlob] && {
+        val o = other.asInstanceOf[ArraySliceBlob]
+        offset == o.offset &&
+        length == o.length &&
+        java.util.Arrays.equals(arr, o.arr)
+      }
+    }
+
+  }
+
+  final class QueueBlob private[smithy4s] (val blobs: Queue[Blob], val size: Int) extends Blob {
+    def apply(i: Int): Byte = {
+      if (i < 0 || i >= size) throw new IndexOutOfBoundsException()
+      else {
+        var localIndex = i
+        var (currentHead, currentTail) = blobs.dequeue
+        while (localIndex >= currentHead.size) {
+          localIndex = localIndex - currentHead.size
+          val dq = currentTail.dequeue
+          currentHead = dq._1
+          currentTail = dq._2
         }
+        currentHead(localIndex)
       }
-      arr
     }
+    override def foreach(f: Byte => Unit): Unit =
+      blobs.foreach(
+        _.foreach(f)
+      )
+    override def foreachWithIndex(f: (Byte, Int) => Unit): Unit = {
+      var i = 0
+      blobs.foreach { blob =>
+        blob.foreach { byte => f(byte, i); i = i + 1 }
+      }
+    }
+    def isEmpty: Boolean = size == 0
+
+    override def toString(): String = s"QueueBlob(..., $size)"
   }
 }
diff --git a/modules/json/src/smithy4s/json/internals/JsonPayloadCodecCompilerImpl.scala b/modules/json/src/smithy4s/json/internals/JsonPayloadCodecCompilerImpl.scala
index 728e831fd..5992399a4 100644
--- a/modules/json/src/smithy4s/json/internals/JsonPayloadCodecCompilerImpl.scala
+++ b/modules/json/src/smithy4s/json/internals/JsonPayloadCodecCompilerImpl.scala
@@ -81,12 +81,27 @@ private[json] case class JsonPayloadCodecCompilerImpl(
   private class JsonPayloadDecoder[A](jcodec: JsonCodec[A])
       extends PayloadDecoder[A] {
     def decode(blob: Blob): Either[PayloadError, A] = {
-      val nonEmpty =
-        if (blob.isEmpty) "{}".getBytes
-        else blob.toArray
       try {
         Right {
-          readFromArray(nonEmpty, jsoniterReaderConfig)(jcodec)
+          if (blob.isEmpty) readFromString("{}", jsoniterReaderConfig)(jcodec)
+          else
+            blob match {
+              case b: Blob.ArraySliceBlob =>
+                readFromSubArray(
+                  b.arr,
+                  b.offset,
+                  b.offset + b.size,
+                  jsoniterReaderConfig
+                )(jcodec)
+              case b: Blob.ByteBufferBlob =>
+                // duplicate: parsing advances the buffer position
+                readFromByteBuffer(
+                  b.buf.duplicate(),
+                  jsoniterReaderConfig
+                )(jcodec)
+              case other =>
+                readFromArray(other.toArray, jsoniterReaderConfig)(jcodec)
+            }
         }
       } catch {
         case e: PayloadError => Left(e)