Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay splitter daemon + pebble persistence #810

Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c39f17c
wip, start pebble persister
brianolson Nov 14, 2024
61c836c
pebble persister
brianolson Nov 14, 2024
44ce34e
fix events tests
brianolson Nov 14, 2024
3f77d63
GC thread with retention and period
brianolson Nov 14, 2024
81a9ce8
connect rainbow to pebble persister
brianolson Nov 14, 2024
31c5697
Merge remote-tracking branch 'origin/feat/splitter-rebase' into feat/…
brianolson Nov 15, 2024
a71ce5a
delete dead code
brianolson Nov 15, 2024
096fc7c
PR feedback, use pebble DeleteRange
brianolson Nov 15, 2024
e8c9d2e
identity: default dir with 100 max idle conns, and 1sec idle
bnewbold Nov 15, 2024
ed0a5c6
identity: drop default HTTP timeout from 15s to 10s
bnewbold Nov 15, 2024
d88346a
identity: drop default DNS timeout from 5s to 3s
bnewbold Nov 15, 2024
5dc06c0
add gauge spl_active_clients
brianolson Nov 15, 2024
586cc75
--persist-hours
brianolson Nov 15, 2024
43c1d8e
fix err log
brianolson Nov 15, 2024
f5a301b
fix broadcast
brianolson Nov 15, 2024
0fc4c7e
identity package default tweaks (#811)
bnewbold Nov 15, 2024
9780691
last seq from pebble
brianolson Nov 15, 2024
766dc86
log gc sizes
brianolson Nov 15, 2024
1b7e54e
allow carstore to use multiple directories, round robin style
whyrusleeping Nov 15, 2024
fd6ae47
fixup build
whyrusleeping Nov 15, 2024
ffe7fb6
more test fixups
whyrusleeping Nov 15, 2024
9e941db
reorg config
brianolson Nov 15, 2024
be9e655
Merge remote-tracking branch 'origin/feat/splitter-rebase' into feat/…
brianolson Nov 16, 2024
05b8751
allow carstore to use multiple directories, round robin style (#812)
whyrusleeping Nov 16, 2024
0fb485d
add a user cache on the bgs
whyrusleeping Nov 16, 2024
7057577
fixup refactor
whyrusleeping Nov 16, 2024
4a15b38
fix lint
whyrusleeping Nov 16, 2024
a422903
add a user cache on the bgs (#816)
whyrusleeping Nov 16, 2024
75d29f8
add compaction to gc
brianolson Nov 17, 2024
30a2725
Add more metrics to the relay
ericvolp12 Nov 17, 2024
bd727d1
dockerize
brianolson Nov 17, 2024
15004ab
whoops
ericvolp12 Nov 17, 2024
3a4136f
Rename series
ericvolp12 Nov 17, 2024
1eeb496
Track disk vs meta write durations
ericvolp12 Nov 17, 2024
be57b0d
Add more metrics to the relay (#817)
ericvolp12 Nov 17, 2024
bf3ffd5
reorg config, make --cursor-file
brianolson Nov 17, 2024
93497b7
no-cursor subscribe
brianolson Nov 18, 2024
83811b9
Merge remote-tracking branch 'origin/main' into feat/splitter-rebase
brianolson Nov 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion cmd/rainbow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func run(args []string) {
Name: "splitter-host",
Value: "bsky.network",
},
&cli.StringFlag{
Name: "persist-db",
Value: "",
Usage: "path to persistence db",
},
&cli.StringFlag{
Name: "api-listen",
Value: ":2480",
Expand Down Expand Up @@ -110,7 +115,21 @@ func Splitter(cctx *cli.Context) error {
otel.SetTracerProvider(tp)
}

spl := splitter.NewSplitter(cctx.String("splitter-host"))
persistPath := cctx.String("persist-db")
upstreamHost := cctx.String("splitter-host")
var spl *splitter.Splitter
var err error
if persistPath != "" {
log.Infof("building splitter with storage at: %s", persistPath)
spl, err = splitter.NewDiskSplitter(upstreamHost, persistPath)
if err != nil {
log.Fatalw("failed to create splitter", "path", persistPath, "error", err)
return err
}
} else {
log.Info("building in-memory splitter")
spl = splitter.NewMemSplitter(upstreamHost)
}

// set up metrics endpoint
go func() {
Expand Down
29 changes: 14 additions & 15 deletions events/dbpersist_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events_test
package events

import (
"context"
Expand All @@ -11,19 +11,18 @@ import (
atproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/carstore"
"github.com/bluesky-social/indigo/events"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/models"
"github.com/bluesky-social/indigo/pds"
pds "github.com/bluesky-social/indigo/pds/data"
"github.com/bluesky-social/indigo/repomgr"
"github.com/bluesky-social/indigo/util"
"github.com/ipfs/go-log/v2"
logging "github.com/ipfs/go-log/v2"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)

func init() {
log.SetAllLoggers(log.LevelDebug)
logging.SetAllLoggers(logging.LevelDebug)
}

func BenchmarkDBPersist(b *testing.B) {
Expand Down Expand Up @@ -61,24 +60,24 @@ func BenchmarkDBPersist(b *testing.B) {
defer os.RemoveAll(tempPath)

// Initialize a DBPersister
dbp, err := events.NewDbPersistence(db, cs, nil)
dbp, err := NewDbPersistence(db, cs, nil)
if err != nil {
b.Fatal(err)
}

// Create a bunch of events
evtman := events.NewEventManager(dbp)
evtman := NewEventManager(dbp)

userRepoHead, err := mgr.GetRepoRoot(ctx, 1)
if err != nil {
b.Fatal(err)
}

inEvts := make([]*events.XRPCStreamEvent, b.N)
inEvts := make([]*XRPCStreamEvent, b.N)
for i := 0; i < b.N; i++ {
cidLink := lexutil.LexLink(cid)
headLink := lexutil.LexLink(userRepoHead)
inEvts[i] = &events.XRPCStreamEvent{
inEvts[i] = &XRPCStreamEvent{
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
Repo: "did:example:123",
Commit: headLink,
Expand Down Expand Up @@ -136,7 +135,7 @@ func BenchmarkDBPersist(b *testing.B) {

b.StopTimer()

dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error {
dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error {
outEvtCount++
return nil
})
Expand Down Expand Up @@ -183,24 +182,24 @@ func BenchmarkPlayback(b *testing.B) {
defer os.RemoveAll(tempPath)

// Initialize a DBPersister
dbp, err := events.NewDbPersistence(db, cs, nil)
dbp, err := NewDbPersistence(db, cs, nil)
if err != nil {
b.Fatal(err)
}

// Create a bunch of events
evtman := events.NewEventManager(dbp)
evtman := NewEventManager(dbp)

userRepoHead, err := mgr.GetRepoRoot(ctx, 1)
if err != nil {
b.Fatal(err)
}

inEvts := make([]*events.XRPCStreamEvent, n)
inEvts := make([]*XRPCStreamEvent, n)
for i := 0; i < n; i++ {
cidLink := lexutil.LexLink(cid)
headLink := lexutil.LexLink(userRepoHead)
inEvts[i] = &events.XRPCStreamEvent{
inEvts[i] = &XRPCStreamEvent{
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
Repo: "did:example:123",
Commit: headLink,
Expand Down Expand Up @@ -256,7 +255,7 @@ func BenchmarkPlayback(b *testing.B) {

b.ResetTimer()

dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error {
dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error {
outEvtCount++
return nil
})
Expand Down
Loading
Loading