Skip to content

Commit

Permalink
fix edge cases around playback-to-live transition
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Nov 14, 2024
1 parent 8fcce34 commit a1875ea
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 111 deletions.
149 changes: 149 additions & 0 deletions cmd/rainbow/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package main

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/bluesky-social/indigo/splitter"
"github.com/bluesky-social/indigo/util/version"
_ "go.uber.org/automaxprocs"

_ "net/http/pprof"

_ "github.com/joho/godotenv/autoload"

logging "github.com/ipfs/go-log"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

var log = logging.Logger("splitter")

func init() {
// control log level using, eg, GOLOG_LOG_LEVEL=debug
logging.SetAllLoggers(logging.LevelDebug)
}

func main() {
run(os.Args)
}

func run(args []string) {
app := cli.App{
Name: "splitter",
Usage: "firehose proxy",
Version: version.Version,
}

app.Flags = []cli.Flag{
&cli.BoolFlag{
Name: "crawl-insecure-ws",
Usage: "when connecting to PDS instances, use ws:// instead of wss://",
},
&cli.StringFlag{
Name: "splitter-host",
Value: "bsky.network",
},
&cli.StringFlag{
Name: "api-listen",
Value: ":2480",
},
&cli.StringFlag{
Name: "metrics-listen",
Value: ":2481",
EnvVars: []string{"SPLITTER_METRICS_LISTEN"},
},
}

app.Action = Splitter
err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}

func Splitter(cctx *cli.Context) error {
// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

// Enable OTLP HTTP exporter
// For relevant environment variables:
// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
// At a minimum, you need to set
// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
log.Infow("setting up trace exporter", "endpoint", ep)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exp, err := otlptracehttp.New(ctx)
if err != nil {
log.Fatalw("failed to create trace exporter", "error", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
log.Errorw("failed to shutdown trace exporter", "error", err)
}
}()

tp := tracesdk.NewTracerProvider(
tracesdk.WithBatcher(exp),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("splitter"),
attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog
attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others
attribute.Int64("ID", 1),
)),
)
otel.SetTracerProvider(tp)
}

spl := splitter.NewSplitter(cctx.String("splitter-host"))

// set up metrics endpoint
go func() {
if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil {
log.Fatalf("failed to start metrics endpoint: %s", err)
}
}()

runErr := make(chan error, 1)

go func() {
err := spl.Start(cctx.String("api-listen"))
runErr <- err
}()

log.Infow("startup complete")
select {
case <-signals:
log.Info("received shutdown signal")
if err := spl.Shutdown(); err != nil {
log.Errorw("error during Splitter shutdown", "err", err)
}
case err := <-runErr:
if err != nil {
log.Errorw("error during Splitter startup", "err", err)
}
log.Info("shutting down")
if err := spl.Shutdown(); err != nil {
log.Errorw("error during Splitter shutdown", "err", err)
}
}

log.Info("shutdown complete")

return nil
}
11 changes: 11 additions & 0 deletions splitter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package splitter

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "spl_events_sent_counter",
Help: "The total number of events sent to consumers",
}, []string{"remote_addr", "user_agent"})
144 changes: 144 additions & 0 deletions splitter/ringbuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package splitter

import (
"context"
"sync"

events "github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/models"
)

func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer {
return &EventRingBuffer{
chunkSize: chunkSize,
maxChunkCount: nchunks,
}
}

type EventRingBuffer struct {
lk sync.Mutex
chunks []*ringChunk
chunkSize int
maxChunkCount int

broadcast func(*events.XRPCStreamEvent)
}

type ringChunk struct {
lk sync.Mutex
buf []*events.XRPCStreamEvent
}

func (rc *ringChunk) append(evt *events.XRPCStreamEvent) {
rc.lk.Lock()
defer rc.lk.Unlock()
rc.buf = append(rc.buf, evt)
}

func (rc *ringChunk) events() []*events.XRPCStreamEvent {
rc.lk.Lock()
defer rc.lk.Unlock()
return rc.buf
}

func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error {
er.lk.Lock()
defer er.lk.Unlock()

if len(er.chunks) == 0 {
er.chunks = []*ringChunk{new(ringChunk)}
}

last := er.chunks[len(er.chunks)-1]
if len(last.buf) >= er.chunkSize {
last = new(ringChunk)
er.chunks = append(er.chunks, last)
if len(er.chunks) > er.maxChunkCount {
er.chunks = er.chunks[1:]
}
}

last.append(evt)

er.broadcast(evt)
return nil
}

func (er *EventRingBuffer) Flush(context.Context) error {
return nil
}

func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
// run playback a few times to get as close to 'live' as possible before returning
for i := 0; i < 10; i++ {
n, err := er.playbackRound(ctx, since, cb)
if err != nil {
return err
}

// playback had no new events
if n-since == 0 {
return nil
}
since = n
}

return nil
}

func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) (int64, error) {
// grab a snapshot of the current chunks
er.lk.Lock()
chunks := er.chunks
er.lk.Unlock()

i := len(chunks) - 1
for ; i >= 0; i-- {
c := chunks[i]
evts := c.events()
if since > sequenceForEvent(evts[len(evts)-1]) {
i++
break
}
}
if i < 0 {
i = 0
}

var lastSeq int64 = since
for _, c := range chunks[i:] {
var nread int
evts := c.events()
for nread < len(evts) {
for _, e := range evts[nread:] {
nread++
seq := sequenceForEvent(e)
if seq <= since {
continue
}

if err := cb(e); err != nil {
return 0, err
}
lastSeq = seq
}

// recheck evts buffer to see if more were added while we were here
evts = c.events()
}
}

return lastSeq, nil
}

func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
er.broadcast = brc
}

func (er *EventRingBuffer) Shutdown(context.Context) error {
return nil
}

func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error {
return nil
}
Loading

0 comments on commit a1875ea

Please sign in to comment.