Skip to content

Commit

Permalink
core: import core cellar code
Browse files Browse the repository at this point in the history
  • Loading branch information
abdullin committed Dec 24, 2017
1 parent 0295279 commit de8c1a2
Show file tree
Hide file tree
Showing 8 changed files with 1,069 additions and 0 deletions.
176 changes: 176 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package cellar

import (
"bufio"
"crypto/cipher"
"io"
"log"
"os"
"path"

"github.com/pierrec/lz4"
"github.com/pkg/errors"
)

type Buffer struct {
fileName string
maxBytes int64
startPos int64

records int64
pos int64

writer *bufio.Writer
stream *os.File
}

func openBuffer(d *BufferDto, folder string) (*Buffer, error) {

if len(d.FileName) == 0 {
return nil, errors.New("empty filename")
}

fullPath := path.Join(folder, d.FileName)

f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, errors.Wrap(err, "Open file")
}
f.Truncate(int64(d.MaxBytes))
if _, err := f.Seek(int64(d.Pos), io.SeekStart); err != nil {
return nil, errors.Wrap(err, "Seek")
}

b := &Buffer{
fileName: d.FileName,
startPos: d.StartPos,
maxBytes: d.MaxBytes,
pos: d.Pos,
records: d.Records,
stream: f,
writer: bufio.NewWriter(f),
}
return b, nil
}

func (b *Buffer) getState() *BufferDto {
return &BufferDto{
FileName: b.fileName,
MaxBytes: b.maxBytes,
StartPos: b.startPos,
Pos: b.pos,
Records: b.records,
}
}

func (b *Buffer) fits(bytes int64) bool {
return (b.pos + bytes) <= b.maxBytes
}

func (b *Buffer) writeBytes(bs []byte) error {
if _, err := b.writer.Write(bs); err != nil {
return errors.Wrap(err, "Write")
}
b.pos += int64(len(bs))
return nil
}

func (b *Buffer) endRecord() {
b.records++
}

func (b *Buffer) flush() error {
if err := b.writer.Flush(); err != nil {
return errors.Wrap(err, "Flush")
}
return nil
}

func (b *Buffer) close() error {
if b.stream == nil {
return nil
}
var err error
if err = b.stream.Close(); err != nil {
return errors.Wrap(err, "stream.Close")
}
b.stream = nil
return nil
}

func (b *Buffer) compress(key []byte) (dto *ChunkDto, err error) {

loc := b.stream.Name() + ".lz4"

if err = b.writer.Flush(); err != nil {
log.Panicf("Failed to flush buffer: %s", err)
}
if err = b.stream.Sync(); err != nil {
log.Panicf("Failed to Fsync buffer: %s", err)
}

if _, err = b.stream.Seek(0, io.SeekStart); err != nil {
log.Panicf("Failed to seek to 0 in buffer: %s", err)
}

// create chunk file
var chunkFile *os.File
if chunkFile, err = os.Create(loc); err != nil {
return nil, errors.Wrap(err, "os.Create")
}

defer func() {
if err := chunkFile.Sync(); err != nil {
panic("Failed to sync")
}
if err := chunkFile.Close(); err != nil {
panic("Failed to close")
}
}()

// buffer writes to file
buffer := bufio.NewWriter(chunkFile)

defer buffer.Flush()

// encrypt before buffering
var encryptor *cipher.StreamWriter
if encryptor, err = chainEncryptor(key, buffer); err != nil {
log.Panicf("Failed to chain encryptor for %s: %s", loc, err)
}

defer encryptor.Close()

// compress before encrypting
zw := lz4.NewWriter(encryptor)
zw.Header.HighCompression = true

defer func() {
if err := zw.Close(); err != nil {
panic("Failed to close zw")
}
}()

// copy chunk to the chain
if _, err = io.CopyN(zw, b.stream, b.pos); err != nil {
return nil, errors.Wrap(err, "CopyN")
}

zw.Flush()
chunkFile.Sync()
b.close()

var size int64
if size, err = chunkFile.Seek(0, io.SeekEnd); err != nil {
return nil, errors.Wrap(err, "Seek")
}

dto = &ChunkDto{
FileName: b.fileName + ".lz4",
Records: b.records,
UncompressedByteSize: b.pos,
StartPos: b.startPos,
CompressedDiskSize: size,
}
return dto, nil
}
62 changes: 62 additions & 0 deletions chains.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package cellar

import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"io"
"log"

"github.com/pierrec/lz4"
"github.com/pkg/errors"
)

func chainCompressor(w io.Writer) (*lz4.Writer, error) {
zw := lz4.NewWriter(w)
zw.Header.HighCompression = true
return zw, nil
}

func chainDecryptor(key []byte, src io.Reader) (io.Reader, error) {
var (
block cipher.Block
err error
)
if block, err = aes.NewCipher(key); err != nil {
log.Panic("Failed to create a new cipher from the key")
}

iv := make([]byte, aes.BlockSize)

if _, err = src.Read(iv); err != nil {
return nil, errors.Wrap(err, "Failed to read IV")
}

stream := cipher.NewCFBDecrypter(block, iv)
reader := &cipher.StreamReader{R: src, S: stream}
return reader, nil
}

func chainEncryptor(key []byte, w io.Writer) (*cipher.StreamWriter, error) {

var (
block cipher.Block
err error
)
if block, err = aes.NewCipher(key); err != nil {
log.Panic("Failed to create a new cipher from the key")
}

iv := make([]byte, aes.BlockSize)
if _, err = io.ReadFull(rand.Reader, iv); err != nil {
panic(err)
}

if _, err = w.Write(iv); err != nil {
return nil, errors.Wrap(err, "Write")
}
stream := cipher.NewCFBEncrypter(block, iv)

writer := &cipher.StreamWriter{S: stream, W: w}
return writer, nil
}
95 changes: 95 additions & 0 deletions dto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit de8c1a2

Please sign in to comment.