Skip to content

Commit

Permalink
Added Tests (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored Mar 7, 2019
1 parent 761d076 commit d0e593a
Show file tree
Hide file tree
Showing 19 changed files with 450 additions and 135 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ Always benchmark with your own workload. The result depends heavily on the data

## TODO

* Improve test coverage, docs and examples
* Logical Types
* Schema
* Refactor parsing to be cleaner
Expand Down
15 changes: 15 additions & 0 deletions codec_skip_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package avro

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCreateSkipDecoder_UnsupportedType(t *testing.T) {
schema := NewPrimitiveSchema(Type("test"))

dec := createSkipDecoder(schema)

assert.IsType(t, &errorDecoder{}, dec)
}
53 changes: 53 additions & 0 deletions config_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package avro

import (
"testing"

"github.com/modern-go/reflect2"
"github.com/stretchr/testify/assert"
)

func TestConfig_Freeze(t *testing.T) {
api := Config{
TagKey: "test",
BlockLength: 2,
}.Freeze()
cfg := api.(*frozenConfig)

assert.Equal(t, "test", cfg.getTagKey())
assert.Equal(t, 2, cfg.getBlockLength())
}

func TestConfig_ReusesDecoders(t *testing.T) {
api := Config{
TagKey: "test",
BlockLength: 2,
}.Freeze()
cfg := api.(*frozenConfig)

schema := MustParse(`"long"`)
var long int64
typ := reflect2.TypeOfPtr(&long)

dec1 := cfg.DecoderOf(schema, typ)
dec2 := cfg.DecoderOf(schema, typ)

assert.Equal(t, dec1, dec2)
}

func TestConfig_ReusesEncoders(t *testing.T) {
api := Config{
TagKey: "test",
BlockLength: 2,
}.Freeze()
cfg := api.(*frozenConfig)

schema := MustParse(`"long"`)
var long int64
typ := reflect2.TypeOfPtr(long)

enc1 := cfg.EncoderOf(schema, typ)
enc2 := cfg.EncoderOf(schema, typ)

assert.Equal(t, enc1, enc2)
}
78 changes: 29 additions & 49 deletions container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"io"

"github.com/hamba/avro"
"github.com/hamba/avro/internal/bytesx"
)

