Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter what Jetstream ingests, persists, and emits #35

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cmd/jetstream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -100,6 +101,24 @@ func main() {
Value: 15 * time.Second,
EnvVars: []string{"JETSTREAM_LIVENESS_TTL"},
},
&cli.BoolFlag{
Name: "should-emit-identity",
Usage: "whether to emit identity events",
Value: true,
EnvVars: []string{"JETSTREAM_SHOULD_EMIT_IDENTITY"},
},
&cli.BoolFlag{
Name: "should-emit-account",
Usage: "whether to emit account events",
Value: true,
EnvVars: []string{"JETSTREAM_SHOULD_EMIT_ACCOUNT"},
},
&cli.StringFlag{
Name: "wanted-collections",
Usage: "Collections this Jetstream should emit, space delimited",
Value: ".*",
EnvVars: []string{"JETSTREAM_WANTED_COLLECTIONS"},
},
}

app.Action = Jetstream
Expand Down Expand Up @@ -129,12 +148,17 @@ func Jetstream(cctx *cli.Context) error {
return fmt.Errorf("failed to create server: %w", err)
}

var wantedCollections = strings.Split(cctx.String("wanted-collections"), " ")

c, err := consumer.NewConsumer(
ctx,
log,
u.String(),
cctx.String("data-dir"),
cctx.Duration("event-ttl"),
wantedCollections,
cctx.Bool("should-emit-identity"),
cctx.Bool("should-emit-account"),
s.Emit,
)
if err != nil {
Expand Down
118 changes: 98 additions & 20 deletions pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/data"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/repomgr"
Expand All @@ -23,19 +24,30 @@ import (
"go.opentelemetry.io/otel/attribute"
)

type WantedCollections struct {
Prefixes []string
FullPaths map[string]struct{}
}

// ErrInvalidOptions is returned when the consumer options are invalid
var ErrInvalidOptions = fmt.Errorf("invalid consumer options")

// Consumer is the consumer of the firehose
type Consumer struct {
SocketURL string
Progress *Progress
Emit func(context.Context, *models.Event, []byte, []byte) error
UncompressedDB *pebble.DB
CompressedDB *pebble.DB
encoder *zstd.Encoder
EventTTL time.Duration
logger *slog.Logger
clock *monotonic.Clock
buf chan *models.Event
sequencerShutdown chan chan struct{}
SocketURL string
Progress *Progress
Emit func(context.Context, *models.Event, []byte, []byte) error
UncompressedDB *pebble.DB
CompressedDB *pebble.DB
encoder *zstd.Encoder
EventTTL time.Duration
logger *slog.Logger
clock *monotonic.Clock
buf chan *models.Event
sequencerShutdown chan chan struct{}
wantedCollections *WantedCollections
shouldEmitIdentity bool
shouldEmitAccount bool

sequenced prometheus.Counter
persisted prometheus.Counter
Expand All @@ -51,6 +63,9 @@ func NewConsumer(
socketURL string,
dataDir string,
eventTTL time.Duration,
wantedCollectionsProvided []string,
shouldEmitIdentity bool,
shouldEmitAccount bool,
emit func(context.Context, *models.Event, []byte, []byte) error,
) (*Consumer, error) {
uDBPath := dataDir + "/jetstream.uncompressed.db"
Expand Down Expand Up @@ -78,20 +93,45 @@ func NewConsumer(
return nil, fmt.Errorf("failed to create zstd encoder: %w", err)
}

var wantedCol *WantedCollections
if len(wantedCollectionsProvided) > 0 {
wantedCol = &WantedCollections{
Prefixes: []string{},
FullPaths: make(map[string]struct{}),
}

for _, providedCol := range wantedCollectionsProvided {
if strings.HasSuffix(providedCol, ".*") {
wantedCol.Prefixes = append(wantedCol.Prefixes, strings.TrimSuffix(providedCol, "*"))
continue
}

col, err := syntax.ParseNSID(providedCol)
if err != nil {

return nil, fmt.Errorf("%w: invalid collection: %s", ErrInvalidOptions, providedCol)
}
wantedCol.FullPaths[col.String()] = struct{}{}
}
}

c := Consumer{
SocketURL: socketURL,
Progress: &Progress{
LastSeq: -1,
},
EventTTL: eventTTL,
Emit: emit,
UncompressedDB: uDB,
CompressedDB: cDB,
encoder: encoder,
logger: log,
clock: clock,
buf: make(chan *models.Event, 10_000),
sequencerShutdown: make(chan chan struct{}),
EventTTL: eventTTL,
Emit: emit,
UncompressedDB: uDB,
CompressedDB: cDB,
encoder: encoder,
logger: log,
clock: clock,
buf: make(chan *models.Event, 10_000),
sequencerShutdown: make(chan chan struct{}),
wantedCollections: wantedCol,
shouldEmitIdentity: shouldEmitIdentity,
shouldEmitAccount: shouldEmitAccount,

sequenced: eventsSequencedCounter.WithLabelValues(socketURL),
persisted: eventsPersistedCounter.WithLabelValues(socketURL),
Expand Down Expand Up @@ -132,6 +172,9 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE
}
return c.HandleRepoCommit(ctx, xe.RepoCommit)
case xe.RepoIdentity != nil:
if !c.shouldEmitIdentity {
return nil
}
eventsProcessedCounter.WithLabelValues("identity", c.SocketURL).Inc()
now := time.Now()
c.Progress.Update(xe.RepoIdentity.Seq, now)
Expand All @@ -155,6 +198,9 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE
lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(c.SocketURL).Set(float64(now.Sub(t).Seconds()))
lastSeqGauge.WithLabelValues(c.SocketURL).Set(float64(xe.RepoIdentity.Seq))
case xe.RepoAccount != nil:
if !c.shouldEmitAccount {
return nil
}
eventsProcessedCounter.WithLabelValues("account", c.SocketURL).Inc()
now := time.Now()
c.Progress.Update(xe.RepoAccount.Seq, now)
Expand Down Expand Up @@ -217,6 +263,10 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub

for _, op := range evt.Ops {
collection := strings.Split(op.Path, "/")[0]
if !c.WantsCollection(collection) {
continue
}

rkey := strings.Split(op.Path, "/")[1]

ek := repomgr.EventKind(op.Action)
Expand Down Expand Up @@ -393,3 +443,31 @@ func (c *Consumer) Shutdown() {
c.logger.Info("sequencer shutdown complete")
}
}

// WantsCollection returns true if the consumer wants the given collection
func (c *Consumer) WantsCollection(collection string) bool {
if c.wantedCollections == nil || collection == "" {
return true
}

// Check for the root wildcard first, indicating all collections
if len(c.wantedCollections.Prefixes) == 1 && c.wantedCollections.Prefixes[0] == "." {
return true
}

// Next full paths for fast lookup
if len(c.wantedCollections.FullPaths) > 0 {
if _, match := c.wantedCollections.FullPaths[collection]; match {
return true
}
}

// Check the prefixes (shortest first)
for _, prefix := range c.wantedCollections.Prefixes {
if strings.HasPrefix(collection, prefix) {
return true
}
}

return false
}