Skip to content

Commit

Permalink
added API to allow encoding and decoding to/from Generic record, to a… (
Browse files Browse the repository at this point in the history
#573)

* added API to allow encoding and decoding to/from Generic record, to allow integration with Schema Registry (AWS Glue)

* fixed imports

* added API to allow encoding and decoding to/from Generic record, to allow integration with Schema Registry (AWS Glue)

* fixed imports

* fix: fixed README

---------

Co-authored-by: John A. De Goes <[email protected]>
  • Loading branch information
devsprint and jdegoes authored Aug 22, 2023
1 parent 9420122 commit 620d57d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ import zio.{ Chunk, Unsafe, ZIO }

object AvroCodec {

implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] =
new BinaryCodec[A] {
trait ExtendedBinaryCodec[A] extends BinaryCodec[A] {
def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record
def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A]
}

implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): ExtendedBinaryCodec[A] =
new ExtendedBinaryCodec[A] {

val avroSchema: SchemaAvro =
AvroSchemaCodec.encodeToApacheAvro(schema).getOrElse(throw new Exception("Avro schema could not be generated."))
Expand Down Expand Up @@ -59,6 +64,12 @@ object AvroCodec {
decode(chunk).map(Chunk(_))
)
}

override def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record =
encodeValue(value, schema).asInstanceOf[GenericData.Record]

override def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A] =
decodeValue(value, schema)
}

private def decodeValue[A](raw: Any, schema: Schema[A]): Either[DecodeError, A] = schema match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import java.time.{
}
import java.util.UUID

import org.apache.avro.generic.GenericData

import zio._
import zio.schema.codec.AvroAnnotations.avroEnum
import zio.schema.{ DeriveSchema, Schema }
Expand Down Expand Up @@ -140,7 +142,8 @@ object AvroCodecSpec extends ZIOSpecDefault {
sequenceDecoderSpec,
genericRecordDecoderSpec,
enumDecoderSpec,
streamEncodingDecodingSpec
streamEncodingDecodingSpec,
genericRecordEncodeDecodeSpec
)

private val primitiveEncoderSpec = suite("Avro Codec - Encoder primitive spec")(
Expand Down Expand Up @@ -695,4 +698,13 @@ object AvroCodecSpec extends ZIOSpecDefault {

})

private val genericRecordEncodeDecodeSpec = suite("AvroCodec - encode/decode Generic Record")(
test("Encode/Decode") {
val codec = AvroCodec.schemaBasedBinaryCodec[Record]
val generic: GenericData.Record = codec.encodeGenericRecord(Record("John", 42))
val result = codec.decodeGenericRecord(generic)
assertTrue(result == Right(Record("John", 42)))
}
)

}

0 comments on commit 620d57d

Please sign in to comment.