forked from abdullin/cellar
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdb.go
198 lines (159 loc) · 4.32 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package cellar
import (
"fmt"
"sync"
"time"
"go.uber.org/zap"
"github.com/boltdb/bolt"
"github.com/gofrs/flock"
"github.com/pkg/errors"
)
// lockfile is the name of the lock file that New creates inside the DB folder
// (via flock) when no file lock has been supplied through an option.
const lockfile = "cellar.lock"
// DB is a convenience wrapper around Writer and Reader. It takes a file lock
// on the folder so that only one writer exists per folder, and it keeps the
// cipher (and the other shared dependencies) so they can be reused by the
// writer and by every Reader it hands out.
type DB struct {
	// folder is the directory passed to New; the lock file and the default
	// meta store are created inside it.
	folder string

	// buffer is the buffer size handed to NewWriter (New defaults it to 100000).
	buffer int64

	// mu is locked around the writer calls made by the DB methods and by Close.
	mu *sync.Mutex

	// writer is created by New unless the DB is read-only; it stays nil otherwise.
	writer *Writer

	// cipher is passed to the writer and to readers created by Reader.
	cipher Cipher

	// fileLock is the lock taken on the folder in New and released in Close.
	fileLock FileLock

	// compressor is passed to the writer.
	compressor Compressor

	// decompressor is passed to readers created by Reader.
	decompressor Decompressor

	// meta is the metadata store shared by the writer and the readers;
	// New defaults it to a BoltDB-backed implementation.
	meta MetaDB

	// readonly, when true, stops New from creating a writer.
	readonly bool

	// logger receives the DB's log output and is passed on to writer and readers.
	logger *zap.Logger
}
// New is the constructor for DB. It opens a DB on folder and applies the
// given options in order; the first option that returns an error aborts
// construction.
//
// Every dependency the options leave unset is filled in with a default: a
// logger from defaultLogger, a file lock on "<folder>/cellar.lock", an AES
// cipher built from defaultEncryptionKey, the chain compressor and
// decompressor, and a BoltDB-backed meta store at "<folder>/meta.bolt".
// Unless the DB is read-only or an option already supplied a writer, a
// Writer is created as well.
//
// If construction fails part-way, the resources New itself acquired (the
// file lock and the bolt handle) are released before the error is returned,
// so a later call to New on the same folder is not blocked by a lock that
// nobody owns. Resources supplied through options are left untouched.
func New(folder string, options ...Option) (*DB, error) {
	db := &DB{
		folder:   folder,
		buffer:   100000,
		mu:       &sync.Mutex{},
		readonly: false,
	}

	for _, opt := range options {
		if err := opt(db); err != nil {
			return nil, err
		}
	}

	if db.logger == nil {
		db.logger = defaultLogger()
		db.logger.Info("using default logger")
	}

	// cleanups collects undo steps for resources acquired below; fail runs
	// them in reverse order and returns the construction error.
	var cleanups []func()
	fail := func(err error) (*DB, error) {
		for i := len(cleanups) - 1; i >= 0; i-- {
			cleanups[i]()
		}
		return nil, err
	}

	// Checking for nil allows options to supersede each of these defaults.
	if db.fileLock == nil {
		db.logger.Info("creating file lock")
		// Create the lockfile.
		file := flock.New(fmt.Sprintf("%s/%s", folder, lockfile))
		locked, err := file.TryLock()
		if err != nil {
			return fail(err)
		}
		if !locked {
			return fail(errors.New("cellar: unable to acquire filelock"))
		}
		cleanups = append(cleanups, func() {
			// Best effort: the construction error is what the caller needs to see.
			_ = file.Unlock()
		})
		db.fileLock = file
	}

	// TODO create a mock cipher which does not decrypt and encrypt
	if db.cipher == nil {
		db.logger.Info("creating cipher")
		db.cipher = NewAES(defaultEncryptionKey)
	}

	if db.compressor == nil {
		db.logger.Info("creating ChainCompressor")
		db.compressor = ChainCompressor{CompressionLevel: 10}
	}

	if db.decompressor == nil {
		db.logger.Info("creating ChainDecompressor")
		db.decompressor = ChainDecompressor{}
	}

	if db.meta == nil {
		db.logger.Info("creating metadb", zap.String("IMPLEMENTATION", "BOLTDB"))
		blt, err := bolt.Open(fmt.Sprintf("%s/%s", folder, "meta.bolt"), 0600, &bolt.Options{Timeout: 1 * time.Second})
		if err != nil {
			return fail(err)
		}
		cleanups = append(cleanups, func() {
			// Best effort, same reasoning as for the file lock.
			_ = blt.Close()
		})
		db.meta = &BoltMetaDB{DB: blt}
		err = db.meta.Init()
		if err != nil {
			return fail(err)
		}
	}

	if db.writer == nil && !db.readonly {
		db.logger.Info("creating new writer", zap.Bool("READONLY", false))
		if err := db.newWriter(); err != nil {
			return fail(err)
		}
	}

	return db, nil
}
// Append hands data to the DB's writer while holding the DB mutex and
// returns the position and error reported by the writer.
//
// A DB opened read-only has no writer (New does not create one); in that
// case an error is returned instead of dereferencing the nil writer.
func (db *DB) Append(data []byte) (pos int64, err error) {
	db.mu.Lock()
	defer db.mu.Unlock()

	if db.writer == nil {
		return 0, errors.New("cellar: db has no writer (read-only)")
	}
	return db.writer.Append(data)
}
// Close shuts the DB down: it closes the writer (when there is one), releases
// the folder's file lock, and finally closes the meta store.
//
// The writer is closed before the lock is released so that whatever it does
// on close still happens while the lock is held, and the lock is released
// even when closing the writer fails. If both steps fail, the writer's error
// is returned. A read-only DB has no writer, so that step is skipped.
//
// NOTE(review): readers obtained from Reader are handed db.meta, which is
// closed here — confirm whether they remain usable after Close.
func (db *DB) Close() (err error) {
	db.logger.Info("closing db")

	db.mu.Lock()
	defer db.mu.Unlock()
	// The meta store is closed last, after the writer and the lock are dealt with.
	defer db.meta.Close()

	if db.writer != nil {
		err = db.writer.Close()
	}

	// Always release the lock; only surface its error when nothing failed earlier.
	if unlockErr := db.fileLock.Unlock(); err == nil {
		err = unlockErr
	}
	return err
}
// Checkpoint asks the writer to create an anonymous checkpoint while holding
// the DB mutex, and returns the position and error reported by the writer.
//
// A DB opened read-only has no writer; in that case an error is returned
// instead of dereferencing the nil writer.
func (db *DB) Checkpoint() (pos int64, err error) {
	db.mu.Lock()
	defer db.mu.Unlock()

	if db.writer == nil {
		return 0, errors.New("cellar: db has no writer (read-only)")
	}
	return db.writer.Checkpoint()
}
// Flush explicitly flushes the writer while holding the DB mutex and returns
// the writer's error.
//
// A DB opened read-only has no writer; in that case an error is returned
// instead of dereferencing the nil writer.
func (db *DB) Flush() (err error) {
	db.logger.Info("flushing db")

	db.mu.Lock()
	defer db.mu.Unlock()

	if db.writer == nil {
		return errors.New("cellar: db has no writer (read-only)")
	}
	return db.writer.Flush()
}
// GetUserCheckpoint returns the position of the named checkpoint as reported
// by the writer.
//
// A DB opened read-only has no writer; in that case an error is returned
// instead of dereferencing the nil writer.
//
// NOTE(review): unlike the other writer-backed methods this one does not take
// the DB mutex — confirm that the writer's lookup is safe to run concurrently.
func (db *DB) GetUserCheckpoint(name string) (pos int64, err error) {
	if db.writer == nil {
		return 0, errors.New("cellar: db has no writer (read-only)")
	}
	return db.writer.GetUserCheckpoint(name)
}
// PutUserCheckpoint asks the writer to store a checkpoint called name at
// position pos, holding the DB mutex for the duration of the call.
//
// A DB opened read-only has no writer; in that case an error is returned
// instead of dereferencing the nil writer.
func (db *DB) PutUserCheckpoint(name string, pos int64) (err error) {
	db.mu.Lock()
	defer db.mu.Unlock()

	if db.writer == nil {
		return errors.New("cellar: db has no writer (read-only)")
	}
	return db.writer.PutUserCheckpoint(name, pos)
}
// VolatilePos reports the writer's current volatile position. The DB mutex is
// held while the writer is queried.
//
// NOTE(review): this goes straight to db.writer, which New leaves nil for a
// read-only DB — callers presumably must not use it in that mode; confirm.
func (db *DB) VolatilePos() int64 {
	db.mu.Lock()
	defer db.mu.Unlock()

	current := db.writer.VolatilePos()
	return current
}
// Reader builds a fresh Reader over the DB's folder, sharing the DB's cipher,
// decompressor, meta store and logger.
//
// NOTE(review): the reader is given db.meta, which DB.Close closes — confirm
// whether a reader is still usable once the DB has been closed.
func (db *DB) Reader() *Reader {
	reader := NewReader(
		db.folder,
		db.cipher,
		db.decompressor,
		db.meta,
		db.logger,
	)
	return reader
}
// Folder returns the folder this DB was opened on, i.e. the value passed to New.
func (db *DB) Folder() string {
	return db.folder
}
// Buffer returns the DB's buffer size setting, the value New hands to the
// writer (100000 by default).
func (db *DB) Buffer() int64 {
	return db.buffer
}
// newWriter builds a Writer from the DB's folder, buffer size, cipher,
// compressor, meta store and logger, and installs it as db.writer.
// On failure db.writer is left as it was and the error is returned.
func (db *DB) newWriter() error {
	created, createErr := NewWriter(
		db.folder,
		db.buffer,
		db.cipher,
		db.compressor,
		db.meta,
		db.logger,
	)
	if createErr == nil {
		db.writer = created
	}
	return createErr
}