Skip to content

Commit

Permalink
reorg config
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Nov 15, 2024
1 parent 766dc86 commit 9e941db
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 14 deletions.
12 changes: 11 additions & 1 deletion cmd/rainbow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ func run(args []string) {
EnvVars: []string{"SPLITTER_PERSIST_HOURS"},
Usage: "hours to buffer (float, may be fractional)",
},
&cli.Int64Flag{
Name: "persist-bytes",
Value: 1_000_000_000,
Usage: "max bytes target for event cache",
EnvVars: []string{"SPLITTER_PERSIST_BYTES"},
},
}

app.Action = Splitter
Expand Down Expand Up @@ -127,7 +133,11 @@ func Splitter(cctx *cli.Context) error {
var err error
if persistPath != "" {
log.Infof("building splitter with storage at: %s", persistPath)
spl, err = splitter.NewDiskSplitter(upstreamHost, persistPath, cctx.Float64("persist-hours"))
spl, err = splitter.NewDiskSplitter(
upstreamHost,
persistPath,
cctx.Float64("persist-hours"),
cctx.Int64("persist-bytes"))
if err != nil {
log.Fatalw("failed to create splitter", "path", persistPath, "error", err)
return err
Expand Down
52 changes: 43 additions & 9 deletions events/pebblepersist.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"time"

"github.com/bluesky-social/indigo/models"
"github.com/cockroachdb/pebble"
"time"
)

type PebblePersist struct {
Expand All @@ -18,15 +20,41 @@ type PebblePersist struct {
prevSeqExtra uint32

cancel func()

options PebblePersistOptions
}

func NewPebblePersistance(path string) (*PebblePersist, error) {
type PebblePersistOptions struct {
// Throw away posts older than some time ago
PersistDuration time.Duration

// Throw away old posts every so often
GCPeriod time.Duration

// MaxBytes is what we _try_ to keep disk usage under
MaxBytes uint64
}

var DefaultPebblePersistOptions = PebblePersistOptions{
PersistDuration: time.Minute * 20,
GCPeriod: time.Minute * 5,
MaxBytes: 1024 * 1024 * 1024, // 1 GiB
}

// Create a new EventPersistence which stores data in pebbledb
// nil opts is ok
func NewPebblePersistance(path string, opts *PebblePersistOptions) (*PebblePersist, error) {
db, err := pebble.Open(path, &pebble.Options{})
if err != nil {
return nil, fmt.Errorf("%s: %w", path, err)
}
pp := new(PebblePersist)
pp.db = db
if opts == nil {
pp.options = DefaultPebblePersistOptions
} else {
pp.options = *opts
}
return pp, nil
}

Expand Down Expand Up @@ -150,15 +178,15 @@ func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *X
// pp := NewPebblePersistance("/tmp/foo.pebble")
// go pp.GCThread(context.Background(), 48 * time.Hour, 5 * time.Minute)
// ```
func (pp *PebblePersist) GCThread(ctx context.Context, retention, gcPeriod time.Duration) {
func (pp *PebblePersist) GCThread(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
pp.cancel = cancel
ticker := time.NewTicker(gcPeriod)
ticker := time.NewTicker(pp.options.GCPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := pp.GarbageCollect(ctx, retention)
err := pp.GarbageCollect(ctx)
if err != nil {
log.Errorw("GC err", "err", err)
}
Expand All @@ -169,14 +197,18 @@ func (pp *PebblePersist) GCThread(ctx context.Context, retention, gcPeriod time.
}

var zeroKey [16]byte
var ffffKey [16]byte

func init() {
setKeySeqMillis(zeroKey[:], 0, 0)
for i := range ffffKey {
ffffKey[i] = 0xff
}
}

func (pp *PebblePersist) GarbageCollect(ctx context.Context, retention time.Duration) error {
func (pp *PebblePersist) GarbageCollect(ctx context.Context) error {
nowMillis := time.Now().UnixMilli()
expired := nowMillis - retention.Milliseconds()
expired := nowMillis - pp.options.PersistDuration.Milliseconds()
iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{})
if err != nil {
return err
Expand All @@ -196,18 +228,20 @@ func (pp *PebblePersist) GarbageCollect(ctx context.Context, retention time.Dura
break
}
}
sizeBefore, _ := pp.db.EstimateDiskUsage(nil, nil)
sizeBefore, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:])
if seq == -1 {
// nothing to delete
log.Infow("pebble gc nop", "size", sizeBefore)
return nil
}
var key [16]byte
setKeySeqMillis(key[:], seq, lastKeyTime)
log.Infow("pebble gc start", "to", hex.EncodeToString(key[:]))
err = pp.db.DeleteRange(zeroKey[:], key[:], pebble.Sync)
if err != nil {
return err
}
sizeAfter, _ := pp.db.EstimateDiskUsage(nil, nil)
sizeAfter, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:])
log.Infow("pebble gc", "before", sizeBefore, "after", sizeAfter)
return nil
}
2 changes: 1 addition & 1 deletion events/pebblepersist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func TestPebblePersist(t *testing.T) {
factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) {
return NewPebblePersistance(filepath.Join(tempPath, "pebble.db"))
return NewPebblePersistance(filepath.Join(tempPath, "pebble.db"), nil)
}
testPersister(t, factory)
}
11 changes: 8 additions & 3 deletions splitter/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,18 @@ func NewMemSplitter(host string) *Splitter {
consumers: make(map[uint64]*SocketConsumer),
}
}
func NewDiskSplitter(host, path string, persistHours float64) (*Splitter, error) {
pp, err := events.NewPebblePersistance(path)
func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) {
ppopts := events.PebblePersistOptions{
PersistDuration: time.Duration(float64(time.Hour) * persistHours),
GCPeriod: 5 * time.Minute,
MaxBytes: uint64(maxBytes),
}
pp, err := events.NewPebblePersistance(path, &ppopts)
if err != nil {
return nil, err
}

go pp.GCThread(context.Background(), time.Duration(float64(time.Hour)*persistHours), 5*time.Minute)
go pp.GCThread(context.Background())
em := events.NewEventManager(pp)
return &Splitter{
cursorFile: "cursor-file",
Expand Down

0 comments on commit 9e941db

Please sign in to comment.