Skip to content

Commit

Permalink
Merge pull request #13 from bluesky-social/deprecate_fields
Browse files Browse the repository at this point in the history
Deprecate old event and commit type fields
  • Loading branch information
ericvolp12 authored Oct 31, 2024
2 parents 75f2991 + 4a517eb commit 7b414e0
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 35 deletions.
6 changes: 5 additions & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ func main() {
config.WebsocketURL = serverAddr
config.Compress = true

h := &handler{
seenSeqs: make(map[int64]struct{}),
}

scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent)

c, err := client.NewClient(config, logger, scheduler)
if err != nil {
log.Fatalf("failed to create client: %v", err)
}

cursor := time.Now().Add(90 * -time.Minute).UnixMicro()
cursor := time.Now().Add(5 * -time.Minute).UnixMicro()

// Every 5 seconds print the events read and bytes read and average event size
go func() {
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down Expand Up @@ -101,8 +102,6 @@ require (
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240904181319-8dc02b38228c // indirect
github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b // indirect
gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
Expand Down
22 changes: 8 additions & 14 deletions pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE

// Emit identity update
e := models.Event{
Did: xe.RepoIdentity.Did,
Kind: models.EventKindIdentity,
EventType: models.EventIdentity,
Identity: xe.RepoIdentity,
Did: xe.RepoIdentity.Did,
Kind: models.EventKindIdentity,
Identity: xe.RepoIdentity,
}
// Send to the sequencer
c.buf <- &e
Expand All @@ -168,10 +167,9 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE

// Emit account update
e := models.Event{
Did: xe.RepoAccount.Did,
Kind: models.EventKindAccount,
EventType: models.EventAccount,
Account: xe.RepoAccount,
Did: xe.RepoAccount.Did,
Kind: models.EventKindAccount,
Account: xe.RepoAccount,
}
// Send to the sequencer
c.buf <- &e
Expand Down Expand Up @@ -234,9 +232,8 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
span.SetAttributes(attribute.String("event_kind", op.Action))

e := models.Event{
Did: evt.Repo,
EventType: models.EventCommit,
Kind: models.EventKindCommit,
Did: evt.Repo,
Kind: models.EventKindCommit,
}

switch ek {
Expand Down Expand Up @@ -273,7 +270,6 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
e.Commit = &models.Commit{
Rev: evt.Rev,
Operation: models.CommitOperationCreate,
OpType: models.CommitCreateRecord,
Collection: collection,
RKey: rkey,
Record: recJSON,
Expand Down Expand Up @@ -312,7 +308,6 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
e.Commit = &models.Commit{
Rev: evt.Rev,
Operation: models.CommitOperationUpdate,
OpType: models.CommitUpdateRecord,
Collection: collection,
RKey: rkey,
Record: recJSON,
Expand All @@ -323,7 +318,6 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
e.Commit = &models.Commit{
Rev: evt.Rev,
Operation: models.CommitOperationDelete,
OpType: models.CommitDeleteRecord,
Collection: collection,
RKey: rkey,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/consumer/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event, asJSON,
// Key structure for events in PebbleDB
// {{event_time_us}}_{{repo}}_{{collection}}
var key []byte
if evt.EventType == models.EventCommit && evt.Commit != nil {
if evt.Kind == models.EventKindCommit && evt.Commit != nil {
key = []byte(fmt.Sprintf("%d_%s_%s", evt.TimeUS, evt.Did, evt.Commit.Collection))
} else {
key = []byte(fmt.Sprintf("%d_%s", evt.TimeUS, evt.Did))
Expand Down
22 changes: 6 additions & 16 deletions pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,16 @@ import (
var ZSTDDictionary []byte

type Event struct {
Did string `json:"did"`
TimeUS int64 `json:"time_us"`
EventType string `json:"type"`
Kind string `json:"kind,omitempty"`
Commit *Commit `json:"commit,omitempty"`
Account *comatproto.SyncSubscribeRepos_Account `json:"account,omitempty"`
Identity *comatproto.SyncSubscribeRepos_Identity `json:"identity,omitempty"`
Did string `json:"did"`
TimeUS int64 `json:"time_us"`
Kind string `json:"kind,omitempty"`
Commit *Commit `json:"commit,omitempty"`
Account *comatproto.SyncSubscribeRepos_Account `json:"account,omitempty"`
Identity *comatproto.SyncSubscribeRepos_Identity `json:"identity,omitempty"`
}

type Commit struct {
Rev string `json:"rev,omitempty"`
OpType string `json:"type"`
Operation string `json:"operation,omitempty"`
Collection string `json:"collection,omitempty"`
RKey string `json:"rkey,omitempty"`
Expand All @@ -32,14 +30,6 @@ type Commit struct {
}

var (
EventCommit = "com"
EventAccount = "acc"
EventIdentity = "id"

CommitCreateRecord = "c"
CommitUpdateRecord = "u"
CommitDeleteRecord = "d"

EventKindCommit = "commit"
EventKindAccount = "account"
EventKindIdentity = "identity"
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes []
bytesEmitted.Add(evtSize)

collection := ""
if e.EventType == models.EventCommit && e.Commit != nil {
if e.Kind == models.EventKindCommit && e.Commit != nil {
collection = e.Commit.Collection
}

Expand Down

0 comments on commit 7b414e0

Please sign in to comment.