diff --git a/buffer.go b/buffer.go
new file mode 100644
index 0000000..11f0a0d
--- /dev/null
+++ b/buffer.go
@@ -0,0 +1,176 @@
+package cellar
+
+import (
+	"bufio"
+	"crypto/cipher"
+	"io"
+	"log"
+	"os"
+	"path"
+
+	"github.com/pierrec/lz4"
+	"github.com/pkg/errors"
+)
+
+// Buffer is the append-only staging file that records are written to
+// before they are sealed into an immutable, compressed chunk.
+type Buffer struct {
+	fileName string
+	maxBytes int64
+	startPos int64
+
+	records int64
+	pos     int64
+
+	writer *bufio.Writer
+	stream *os.File
+}
+
+func openBuffer(d *BufferDto, folder string) (*Buffer, error) {
+	if len(d.FileName) == 0 {
+		return nil, errors.New("empty filename")
+	}
+
+	fullPath := path.Join(folder, d.FileName)
+
+	f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_RDWR, 0644)
+	if err != nil {
+		return nil, errors.Wrap(err, "Open file")
+	}
+	if err = f.Truncate(d.MaxBytes); err != nil {
+		return nil, errors.Wrap(err, "Truncate")
+	}
+	if _, err = f.Seek(d.Pos, io.SeekStart); err != nil {
+		return nil, errors.Wrap(err, "Seek")
+	}
+
+	b := &Buffer{
+		fileName: d.FileName,
+		startPos: d.StartPos,
+		maxBytes: d.MaxBytes,
+		pos:      d.Pos,
+		records:  d.Records,
+		stream:   f,
+		writer:   bufio.NewWriter(f),
+	}
+	return b, nil
+}
+
+func (b *Buffer) getState() *BufferDto {
+	return &BufferDto{
+		FileName: b.fileName,
+		MaxBytes: b.maxBytes,
+		StartPos: b.startPos,
+		Pos:      b.pos,
+		Records:  b.records,
+	}
+}
+
+// fits reports whether another record of the given size would still fit
+// under the configured size cap.
+func (b *Buffer) fits(bytes int64) bool {
+	return (b.pos + bytes) <= b.maxBytes
+}
+
+func (b *Buffer) writeBytes(bs []byte) error {
+	if _, err := b.writer.Write(bs); err != nil {
+		return errors.Wrap(err, "Write")
+	}
+	b.pos += int64(len(bs))
+	return nil
+}
+
+func (b *Buffer) endRecord() {
+	b.records++
+}
+
+func (b *Buffer) flush() error {
+	if err := b.writer.Flush(); err != nil {
+		return errors.Wrap(err, "Flush")
+	}
+	return nil
+}
+
+func (b *Buffer) close() error {
+	if b.stream == nil {
+		return nil
+	}
+	if err := b.stream.Close(); err != nil {
+		return errors.Wrap(err, "stream.Close")
+	}
+	b.stream = nil
+	return nil
+}
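+
+// compress drains the buffer into an immutable chunk file next to it.
+// Bytes flow through the chain
+//
+//	b.stream -> lz4.Writer -> cipher.StreamWriter (AES-CFB) -> bufio.Writer -> chunk file
+//
+// so the chunk on disk is a random IV followed by the encrypted lz4
+// frame. The chain is closed link by link before the compressed size
+// is measured; otherwise bytes still sitting in the bufio stage would
+// be missed.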
+func (b *Buffer) compress(key []byte) (dto *ChunkDto, err error) {
+	loc := b.stream.Name() + ".lz4"
+
+	if err = b.writer.Flush(); err != nil {
+		log.Panicf("Failed to flush buffer: %s", err)
+	}
+	if err = b.stream.Sync(); err != nil {
+		log.Panicf("Failed to fsync buffer: %s", err)
+	}
+	if _, err = b.stream.Seek(0, io.SeekStart); err != nil {
+		log.Panicf("Failed to seek to 0 in buffer: %s", err)
+	}
+
+	// create the chunk file
+	var chunkFile *os.File
+	if chunkFile, err = os.Create(loc); err != nil {
+		return nil, errors.Wrap(err, "os.Create")
+	}
+	defer chunkFile.Close()
+
+	// buffer writes to the file
+	buffer := bufio.NewWriter(chunkFile)
+
+	// encrypt before buffering
+	var encryptor *cipher.StreamWriter
+	if encryptor, err = chainEncryptor(key, buffer); err != nil {
+		log.Panicf("Failed to chain encryptor for %s: %s", loc, err)
+	}
+
+	// compress before encrypting
+	zw := lz4.NewWriter(encryptor)
+	zw.Header.HighCompression = true
+
+	// copy the buffer contents into the chain
+	if _, err = io.CopyN(zw, b.stream, b.pos); err != nil {
+		return nil, errors.Wrap(err, "CopyN")
+	}
+
+	// close the chain in write order (lz4 -> encryptor -> bufio) so
+	// every byte reaches the file before the size is measured
+	if err = zw.Close(); err != nil {
+		return nil, errors.Wrap(err, "zw.Close")
+	}
+	if err = encryptor.Close(); err != nil {
+		return nil, errors.Wrap(err, "encryptor.Close")
+	}
+	if err = buffer.Flush(); err != nil {
+		return nil, errors.Wrap(err, "buffer.Flush")
+	}
+	if err = chunkFile.Sync(); err != nil {
+		return nil, errors.Wrap(err, "chunkFile.Sync")
+	}
+	if err = b.close(); err != nil {
+		return nil, errors.Wrap(err, "buffer.close")
+	}
+
+	var size int64
+	if size, err = chunkFile.Seek(0, io.SeekEnd); err != nil {
+		return nil, errors.Wrap(err, "Seek")
+	}
+
+	dto = &ChunkDto{
+		FileName:             b.fileName + ".lz4",
+		Records:              b.records,
+		UncompressedByteSize: b.pos,
+		StartPos:             b.startPos,
+		CompressedDiskSize:   size,
+	}
+	return dto, nil
+}
diff --git a/chains.go b/chains.go
new file mode 100644
index 0000000..ed0242d
--- /dev/null
+++ b/chains.go
@@ -0,0 +1,62 @@
+package cellar
+
+import (
+	"crypto/aes"
+	"crypto/cipher"
+	"crypto/rand"
+	"io"
+	"log"
+
+	"github.com/pierrec/lz4"
+	"github.com/pkg/errors"
+)
+
+func chainCompressor(w io.Writer) (*lz4.Writer, error) {
+	zw := lz4.NewWriter(w)
+	zw.Header.HighCompression = true
+	return zw, nil
+}
+
+// chainDecryptor reads the IV that chainEncryptor prepended to the
+// stream and returns a reader that decrypts the rest with AES-CFB.
+func chainDecryptor(key []byte, src io.Reader) (io.Reader, error) {
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		log.Panic("Failed to create a new cipher from the key")
+	}
+
+	iv := make([]byte, aes.BlockSize)
+	if _, err = io.ReadFull(src, iv); err != nil {
+		return nil, errors.Wrap(err, "Failed to read IV")
+	}
+
+	stream := cipher.NewCFBDecrypter(block, iv)
+	reader := &cipher.StreamReader{R: src, S: stream}
+	return reader, nil
+}
+
+// chainEncryptor writes a fresh random IV to w and returns a writer
+// that encrypts everything else with AES-CFB under the given key.
+func chainEncryptor(key []byte, w io.Writer) (*cipher.StreamWriter, error) {
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		log.Panic("Failed to create a new cipher from the key")
+	}
+
+	iv := make([]byte, aes.BlockSize)
+	if _, err = io.ReadFull(rand.Reader, iv); err != nil {
+		return nil, errors.Wrap(err, "rand.Reader")
+	}
+
+	if _, err = w.Write(iv); err != nil {
+		return nil, errors.Wrap(err, "Write IV")
+	}
+
+	stream := cipher.NewCFBEncrypter(block, iv)
+	writer := &cipher.StreamWriter{S: stream, W: w}
+	return writer, nil
+}
diff --git a/dto.pb.go b/dto.pb.go
new file mode 100644
index 0000000..63aeb24
--- /dev/null
+++ b/dto.pb.go
@@ -0,0 +1,95 @@
+// Code generated by protoc-gen-go.
+// source: dto.proto
+// DO NOT EDIT!
+
+/*
+Package cellar is a generated protocol buffer package.
+
+It is generated from these files:
+	dto.proto
+
+It has these top-level messages:
+	ChunkDto
+	BufferDto
+	MetaDto
+*/
+package cellar
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type ChunkDto struct { + UncompressedByteSize int64 `protobuf:"varint,1,opt,name=uncompressedByteSize" json:"uncompressedByteSize,omitempty"` + CompressedDiskSize int64 `protobuf:"varint,2,opt,name=compressedDiskSize" json:"compressedDiskSize,omitempty"` + Records int64 `protobuf:"varint,3,opt,name=records" json:"records,omitempty"` + FileName string `protobuf:"bytes,4,opt,name=fileName" json:"fileName,omitempty"` + StartPos int64 `protobuf:"varint,5,opt,name=startPos" json:"startPos,omitempty"` +} + +func (m *ChunkDto) Reset() { *m = ChunkDto{} } +func (m *ChunkDto) String() string { return proto.CompactTextString(m) } +func (*ChunkDto) ProtoMessage() {} +func (*ChunkDto) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type BufferDto struct { + StartPos int64 `protobuf:"varint,1,opt,name=startPos" json:"startPos,omitempty"` + MaxBytes int64 `protobuf:"varint,2,opt,name=maxBytes" json:"maxBytes,omitempty"` + Records int64 `protobuf:"varint,3,opt,name=records" json:"records,omitempty"` + Pos int64 `protobuf:"varint,4,opt,name=pos" json:"pos,omitempty"` + FileName string `protobuf:"bytes,5,opt,name=fileName" json:"fileName,omitempty"` +} + +func (m *BufferDto) Reset() { *m = BufferDto{} } +func (m *BufferDto) String() string { return proto.CompactTextString(m) } +func (*BufferDto) ProtoMessage() {} +func (*BufferDto) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +type MetaDto struct { + MaxKeySize int64 `protobuf:"varint,1,opt,name=maxKeySize" json:"maxKeySize,omitempty"` + MaxValSize int64 `protobuf:"varint,2,opt,name=maxValSize" json:"maxValSize,omitempty"` +} + +func (m *MetaDto) Reset() { *m = MetaDto{} } +func (m *MetaDto) String() string { return proto.CompactTextString(m) } +func (*MetaDto) ProtoMessage() {} +func (*MetaDto) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func init() { + proto.RegisterType((*ChunkDto)(nil), "cellar.ChunkDto") + proto.RegisterType((*BufferDto)(nil), "cellar.BufferDto") + proto.RegisterType((*MetaDto)(nil), "cellar.MetaDto") +} + +func init() { proto.RegisterFile("dto.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 246 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x91, 0xcd, 0x4a, 0xc3, 0x40, + 0x10, 0xc7, 0x59, 0x63, 0x3f, 0x32, 0x27, 0x59, 0x3c, 0x2c, 0x1e, 0xa4, 0xe4, 0xd4, 0x53, 0x0f, + 0xfa, 0x06, 0xb5, 0x17, 0x11, 0x45, 0x22, 0x78, 0x5f, 0x93, 0x09, 0x86, 0xee, 0x76, 0xc2, 0xce, + 0x06, 0x5a, 0x5f, 0xc1, 0x97, 0xf2, 0xd1, 0x64, 0x97, 0x1a, 0x37, 0x50, 0x7a, 0xcb, 0xff, 0x0b, + 0x7e, 0x99, 0x85, 0xbc, 0xf6, 0xb4, 0xea, 0x1c, 0x79, 0x92, 0xd3, 0x0a, 0x8d, 0xd1, 0xae, 0xf8, + 0x11, 0x30, 0x7f, 0xf8, 0xec, 0x77, 0xdb, 0x8d, 0x27, 0x79, 0x07, 0xd7, 0xfd, 0xae, 0x22, 0xdb, + 0x39, 0x64, 0xc6, 0x7a, 0x7d, 0xf0, 0xf8, 0xd6, 0x7e, 0xa1, 0x12, 0x0b, 0xb1, 0xcc, 0xca, 0x93, + 0x99, 0x5c, 0x81, 0xfc, 0x77, 0x37, 0x2d, 0x6f, 0xe3, 0xe2, 0x22, 0x2e, 0x4e, 0x24, 0x52, 0xc1, + 0xcc, 0x61, 0x45, 0xae, 0x66, 0x95, 0xc5, 0xd2, 0x9f, 0x94, 0x37, 0x30, 0x6f, 0x5a, 0x83, 0x2f, + 0xda, 0xa2, 0xba, 0x5c, 0x88, 0x65, 0x5e, 0x0e, 0x3a, 0x64, 0xec, 0xb5, 0xf3, 0xaf, 0xc4, 0x6a, + 0x12, 0x67, 0x83, 0x2e, 0xbe, 0x05, 0xe4, 0xeb, 0xbe, 0x69, 0xd0, 0x85, 0x7f, 0x48, 0x9b, 0x62, + 0xdc, 0x0c, 0x99, 0xd5, 0xfb, 0x80, 0xce, 0x47, 0xc2, 0x41, 0x9f, 0xe1, 0xba, 0x82, 0xac, 0x23, + 0x8e, 0x48, 0x59, 0x19, 0x3e, 0x47, 0xa4, 0x93, 0x31, 0x69, 0xf1, 0x08, 0xb3, 0x67, 
0xf4, 0x3a,
+	0xa0, 0xdc, 0x02, 0x58, 0xbd, 0x7f, 0xc2, 0x43, 0x72, 0xc4, 0xc4, 0x39, 0xe6, 0xef, 0xda, 0x24,
+	0x27, 0x4b, 0x9c, 0x8f, 0x69, 0x7c, 0xaa, 0xfb, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8a, 0x58,
+	0x12, 0xdc, 0xb7, 0x01, 0x00, 0x00,
+}
diff --git a/lmdb.go b/lmdb.go
new file mode 100644
index 0000000..32622d3
--- /dev/null
+++ b/lmdb.go
@@ -0,0 +1,150 @@
+package cellar
+
+import (
+	"bytes"
+	"encoding/binary"
+	"log"
+
+	"github.com/abdullin/lex-go/tuple"
+	"github.com/abdullin/mdb"
+	"github.com/bmatsuo/lmdb-go/lmdb"
+	"github.com/bmatsuo/lmdb-go/lmdbscan"
+	proto "github.com/golang/protobuf/proto"
+	"github.com/pkg/errors"
+)
+
+// Table prefixes for the LMDB keyspace.
+const (
+	ChunkTable     byte = 1
+	MetaTable      byte = 2
+	BufferTable    byte = 3
+	CellarTable    byte = 4
+	UserIndexTable byte = 5
+)
+
+func lmdbAddChunk(tx *mdb.Tx, chunkStartPos int64, dto *ChunkDto) error {
+	key := mdb.CreateKey(ChunkTable, chunkStartPos)
+
+	if err := tx.PutProto(key, dto); err != nil {
+		return errors.Wrap(err, "PutProto")
+	}
+
+	log.Printf("Added chunk %s with %d records and %d bytes", dto.FileName, dto.Records, dto.UncompressedByteSize)
+	return nil
+}
+
+func lmdbListChunks(tx *mdb.Tx) ([]*ChunkDto, error) {
+	tpl := mdb.CreateKey(ChunkTable)
+
+	scanner := lmdbscan.New(tx.Tx, tx.DB)
+	defer scanner.Close()
+
+	scanner.Set(tpl, nil, lmdb.SetRange)
+
+	var chunks []*ChunkDto
+
+	for scanner.Scan() {
+		key := scanner.Key()
+
+		if !bytes.HasPrefix(key, tpl) {
+			break
+		}
+
+		chunk := &ChunkDto{}
+		val := scanner.Val()
+		if err := proto.Unmarshal(val, chunk); err != nil {
+			return nil, errors.Wrapf(err, "Unmarshal %x at %x", val, key)
+		}
+
+		chunks = append(chunks, chunk)
+	}
+
+	if err := scanner.Err(); err != nil {
+		return nil, errors.Wrap(err, "Scanner.Scan")
+	}
+	return chunks, nil
+}
+
+func lmdbPutBuffer(tx *mdb.Tx, dto *BufferDto) error {
+	tpl := tuple.Tuple([]tuple.Element{BufferTable})
+	key := tpl.Pack()
+
+	val, err := proto.Marshal(dto)
+	if err != nil {
+		return errors.Wrap(err, "Marshal")
+	}
+	if err = tx.Put(key, val); err != nil {
+		return errors.Wrap(err, "tx.Put")
+	}
+	return nil
+}
+
+func lmdbGetBuffer(tx *mdb.Tx) (*BufferDto, error) {
+	tpl := tuple.Tuple([]tuple.Element{BufferTable})
+	key := tpl.Pack()
+
+	var data []byte
+	var err error
+
+	if data, err = tx.Get(key); err != nil {
+		return nil, errors.Wrap(err, "tx.Get")
+	}
+	if data == nil {
+		return nil, nil
+	}
+	dto := &BufferDto{}
+	if err = proto.Unmarshal(data, dto); err != nil {
+		return nil, errors.Wrap(err, "Unmarshal")
+	}
+	return dto, nil
+}
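+
+// lmdbIndexPosition stores the varint-encoded global position of record
+// k of the given stream under the tuple key (MetaTable, stream, k);
+// lmdbLookupPosition packs the same tuple to read it back. Note that
+// these entries live under the MetaTable prefix; UserIndexTable is
+// declared above but not used by this pair of functions.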
+func lmdbIndexPosition(tx *mdb.Tx, stream string, k uint64, pos int64) error {
+	tpl := tuple.Tuple([]tuple.Element{MetaTable, stream, k})
+	key := tpl.Pack()
+
+	buf := make([]byte, binary.MaxVarintLen64)
+	n := binary.PutVarint(buf, pos)
+
+	if err := tx.Put(key, buf[0:n]); err != nil {
+		return errors.Wrap(err, "tx.Put")
+	}
+	return nil
+}
+
+func lmdbLookupPosition(tx *mdb.Tx, stream string, k uint64) (int64, error) {
+	tpl := tuple.Tuple([]tuple.Element{MetaTable, stream, k})
+	key := tpl.Pack()
+
+	val, err := tx.Get(key)
+	if err != nil {
+		return 0, errors.Wrap(err, "tx.Get")
+	}
+	if len(val) == 0 {
+		return 0, nil
+	}
+
+	pos, n := binary.Varint(val)
+	if n <= 0 {
+		return 0, errors.Errorf("Failed to decode position varint (n=%d)", n)
+	}
+	return pos, nil
+}
+
+func lmdbSetCellarMeta(tx *mdb.Tx, m *MetaDto) error {
+	key := mdb.CreateKey(CellarTable)
+	return tx.PutProto(key, m)
+}
+
+func lmdbGetCellarMeta(tx *mdb.Tx) (*MetaDto, error) {
+	key := mdb.CreateKey(CellarTable)
+	dto := &MetaDto{}
+
+	if err := tx.ReadProto(key, dto); err != nil {
+		return nil, errors.Wrap(err, "ReadProto")
+	}
+	return dto, nil
+}
diff --git a/reader.go b/reader.go
new file mode 100644
index 0000000..4d3fb54
--- /dev/null
+++ b/reader.go
@@ -0,0 +1,287 @@
+package cellar
+
+import (
+	"encoding/binary"
+	"io"
+	"log"
+	"os"
+	"path"
+
+	"github.com/abdullin/mdb"
+	"github.com/pierrec/lz4"
+	"github.com/pkg/errors"
+)
+
+type ReadFlag int
+
+const (
+	RF_None        ReadFlag = 0
+	RF_LoadBuffer  ReadFlag = 1 << 1
+	RF_PrintChunks ReadFlag = 1 << 2
+)
+
+type Reader struct {
+	Folder      string
+	Key         []byte
+	Flags       ReadFlag
+	StartPos    int64
+	EndPos      int64
+	LimitChunks int
+}
+
+func NewReader(folder string, key []byte) *Reader {
+	return &Reader{folder, key, RF_LoadBuffer, 0, 0, 0}
+}
+
+type ReaderInfo struct {
+	// ChunkPos can be used to derive the chunk file name
+	ChunkPos int64
+	// StartPos is the global position where this record starts
+	StartPos int64
+	// NextPos is the global position right after this record
+	NextPos int64
+}
+
+// ReadOp is the callback invoked for every record during a scan.
+type ReadOp func(pos *ReaderInfo, data []byte) error
+
+func (r *Reader) ReadDB(op mdb.TxOp) error {
+	var db *mdb.DB
+	var err error
+
+	cfg := mdb.NewConfig()
+	if db, err = mdb.New(r.Folder, cfg); err != nil {
+		return errors.Wrap(err, "mdb.New")
+	}
+	defer db.Close()
+
+	return db.Read(op)
+}
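+
+// Scan replays records in global order: first the sealed chunks listed
+// in LMDB, then (if RF_LoadBuffer is set) the tail still sitting in the
+// current buffer file. A minimal usage sketch (folder and key values
+// are illustrative):
+//
+//	reader := NewReader("/tmp/cellar", key)
+//	err := reader.Scan(func(ri *ReaderInfo, data []byte) error {
+//		fmt.Printf("record at %d: %d bytes\n", ri.StartPos, len(data))
+//		return nil
+//	})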
+func (r *Reader) Scan(op ReadOp) error {
+	var db *mdb.DB
+	var err error
+
+	cfg := mdb.NewConfig()
+	if db, err = mdb.New(r.Folder, cfg); err != nil {
+		return errors.Wrap(err, "mdb.New")
+	}
+	defer db.Close()
+
+	var b *BufferDto
+	var meta *MetaDto
+	var chunks []*ChunkDto
+
+	loadBuffer := (r.Flags & RF_LoadBuffer) == RF_LoadBuffer
+	printChunks := (r.Flags & RF_PrintChunks) == RF_PrintChunks
+
+	err = db.Read(func(tx *mdb.Tx) error {
+		var err error
+		if b, err = lmdbGetBuffer(tx); err != nil {
+			return errors.Wrap(err, "lmdbGetBuffer")
+		}
+		if meta, err = lmdbGetCellarMeta(tx); err != nil {
+			return errors.Wrap(err, "lmdbGetCellarMeta")
+		}
+		if chunks, err = lmdbListChunks(tx); err != nil {
+			return errors.Wrap(err, "lmdbListChunks")
+		}
+		return nil
+	})
+
+	if err != nil {
+		return errors.Wrap(err, "db.Read")
+	}
+
+	if b == nil && len(chunks) == 0 {
+		return nil
+	}
+
+	info := &ReaderInfo{}
+
+	log.Printf("Found %d chunks and limit is %d", len(chunks), r.LimitChunks)
+
+	if len(chunks) > 0 {
+		if r.LimitChunks > 0 && len(chunks) > r.LimitChunks {
+			log.Printf("Truncating input from %d to %d chunks", len(chunks), r.LimitChunks)
+			chunks = chunks[:r.LimitChunks]
+		}
+
+		for i, c := range chunks {
+			endPos := c.StartPos + c.UncompressedByteSize
+
+			if r.StartPos != 0 && endPos < r.StartPos {
+				// skip the chunk if it ends before the range we are interested in
+				continue
+			}
+			if r.EndPos != 0 && c.StartPos > r.EndPos {
+				// skip the chunk if it starts after the range we are interested in
+				continue
+			}
+
+			chunk := make([]byte, c.UncompressedByteSize)
+			file := path.Join(r.Folder, c.FileName)
+
+			if printChunks {
+				log.Printf("Loading chunk %d %s with size %d", i, c.FileName, c.UncompressedByteSize)
+			}
+
+			if chunk, err = loadChunkIntoBuffer(file, r.Key, c.UncompressedByteSize, chunk); err != nil {
+				log.Panicf("Failed to load chunk %s", c.FileName)
+			}
+
+			info.ChunkPos = c.StartPos
+
+			chunkPos := 0
+			if r.StartPos != 0 && r.StartPos > c.StartPos {
+				// the reader starts in the middle of this chunk
+				chunkPos = int(r.StartPos - c.StartPos)
+			}
+
+			if err = replayChunk(info, chunk, op, chunkPos); err != nil {
+				return errors.Wrap(err, "Failed to read chunk")
+			}
+		}
+	}
+
+	if loadBuffer && b != nil && b.Pos > 0 {
+		if r.EndPos != 0 && b.StartPos > r.EndPos {
+			// the buffer starts after the end of our search interval - skip it
+			return nil
+		}
+
+		loc := path.Join(r.Folder, b.FileName)
+
+		var f *os.File
+		if f, err = os.Open(loc); err != nil {
+			log.Panicf("Failed to open buffer file %s", loc)
+		}
+		defer f.Close()
+
+		curChunk := make([]byte, b.Pos)
+		if _, err = io.ReadFull(f, curChunk); err != nil {
+			log.Panicf("Failed to read %d bytes from buffer %s: %s", b.Pos, loc, err)
+		}
+
+		info.ChunkPos = b.StartPos
+
+		chunkPos := 0
+		if r.StartPos > b.StartPos {
+			chunkPos = int(r.StartPos - b.StartPos)
+		}
+
+		if err = replayChunk(info, curChunk, op, chunkPos); err != nil {
+			return errors.Wrap(err, "Failed to read chunk")
+		}
+	}
+
+	return nil
+}
+
+func readVarint(b []byte) (val int64, n int) {
+	val, n = binary.Varint(b)
+	if n <= 0 {
+		log.Panicf("Failed to read varint %d", n)
+	}
+	return
+}
+
+// replayChunk walks the records in an uncompressed chunk: read the
+// varint length prefix, slice out the record, hand it to op, repeat
+// until the end of the chunk.
+func replayChunk(info *ReaderInfo, chunk []byte, op ReadOp, pos int) error {
+	max := len(chunk)
+
+	for pos < max {
+		info.StartPos = int64(pos) + info.ChunkPos
+
+		recordSize, shift := readVarint(chunk[pos:])
+
+		// move the position past the length prefix
+		pos += shift
+
+		if pos+int(recordSize) > max {
+			return errors.Errorf("Record at %d overruns the chunk", info.StartPos)
+		}
+
+		record := chunk[pos : pos+int(recordSize)]
+		pos += int(recordSize)
+
+		info.NextPos = int64(pos) + info.ChunkPos
+
+		if err := op(info, record); err != nil {
+			return errors.Wrap(err, "Failed to execute op")
+		}
+	}
+	return nil
+}
+
+func getMaxByteSize(cs []*ChunkDto, b *BufferDto) int64 {
+	var bufferSize int64
+
+	for _, c := range cs {
+		if c.UncompressedByteSize > bufferSize {
+			bufferSize = c.UncompressedByteSize
+		}
+	}
+
+	if b != nil && b.MaxBytes > bufferSize {
+		bufferSize = b.MaxBytes
+	}
+	return bufferSize
+}
+
+func loadChunkIntoBuffer(loc string, key []byte, size int64, b []byte) ([]byte, error) {
+	var decryptor io.Reader
+	var err error
+
+	var chunkFile *os.File
+	if chunkFile, err = os.Open(loc); err != nil {
+		log.Panicf("Failed to open chunk %s", loc)
+	}
+	defer chunkFile.Close()
+
+	if decryptor, err = chainDecryptor(key, chunkFile); err != nil {
+		log.Panicf("Failed to chain decryptor for %s: %s", loc, err)
+	}
+
+	// the lz4 frame header is read from the stream itself, so no
+	// writer-side options are needed here
+	zr := lz4.NewReader(decryptor)
+
+	if _, err = io.ReadFull(zr, b); err != nil {
+		log.Panicf("Failed to read from chunk %s (%d): %s", loc, size, err)
+	}
+	return b[0:size], nil
+}
diff --git a/reader_async.go b/reader_async.go
new file mode 100644
index 0000000..9e9b465
--- /dev/null
+++ b/reader_async.go
@@ -0,0 +1,32 @@
+package cellar
+
+import (
+	"log"
+)
+
+// Rec is a single record together with its positions, as delivered by
+// ScanAsync.
+type Rec struct {
+	Data     []byte
+	ChunkPos int64
+	StartPos int64
+	NextPos  int64
+}
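+
+// ScanAsync runs Scan on a background goroutine and streams the records
+// through a channel with the given buffer depth. The channel is closed
+// when the scan completes; a failed scan panics. A consumer sketch
+// (process is an illustrative placeholder):
+//
+//	for rec := range reader.ScanAsync(64) {
+//		process(rec.Data)
+//	}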
+ if stat.IsDir() { + return nil + } + return errors.Errorf("Path is a file: %s", folder) + } + + if os.IsNotExist(err) { + // file does not exist - create + if err = os.MkdirAll(folder, 0644); err != nil { + return errors.Wrap(err, "MkdirAll") + } + return nil + + } + return errors.Wrap(err, "os.Stat") + +} diff --git a/writer.go b/writer.go new file mode 100644 index 0000000..bf406f5 --- /dev/null +++ b/writer.go @@ -0,0 +1,238 @@ +package cellar + +import ( + "encoding/binary" + fmt "fmt" + "log" + "os" + "path" + + "github.com/abdullin/mdb" + "github.com/pkg/errors" +) + +type Writer struct { + db *mdb.DB + b *Buffer + maxKeySize int64 + maxValSize int64 + folder string + maxBufferSize int64 + key []byte + encodingBuf []byte +} + +func NewWriter(folder string, maxBufferSize int64, key []byte) (*Writer, error) { + ensureFolder(folder) + + var db *mdb.DB + var err error + + cfg := mdb.NewConfig() + // make sure we are writing sync + cfg.EnvFlags = 0 + + if db, err = mdb.New(folder, cfg); err != nil { + return nil, errors.Wrap(err, "mdb.New") + } + + var meta *MetaDto + var b *Buffer + + err = db.Update(func(tx *mdb.Tx) error { + var err error + + var dto *BufferDto + if dto, err = lmdbGetBuffer(tx); err != nil { + return errors.Wrap(err, "lmdbGetBuffer") + } + + if dto == nil { + if b, err = createBuffer(tx, 0, maxBufferSize, folder); err != nil { + return errors.Wrap(err, "SetNewBuffer") + } + return nil + + } else if b, err = openBuffer(dto, folder); err != nil { + return errors.Wrap(err, "openBuffer") + } + + if meta, err = lmdbGetCellarMeta(tx); err != nil { + return errors.Wrap(err, "lmdbGetCellarMeta") + } + return nil + }) + + if err != nil { + return nil, errors.Wrap(err, "Update") + } + + wr := &Writer{ + folder: folder, + maxBufferSize: maxBufferSize, + key: key, + encodingBuf: make([]byte, binary.MaxVarintLen64), + db: db, + b: b, + } + + if meta != nil { + wr.maxKeySize = meta.MaxKeySize + wr.maxValSize = meta.MaxValSize + } + + return wr, nil + +} + +func (w *Writer) VolatilePos() int64 { + if w.b != nil { + return w.b.startPos + w.b.pos + } + return 0 +} + +func (w *Writer) Append(data []byte) (pos int64, err error) { + + dataLen := int64(len(data)) + n := binary.PutVarint(w.encodingBuf, dataLen) + + totalSize := n + len(data) + + if !w.b.fits(int64(totalSize)) { + if err = w.SealTheBuffer(); err != nil { + return 0, errors.Wrap(err, "SealTheBuffer") + } + } + + if err = w.b.writeBytes(w.encodingBuf[0:n]); err != nil { + return 0, errors.Wrap(err, "write len prefix") + } + if err = w.b.writeBytes(data); err != nil { + return 0, errors.Wrap(err, "write body") + } + + w.b.endRecord() + + // update statistics + if dataLen > w.maxValSize { + w.maxValSize = dataLen + } + + pos = w.b.startPos + w.b.pos + + return pos, nil +} + +func createBuffer(tx *mdb.Tx, startPos int64, maxSize int64, folder string) (*Buffer, error) { + name := fmt.Sprintf("%012d", startPos) + dto := &BufferDto{ + Pos: 0, + StartPos: startPos, + MaxBytes: maxSize, + Records: 0, + FileName: name, + } + var err error + var buf *Buffer + + if buf, err = openBuffer(dto, folder); err != nil { + return nil, errors.Wrapf(err, "openBuffer %s", folder) + } + + if err = lmdbPutBuffer(tx, dto); err != nil { + return nil, errors.Wrap(err, "lmdbPutBuffer") + } + return buf, nil + +} + +func (w *Writer) SealTheBuffer() error { + + var err error + + oldBuffer := w.b + var newBuffer *Buffer + + if err = oldBuffer.flush(); err != nil { + return errors.Wrap(err, "buffer.Flush") + } + + var dto *ChunkDto + + if dto, err = 
+func (w *Writer) SealTheBuffer() error {
+	var err error
+
+	oldBuffer := w.b
+	var newBuffer *Buffer
+
+	if err = oldBuffer.flush(); err != nil {
+		return errors.Wrap(err, "buffer.Flush")
+	}
+
+	var dto *ChunkDto
+	if dto, err = oldBuffer.compress(w.key); err != nil {
+		return errors.Wrap(err, "compress")
+	}
+
+	newStartPos := dto.StartPos + dto.UncompressedByteSize
+
+	err = w.db.Update(func(tx *mdb.Tx) error {
+		if err = lmdbAddChunk(tx, dto.StartPos, dto); err != nil {
+			return errors.Wrap(err, "lmdbAddChunk")
+		}
+
+		if newBuffer, err = createBuffer(tx, newStartPos, w.maxBufferSize, w.folder); err != nil {
+			return errors.Wrap(err, "createBuffer")
+		}
+		return nil
+	})
+
+	if err != nil {
+		return errors.Wrap(err, "w.db.Update")
+	}
+
+	w.b = newBuffer
+
+	oldBufferPath := path.Join(w.folder, oldBuffer.fileName)
+
+	if err = os.Remove(oldBufferPath); err != nil {
+		log.Printf("Can't remove old buffer %s: %s", oldBufferPath, err)
+	}
+	return nil
+}
+
+// Close disposes all resources
+func (w *Writer) Close() error {
+	// TODO: flush, checkpoint and close the current buffer
+	return w.db.Close()
+}
+
+// Checkpoint persists the current buffer state and meta in LMDB and
+// returns the current global write position.
+func (w *Writer) Checkpoint() (int64, error) {
+	if err := w.b.flush(); err != nil {
+		return 0, errors.Wrap(err, "flush")
+	}
+
+	dto := w.b.getState()
+	current := dto.StartPos + dto.Pos
+
+	err := w.db.Update(func(tx *mdb.Tx) error {
+		var err error
+
+		if err = lmdbPutBuffer(tx, dto); err != nil {
+			return errors.Wrap(err, "lmdbPutBuffer")
+		}
+
+		meta := &MetaDto{
+			MaxKeySize: w.maxKeySize,
+			MaxValSize: w.maxValSize,
+		}
+
+		if err = lmdbSetCellarMeta(tx, meta); err != nil {
+			return errors.Wrap(err, "lmdbSetCellarMeta")
+		}
+		return nil
+	})
+
+	if err != nil {
+		return 0, errors.Wrap(err, "db.Update")
+	}
+
+	return current, nil
+}
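+
+// A minimal end-to-end sketch of the package (folder, size and key are
+// illustrative; error handling elided):
+//
+//	key := make([]byte, 16) // AES-128
+//	w, _ := NewWriter("/tmp/cellar", 100*1024*1024, key)
+//	w.Append([]byte("hello"))
+//	w.Checkpoint()    // persist buffer state
+//	w.SealTheBuffer() // force-compress the buffer into a chunk
+//	w.Close()
+//
+//	r := NewReader("/tmp/cellar", key)
+//	r.Scan(func(ri *ReaderInfo, data []byte) error {
+//		return nil // each record arrives here in order
+//	})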