Commit b885639 (1 parent: 785ad18)
Showing 2 changed files with 365 additions and 0 deletions.
@@ -0,0 +1,147 @@
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/splitter"
	"github.com/bluesky-social/indigo/util/version"
	_ "go.uber.org/automaxprocs"

	_ "net/http/pprof"

	_ "github.com/joho/godotenv/autoload"

	logging "github.com/ipfs/go-log"
	"github.com/urfave/cli/v2"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/sdk/resource"
	tracesdk "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

var log = logging.Logger("splitter")

func init() {
	// control log level using, eg, GOLOG_LOG_LEVEL=debug
	//logging.SetAllLoggers(logging.LevelDebug)
}

func main() {
	run(os.Args)
}

func run(args []string) {
	app := cli.App{
		Name:    "splitter",
		Usage:   "firehose proxy",
		Version: version.Version,
	}

	app.Flags = []cli.Flag{
		&cli.BoolFlag{
			Name:  "crawl-insecure-ws",
			Usage: "when connecting to PDS instances, use ws:// instead of wss://",
		},
		&cli.StringFlag{
			Name:  "bgs-host",
			Usage: "host of the upstream BGS to proxy",
		},
		&cli.StringFlag{
			Name:  "api-listen",
			Value: ":2480",
		},
		&cli.StringFlag{
			Name:    "metrics-listen",
			Value:   ":2481",
			EnvVars: []string{"SPLITTER_METRICS_LISTEN"},
		},
	}

	app.Action = Splitter
	err := app.Run(args)
	if err != nil {
		log.Fatal(err)
	}
}

func Splitter(cctx *cli.Context) error {
	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	// Enable OTLP HTTP exporter.
	// For relevant environment variables, see:
	// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
	// At a minimum, you need to set:
	// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
	if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
		log.Infow("setting up trace exporter", "endpoint", ep)
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		exp, err := otlptracehttp.New(ctx)
		if err != nil {
			log.Fatalw("failed to create trace exporter", "error", err)
		}
		defer func() {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			defer cancel()
			if err := exp.Shutdown(ctx); err != nil {
				log.Errorw("failed to shutdown trace exporter", "error", err)
			}
		}()

		tp := tracesdk.NewTracerProvider(
			tracesdk.WithBatcher(exp),
			tracesdk.WithResource(resource.NewWithAttributes(
				semconv.SchemaURL,
				semconv.ServiceNameKey.String("splitter"),
				attribute.String("env", os.Getenv("ENVIRONMENT")),         // DataDog
				attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others
				attribute.Int64("ID", 1),
			)),
		)
		otel.SetTracerProvider(tp)
	}

	spl := splitter.NewSplitter(cctx.String("bgs-host"))

	// set up metrics endpoint
	go func() {
		if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil {
			log.Fatalf("failed to start metrics endpoint: %s", err)
		}
	}()

	runErr := make(chan error, 1)

	go func() {
		err := spl.Start(cctx.String("api-listen"))
		runErr <- err
	}()

	log.Infow("startup complete")
	select {
	case <-signals:
		log.Info("received shutdown signal")
		errs := spl.Shutdown()
		for err := range errs {
			log.Errorw("error during Splitter shutdown", "err", err)
		}
	case err := <-runErr:
		if err != nil {
			log.Errorw("error during Splitter startup", "err", err)
		}
		log.Info("shutting down")
		errs := spl.Shutdown()
		for err := range errs {
			log.Errorw("error during Splitter shutdown", "err", err)
		}
	}

	log.Info("shutdown complete")

	return nil
}
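Note that main.go calls spl.StartMetrics and spl.Shutdown, but neither method appears in the hunk below; they may sit in a collapsed portion of the diff. As a rough sketch only, under the assumption that StartMetrics serves Prometheus metrics on the given address and Shutdown reports errors over a channel that is closed when teardown finishes (which is how main.go consumes it), the methods could look like:

// Sketch, not the repository's code: both signatures are inferred from the
// call sites in main.go above.
package splitter

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// StartMetrics assumes a Prometheus /metrics endpoint on the given address.
func (s *Splitter) StartMetrics(listen string) error {
	http.Handle("/metrics", promhttp.Handler())
	return http.ListenAndServe(listen, nil)
}

// Shutdown assumes errors are reported over a channel that is closed once
// shutdown completes, matching the `for err := range errs` loops in main.go.
func (s *Splitter) Shutdown() chan error {
	errs := make(chan error)
	close(errs) // nothing to tear down in this sketch
	return errs
}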
@@ -0,0 +1,218 @@
package splitter

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	events "github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/sequential"
	"github.com/bluesky-social/indigo/models"
	"github.com/gorilla/websocket"
	logging "github.com/ipfs/go-log"
)

var log = logging.Logger("splitter")

type Splitter struct {
	Host   string
	erb    *EventRingBuffer
	events *events.EventManager
}

func NewSplitter(host string) *Splitter {
	erb := NewEventRingBuffer(20000, 1000)

	// the ring buffer doubles as the event persistence layer
	em := events.NewEventManager(erb)
	return &Splitter{
		Host:   host,
		erb:    erb,
		events: em,
	}
}

func (s *Splitter) Start(addr string) error {
	// TODO: serve the downstream firehose on addr
	return nil
}

// sleepForBackoff maps a retry count to a wait duration: jittered and growing
// for the first 50 attempts, then capped at five seconds.
func sleepForBackoff(b int) time.Duration {
	if b == 0 {
		return 0
	}

	if b < 50 {
		return time.Millisecond * time.Duration(rand.Intn(100)+(5*b))
	}

	return time.Second * 5
}

func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, cursor int64) {
	d := websocket.Dialer{}

	protocol := "wss"

	var backoff int
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor)
		con, res, err := d.DialContext(ctx, url, nil)
		if err != nil {
			log.Warnw("dialing failed", "host", host, "err", err, "backoff", backoff)
			time.Sleep(sleepForBackoff(backoff))
			backoff++

			continue
		}

		log.Info("event subscription response code: ", res.StatusCode)

		// reset the backoff counter after a successful dial
		backoff = 0

		if err := s.handleConnection(ctx, host, con, &cursor); err != nil {
			log.Warnf("connection to %q failed: %s", host, err)
		}
	}
}

func (s *Splitter) handleConnection(ctx context.Context, host string, con *websocket.Conn, lastCursor *int64) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	sched := sequential.NewScheduler("splitter", func(ctx context.Context, evt *events.XRPCStreamEvent) error {
		// track the last sequence number seen so a redial resumes from it
		if seq := sequenceForEvent(evt); seq > 0 {
			*lastCursor = seq
		}
		return s.events.AddEvent(ctx, evt)
	})
	return events.HandleRepoStream(ctx, con, sched)
}

func sequenceForEvent(evt *events.XRPCStreamEvent) int64 {
	switch {
	case evt.RepoCommit != nil:
		return evt.RepoCommit.Seq
	case evt.RepoHandle != nil:
		return evt.RepoHandle.Seq
	case evt.RepoMigrate != nil:
		return evt.RepoMigrate.Seq
	case evt.RepoTombstone != nil:
		return evt.RepoTombstone.Seq
	case evt.RepoInfo != nil:
		// info frames carry no sequence number
		return -1
	default:
		return -1
	}
}

func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer {
	return &EventRingBuffer{
		chunkSize:     chunkSize,
		maxChunkCount: nchunks,
	}
}

// EventRingBuffer is an in-memory events.EventPersistence implementation that
// retains the most recent events in a bounded list of fixed-size chunks.
type EventRingBuffer struct {
	lk            sync.Mutex
	chunks        []*ringChunk
	chunkSize     int
	maxChunkCount int

	broadcast func(*events.XRPCStreamEvent)
}

type ringChunk struct {
	lk  sync.Mutex
	buf []*events.XRPCStreamEvent
}

func (rc *ringChunk) append(evt *events.XRPCStreamEvent) {
	rc.lk.Lock()
	defer rc.lk.Unlock()
	rc.buf = append(rc.buf, evt)
}

func (rc *ringChunk) events() []*events.XRPCStreamEvent {
	rc.lk.Lock()
	defer rc.lk.Unlock()
	return rc.buf
}

func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error {
	er.lk.Lock()
	defer er.lk.Unlock()

	if len(er.chunks) == 0 {
		er.chunks = []*ringChunk{new(ringChunk)}
	}

	// rotate to a fresh chunk when the current one fills, dropping the
	// oldest chunk once the buffer is at capacity
	last := er.chunks[len(er.chunks)-1]
	if len(last.buf) >= er.chunkSize {
		last = new(ringChunk)
		er.chunks = append(er.chunks, last)
		if len(er.chunks) > er.maxChunkCount {
			er.chunks = er.chunks[1:]
		}
	}

	last.append(evt)

	if er.broadcast != nil {
		er.broadcast(evt)
	}
	return nil
}

func (er *EventRingBuffer) Flush(context.Context) error {
	// no-op: events are only ever held in memory
	return nil
}

func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
	// grab a snapshot of the current chunks
	er.lk.Lock()
	chunks := er.chunks
	er.lk.Unlock()

	// walk backwards to find the oldest chunk holding events newer than `since`
	i := len(chunks) - 1
	for ; i >= 0; i-- {
		c := chunks[i]
		evts := c.events()
		if len(evts) > 0 && since > sequenceForEvent(evts[len(evts)-1]) {
			i++
			break
		}
	}
	if i < 0 {
		// every buffered chunk is newer than `since`; replay from the start
		i = 0
	}

	for _, c := range chunks[i:] {
		var nread int
		evts := c.events()
		for nread < len(evts) {
			for _, e := range evts[nread:] {
				nread++
				if since > 0 && sequenceForEvent(e) < since {
					continue
				}
				since = 0

				if err := cb(e); err != nil {
					return err
				}
			}

			// recheck evts buffer to see if more were added while we were here
			evts = c.events()
		}
	}

	// TODO: probably also check for if new chunks were added while we were iterating...
	return nil
}

func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
	er.broadcast = brc
}

func (er *EventRingBuffer) Shutdown(context.Context) error {
	// no-op: nothing to flush or close
	return nil
}

func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error {
	// no-op: the ring buffer does not index events by repo
	return nil
}
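To make the ring-buffer semantics concrete, here is a small usage sketch written as if inside this package. The comatproto import path and the RepoCommit payload type are assumptions based on how sequenceForEvent reads evt.RepoCommit.Seq, and the broadcaster stub stands in for the hook that events.NewEventManager would normally install via SetEventBroadcaster.

package splitter

import (
	"context"
	"fmt"

	comatproto "github.com/bluesky-social/indigo/api/atproto" // assumed path
	events "github.com/bluesky-social/indigo/events"
)

func exampleRingBufferPlayback() error {
	// tiny chunks (2 events each, 3 chunks max) so rotation is easy to see
	er := NewEventRingBuffer(2, 3)

	// stand-in for the broadcast hook the EventManager would install
	er.SetEventBroadcaster(func(evt *events.XRPCStreamEvent) {})

	ctx := context.Background()
	for seq := int64(1); seq <= 10; seq++ {
		evt := &events.XRPCStreamEvent{
			RepoCommit: &comatproto.SyncSubscribeRepos_Commit{Seq: seq},
		}
		if err := er.Persist(ctx, evt); err != nil {
			return err
		}
	}

	// only seqs 5-10 are still buffered (3 chunks x 2 events); asking for
	// everything since seq 7 replays 7, 8, 9, 10
	return er.Playback(ctx, 7, func(evt *events.XRPCStreamEvent) error {
		fmt.Println("replayed seq", sequenceForEvent(evt))
		return nil
	})
}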