diff --git a/cmd/rainbow/main.go b/cmd/rainbow/main.go index 6f556ad01..47a7fe5bd 100644 --- a/cmd/rainbow/main.go +++ b/cmd/rainbow/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "github.com/bluesky-social/indigo/events" "os" "os/signal" "syscall" @@ -57,6 +58,11 @@ func run(args []string) { Value: "", Usage: "path to persistence db", }, + &cli.StringFlag{ + Name: "cursor-file", + Value: "", + Usage: "write upstream cursor number to this file", + }, &cli.StringFlag{ Name: "api-listen", Value: ":2480", @@ -133,18 +139,29 @@ func Splitter(cctx *cli.Context) error { var err error if persistPath != "" { log.Infof("building splitter with storage at: %s", persistPath) - spl, err = splitter.NewDiskSplitter( - upstreamHost, - persistPath, - cctx.Float64("persist-hours"), - cctx.Int64("persist-bytes")) - if err != nil { - log.Fatalw("failed to create splitter", "path", persistPath, "error", err) - return err + ppopts := events.PebblePersistOptions{ + DbPath: persistPath, + PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")), + GCPeriod: 5 * time.Minute, + MaxBytes: uint64(cctx.Int64("persist-bytes")), } + conf := splitter.SplitterConfig{ + UpstreamHost: upstreamHost, + CursorFile: cctx.String("cursor-file"), + PebbleOptions: &ppopts, + } + spl, err = splitter.NewSplitter(conf) } else { log.Info("building in-memory splitter") - spl = splitter.NewMemSplitter(upstreamHost) + conf := splitter.SplitterConfig{ + UpstreamHost: upstreamHost, + CursorFile: cctx.String("cursor-file"), + } + spl, err = splitter.NewSplitter(conf) + } + if err != nil { + log.Fatalw("failed to create splitter", "path", persistPath, "error", err) + return err } // set up metrics endpoint diff --git a/events/pebblepersist.go b/events/pebblepersist.go index 0dc8a0001..164208c4c 100644 --- a/events/pebblepersist.go +++ b/events/pebblepersist.go @@ -25,6 +25,9 @@ type PebblePersist struct { } type PebblePersistOptions struct { + // path where pebble will create a directory full of files + DbPath string + // Throw away posts older than some time ago PersistDuration time.Duration @@ -43,18 +46,17 @@ var DefaultPebblePersistOptions = PebblePersistOptions{ // Create a new EventPersistence which stores data in pebbledb // nil opts is ok -func NewPebblePersistance(path string, opts *PebblePersistOptions) (*PebblePersist, error) { - db, err := pebble.Open(path, &pebble.Options{}) +func NewPebblePersistance(opts *PebblePersistOptions) (*PebblePersist, error) { + if opts == nil { + opts = &DefaultPebblePersistOptions + } + db, err := pebble.Open(opts.DbPath, &pebble.Options{}) if err != nil { - return nil, fmt.Errorf("%s: %w", path, err) + return nil, fmt.Errorf("%s: %w", opts.DbPath, err) } pp := new(PebblePersist) + pp.options = *opts pp.db = db - if opts == nil { - pp.options = DefaultPebblePersistOptions - } else { - pp.options = *opts - } return pp, nil } diff --git a/events/pebblepersist_test.go b/events/pebblepersist_test.go index 6495ce24f..901365c5d 100644 --- a/events/pebblepersist_test.go +++ b/events/pebblepersist_test.go @@ -8,7 +8,9 @@ import ( func TestPebblePersist(t *testing.T) { factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { - return NewPebblePersistance(filepath.Join(tempPath, "pebble.db"), nil) + opts := DefaultPebblePersistOptions + opts.DbPath = filepath.Join(tempPath, "pebble.db") + return NewPebblePersistance(&opts) } testPersister(t, factory) } diff --git a/splitter/splitter.go b/splitter/splitter.go index ccbccbbc5..3cad7f793 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -28,39 +28,81 @@ import ( var log = logging.Logger("splitter") type Splitter struct { - Host string erb *EventRingBuffer pp *events.PebblePersist events *events.EventManager - // cursor storage - cursorFile string - // Management of Socket Consumers consumersLk sync.RWMutex nextConsumerID uint64 consumers map[uint64]*SocketConsumer + + conf SplitterConfig +} + +type SplitterConfig struct { + UpstreamHost string + CursorFile string + PebbleOptions *events.PebblePersistOptions } func NewMemSplitter(host string) *Splitter { + conf := SplitterConfig{ + UpstreamHost: host, + CursorFile: "cursor-file", + } + erb := NewEventRingBuffer(20_000, 10_000) em := events.NewEventManager(erb) return &Splitter{ - cursorFile: "cursor-file", - Host: host, - erb: erb, - events: em, - consumers: make(map[uint64]*SocketConsumer), + conf: conf, + erb: erb, + events: em, + consumers: make(map[uint64]*SocketConsumer), + } +} +func NewSplitter(conf SplitterConfig) (*Splitter, error) { + if conf.PebbleOptions == nil { + // mem splitter + erb := NewEventRingBuffer(20_000, 10_000) + + em := events.NewEventManager(erb) + return &Splitter{ + conf: conf, + erb: erb, + events: em, + consumers: make(map[uint64]*SocketConsumer), + }, nil + } else { + pp, err := events.NewPebblePersistance(conf.PebbleOptions) + if err != nil { + return nil, err + } + + go pp.GCThread(context.Background()) + em := events.NewEventManager(pp) + return &Splitter{ + conf: conf, + pp: pp, + events: em, + consumers: make(map[uint64]*SocketConsumer), + }, nil } } func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) { ppopts := events.PebblePersistOptions{ + DbPath: path, PersistDuration: time.Duration(float64(time.Hour) * persistHours), GCPeriod: 5 * time.Minute, MaxBytes: uint64(maxBytes), } - pp, err := events.NewPebblePersistance(path, &ppopts) + conf := SplitterConfig{ + UpstreamHost: host, + CursorFile: "cursor-file", + PebbleOptions: &ppopts, + } + pp, err := events.NewPebblePersistance(&ppopts) if err != nil { return nil, err } @@ -68,11 +110,10 @@ func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (* go pp.GCThread(context.Background()) em := events.NewEventManager(pp) return &Splitter{ - cursorFile: "cursor-file", - Host: host, - pp: pp, - events: em, - consumers: make(map[uint64]*SocketConsumer), + conf: conf, + pp: pp, + events: em, + consumers: make(map[uint64]*SocketConsumer), }, nil } @@ -86,7 +127,7 @@ func (s *Splitter) Start(addr string) error { return fmt.Errorf("loading cursor failed: %w", err) } - go s.subscribeWithRedialer(context.Background(), s.Host, curs) + go s.subscribeWithRedialer(context.Background(), s.conf.UpstreamHost, curs) li, err := lc.Listen(ctx, "tcp", addr) if err != nil { @@ -442,7 +483,7 @@ func (s *Splitter) getLastCursor() (int64, error) { } } - fi, err := os.Open(s.cursorFile) + fi, err := os.Open(s.conf.CursorFile) if err != nil { if os.IsNotExist(err) { return 0, nil @@ -464,5 +505,5 @@ func (s *Splitter) getLastCursor() (int64, error) { } func (s *Splitter) writeCursor(curs int64) error { - return os.WriteFile(s.cursorFile, []byte(fmt.Sprint(curs)), 0664) + return os.WriteFile(s.conf.CursorFile, []byte(fmt.Sprint(curs)), 0664) }