const (
Expand Down Expand Up @@ -44,7 +45,7 @@ type Header struct {
// Decoder reads and decodes Avro values from a container file.
type Decoder struct {
reader *avro.Reader
resetReader *resetReader
resetReader *bytesx.ResetReader
decoder *avro.Decoder
sync [16]byte

Expand All @@ -58,18 +59,18 @@ func NewDecoder(r io.Reader) (*Decoder, error) {
var h Header
reader.ReadVal(HeaderSchema, &h)
if reader.Error != nil {
return nil, fmt.Errorf("file: unexpected error: %v", reader.Error)
return nil, fmt.Errorf("decoder: unexpected error: %v", reader.Error)
}

if h.Magic != magicBytes {
return nil, errors.New("file: invalid avro file")
return nil, errors.New("decoder: invalid avro file")
}
schema, err := avro.Parse(string(h.Meta[schemaKey]))
if err != nil {
return nil, err
}

decReader := &resetReader{}
decReader := bytesx.NewResetReader([]byte{})

// TODO: File Codecs
// codec, ok := codecs[string(h.Meta[codecKey])]
Expand All @@ -88,7 +89,7 @@ func NewDecoder(r io.Reader) (*Decoder, error) {
// HasNext determines if there is another value to read.
func (d *Decoder) HasNext() bool {
if d.count <= 0 {
count, _ := d.readBlock() // err handled in Error function
count := d.readBlock()
d.count = count
}

Expand All @@ -103,11 +104,7 @@ func (d *Decoder) Decode(v interface{}) error {

d.count--

err := d.decoder.Decode(v)
if err == io.EOF {
return nil
}
return err
return d.decoder.Decode(v)
}

// Error returns the last reader error.
Expand All @@ -119,7 +116,7 @@ func (d *Decoder) Error() error {
return d.reader.Error
}

func (d *Decoder) readBlock() (int64, error) {
func (d *Decoder) readBlock() int64 {
count := d.reader.ReadLong()
size := d.reader.ReadLong()

Expand All @@ -130,13 +127,20 @@ func (d *Decoder) readBlock() (int64, error) {
var sync [16]byte
d.reader.Read(sync[:])
if d.sync != sync && d.reader.Error != io.EOF {
return count, errors.New("file: invalid block")
d.reader.Error = errors.New("decoder: invalid block")
}

if d.reader.Error == io.EOF {
return count, nil
return count
}

// EncoderFunc represents an configuration function for Encoder
type EncoderFunc func(e *Encoder)

// WithBlockLength sets the block length on the encoder.
func WithBlockLength(length int) EncoderFunc {
return func(e *Encoder) {
e.blockLength = length
}
return count, d.reader.Error
}

// Encoder writes Avro container file to an output stream.
Expand All @@ -151,7 +155,7 @@ type Encoder struct {
}

// NewEncoder returns a new encoder that writes to w using schema s.
func NewEncoder(s string, w io.Writer) (*Encoder, error) {
func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
schema, err := avro.Parse(s)
if err != nil {
return nil, err
Expand All @@ -165,25 +169,24 @@ func NewEncoder(s string, w io.Writer) (*Encoder, error) {
schemaKey: []byte(schema.String()),
},
}
_, err = rand.Read(header.Sync[:])
if err != nil {
return nil, err
}

_, _ = rand.Read(header.Sync[:])
writer.WriteVal(HeaderSchema, header)
if writer.Error != nil {
return nil, writer.Error
}

buf := &bytes.Buffer{}

return &Encoder{
e := &Encoder{
writer: writer,
buf: buf,
encoder: avro.NewEncoderForSchema(schema, buf),
sync: header.Sync,
blockLength: 100,
}, nil
}

for _, opt := range opts {
opt(e)
}

return e, nil
}

// Encode writes the Avro encoding of v to the stream.
Expand Down Expand Up @@ -224,26 +227,3 @@ func (e *Encoder) writerBlock() error {
e.buf.Reset()
return e.writer.Flush()
}

type resetReader struct {
buf []byte
head int
tail int
}

func (r *resetReader) Read(p []byte) (int, error) {
if r.head == r.tail {
return 0, io.EOF
}

n := copy(p, r.buf)
r.head += n

return n, nil
}

func (r *resetReader) Reset(buf []byte) {
r.buf = buf
r.head = 0
r.tail = len(buf)
}
124 changes: 124 additions & 0 deletions container/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package container_test

import (
"bytes"
"errors"
"os"
"testing"

Expand Down Expand Up @@ -54,6 +55,30 @@ type TestRecord struct {
Bool bool `avro:"bool"`
}

func TestNewDecoder_InvalidHeader(t *testing.T) {
data := []byte{'O', 'b', 'j'}

_, err := container.NewDecoder(bytes.NewReader(data))

assert.Error(t, err)
}

func TestNewDecoder_InvalidMagic(t *testing.T) {
data := []byte{'f', 'o', 'o', 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}

_, err := container.NewDecoder(bytes.NewReader(data))

assert.Error(t, err)
}

func TestNewDecoder_InvalidSchema(t *testing.T) {
data := []byte{'O', 'b', 'j', 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}

_, err := container.NewDecoder(bytes.NewReader(data))

assert.Error(t, err)
}

func TestDecoder(t *testing.T) {
unionStr := "union value"
want := FullRecord{
Expand Down Expand Up @@ -106,6 +131,59 @@ func TestDecoder(t *testing.T) {
assert.Equal(t, 1, count)
}

func TestDecoder_DecodeAvroError(t *testing.T) {
data := []byte{'O', 'b', 'j', 0x01, 0x01, 0x26, 0x16, 'a', 'v', 'r', 'o', '.', 's', 'c', 'h', 'e', 'm', 'a',
0x0c, '"', 'l', 'o', 'n', 'g', '"', 0x00, 0xfb, 0x2b, 0x0f, 0x1a, 0xdd, 0xfd, 0x90, 0x7d, 0x87, 0x12,
0x15, 0x29, 0xd7, 0x1d, 0x1c, 0xdd, 0x02, 0x16, 0xe2, 0xa2, 0xf3, 0xad, 0xad, 0xad, 0xe2, 0xa2, 0xf3,
0xad, 0xad, 0xfb, 0x2b, 0x0f, 0x1a, 0xdd, 0xfd, 0x90, 0x7d, 0x87, 0x12, 0x15, 0x29, 0xd7, 0x1d, 0x1c, 0xdd,
}

dec, _ := container.NewDecoder(bytes.NewReader(data))
_ = dec.HasNext()

var l int64
err := dec.Decode(&l)

assert.Error(t, err)
}

func TestDecoder_DecodeMustCallHasNext(t *testing.T) {
data := []byte{'O', 'b', 'j', 0x01, 0x01, 0x26, 0x16, 'a', 'v', 'r', 'o', '.', 's', 'c', 'h', 'e', 'm', 'a',
0x0c, '"', 'l', 'o', 'n', 'g', '"', 0x00, 0xfb, 0x2b, 0x0f, 0x1a, 0xdd, 0xfd, 0x90, 0x7d, 0x87, 0x12,
0x15, 0x29, 0xd7, 0x1d, 0x1c, 0xdd, 0x02, 0x02, 0x02, 0xfb, 0x2b, 0x0f, 0x1a, 0xdd, 0xfd, 0x90, 0x7d,
0x87, 0x12, 0x15, 0x29, 0xd7, 0x1d, 0x1c, 0xdd,
}

dec, _ := container.NewDecoder(bytes.NewReader(data))

var l int64
err := dec.Decode(&l)

assert.Error(t, err)
}

func TestDecoder_InvalidBlock(t *testing.T) {
data := []byte{'O', 'b', 'j', 0x01, 0x01, 0x26, 0x16, 'a', 'v', 'r', 'o', '.', 's', 'c', 'h', 'e', 'm', 'a',
0x0c, '"', 'l', 'o', 'n', 'g', '"', 0x00, 0xfa, 0x2b, 0x0f, 0x1a, 0xdd, 0xfd, 0x90, 0x7d, 0x87, 0x12,
0x15, 0x29, 0xd7, 0x1d, 0x1c, 0xdd, 0x02, 0x02, 0x02, 0xfb, 0x2b, 0x0f, 0x1a, 0xdd, 0xfd, 0x90, 0x7d,
0x87, 0x12, 0x15, 0x29, 0xd7, 0x1d, 0x1c, 0xdd,
}

dec, _ := container.NewDecoder(bytes.NewReader(data))

dec.HasNext()

assert.Error(t, dec.Error())
}

func TestNewEncoder_InvalidSchema(t *testing.T) {
buf := &bytes.Buffer{}

_, err := container.NewEncoder(``, buf)

assert.Error(t, err)
}

func TestEncoder(t *testing.T) {
unionStr := "union value"
record := FullRecord{
Expand Down Expand Up @@ -147,3 +225,49 @@ func TestEncoder(t *testing.T) {

assert.NoError(t, err)
}

func TestEncoder_EncodeError(t *testing.T) {
buf := &bytes.Buffer{}
enc, _ := container.NewEncoder(`"long"`, buf)

err := enc.Encode("test")

assert.Error(t, err)
}

func TestEncoder_EncodeWritesBlocks(t *testing.T) {
buf := &bytes.Buffer{}
enc, _ := container.NewEncoder(`"long"`, buf, container.WithBlockLength(1))
defer enc.Close()

err := enc.Encode(int64(1))

assert.NoError(t, err)
assert.Equal(t, 61, buf.Len())
}

func TestEncoder_EncodeHandlesWriteBlockError(t *testing.T) {
w := &errorWriter{}
enc, _ := container.NewEncoder(`"long"`, w, container.WithBlockLength(1))
defer enc.Close()

err := enc.Encode(int64(1))

assert.Error(t, err)
}

func TestEncoder_CloseHandlesWriteBlockError(t *testing.T) {
w := &errorWriter{}
enc, _ := container.NewEncoder(`"long"`, w)
_ = enc.Encode(int64(1))

err := enc.Close()

assert.Error(t, err)
}

type errorWriter struct{}

func (*errorWriter) Write(p []byte) (n int, err error) {
return 0, errors.New("test")
}
Loading

0 comments on commit d0e593a

Please sign in to comment.