Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic relay splitter daemon #807

Merged
merged 7 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions cmd/rainbow/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package main

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/bluesky-social/indigo/splitter"
"github.com/carlmjohnson/versioninfo"
_ "go.uber.org/automaxprocs"

_ "net/http/pprof"

_ "github.com/joho/godotenv/autoload"

logging "github.com/ipfs/go-log"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

var log = logging.Logger("splitter")

func init() {
// control log level using, eg, GOLOG_LOG_LEVEL=debug
logging.SetAllLoggers(logging.LevelDebug)
}

func main() {
run(os.Args)
}

func run(args []string) {
app := cli.App{
Name: "splitter",
Usage: "firehose proxy",
Version: versioninfo.Short(),
}

app.Flags = []cli.Flag{
&cli.BoolFlag{
Name: "crawl-insecure-ws",
Usage: "when connecting to PDS instances, use ws:// instead of wss://",
},
&cli.StringFlag{
Name: "splitter-host",
Value: "bsky.network",
},
&cli.StringFlag{
Name: "api-listen",
Value: ":2480",
},
&cli.StringFlag{
Name: "metrics-listen",
Value: ":2481",
EnvVars: []string{"SPLITTER_METRICS_LISTEN"},
},
}

app.Action = Splitter
err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}

func Splitter(cctx *cli.Context) error {
// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

// Enable OTLP HTTP exporter
// For relevant environment variables:
// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
// At a minimum, you need to set
// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
log.Infow("setting up trace exporter", "endpoint", ep)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exp, err := otlptracehttp.New(ctx)
if err != nil {
log.Fatalw("failed to create trace exporter", "error", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
log.Errorw("failed to shutdown trace exporter", "error", err)
}
}()

tp := tracesdk.NewTracerProvider(
tracesdk.WithBatcher(exp),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("splitter"),
attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog
attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others
attribute.Int64("ID", 1),
)),
)
otel.SetTracerProvider(tp)
}

spl := splitter.NewSplitter(cctx.String("splitter-host"))

// set up metrics endpoint
go func() {
if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil {
log.Fatalf("failed to start metrics endpoint: %s", err)
}
}()

runErr := make(chan error, 1)

go func() {
err := spl.Start(cctx.String("api-listen"))
runErr <- err
}()

log.Infow("startup complete")
select {
case <-signals:
log.Info("received shutdown signal")
if err := spl.Shutdown(); err != nil {
log.Errorw("error during Splitter shutdown", "err", err)
}
case err := <-runErr:
if err != nil {
log.Errorw("error during Splitter startup", "err", err)
}
log.Info("shutting down")
if err := spl.Shutdown(); err != nil {
log.Errorw("error during Splitter shutdown", "err", err)
}
}

log.Info("shutdown complete")

return nil
}
8 changes: 4 additions & 4 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func
case <-done:
return ErrPlaybackShutdown
case out <- e:
seq := sequenceForEvent(e)
seq := SequenceForEvent(e)
if seq > 0 {
lastSeq = seq
}
Expand All @@ -315,8 +315,8 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func

// run playback again to get us to the events that have started buffering
if err := em.persister.Playback(ctx, lastSeq, func(e *XRPCStreamEvent) error {
seq := sequenceForEvent(e)
if seq > sequenceForEvent(first) {
seq := SequenceForEvent(e)
if seq > SequenceForEvent(first) {
return ErrCaughtUp
}

Expand Down Expand Up @@ -351,7 +351,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func
return out, sub.cleanup, nil
}

func sequenceForEvent(evt *XRPCStreamEvent) int64 {
func SequenceForEvent(evt *XRPCStreamEvent) int64 {
switch {
case evt == nil:
return -1
Expand Down
11 changes: 11 additions & 0 deletions splitter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package splitter

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "spl_events_sent_counter",
Help: "The total number of events sent to consumers",
}, []string{"remote_addr", "user_agent"})
144 changes: 144 additions & 0 deletions splitter/ringbuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package splitter

import (
"context"
"sync"

events "github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/models"
)

func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer {
return &EventRingBuffer{
chunkSize: chunkSize,
maxChunkCount: nchunks,
}
}

type EventRingBuffer struct {
lk sync.Mutex
chunks []*ringChunk
chunkSize int
maxChunkCount int

broadcast func(*events.XRPCStreamEvent)
}

type ringChunk struct {
lk sync.Mutex
buf []*events.XRPCStreamEvent
}

func (rc *ringChunk) append(evt *events.XRPCStreamEvent) {
rc.lk.Lock()
defer rc.lk.Unlock()
rc.buf = append(rc.buf, evt)
}

func (rc *ringChunk) events() []*events.XRPCStreamEvent {
rc.lk.Lock()
defer rc.lk.Unlock()
return rc.buf
}

func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error {
er.lk.Lock()
defer er.lk.Unlock()

if len(er.chunks) == 0 {
er.chunks = []*ringChunk{new(ringChunk)}
}

last := er.chunks[len(er.chunks)-1]
if len(last.buf) >= er.chunkSize {
last = new(ringChunk)
er.chunks = append(er.chunks, last)
if len(er.chunks) > er.maxChunkCount {
er.chunks = er.chunks[1:]
}
}

last.append(evt)

er.broadcast(evt)
return nil
}

func (er *EventRingBuffer) Flush(context.Context) error {
return nil
}

func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
// run playback a few times to get as close to 'live' as possible before returning
for i := 0; i < 10; i++ {
n, err := er.playbackRound(ctx, since, cb)
if err != nil {
return err
}

// playback had no new events
if n-since == 0 {
return nil
}
since = n
}

return nil
}

func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) (int64, error) {
// grab a snapshot of the current chunks
er.lk.Lock()
chunks := er.chunks
er.lk.Unlock()

i := len(chunks) - 1
for ; i >= 0; i-- {
c := chunks[i]
evts := c.events()
if since > events.SequenceForEvent(evts[len(evts)-1]) {
i++
break
}
}
if i < 0 {
i = 0
}

var lastSeq int64 = since
for _, c := range chunks[i:] {
var nread int
evts := c.events()
for nread < len(evts) {
for _, e := range evts[nread:] {
nread++
seq := events.SequenceForEvent(e)
if seq <= since {
continue
}

if err := cb(e); err != nil {
return 0, err
}
lastSeq = seq
}

// recheck evts buffer to see if more were added while we were here
evts = c.events()
}
}

return lastSeq, nil
}

func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
er.broadcast = brc
}

func (er *EventRingBuffer) Shutdown(context.Context) error {
return nil
}

func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error {
return nil
}
Loading
Loading