diff --git a/cmd/client/main.go b/cmd/client/main.go index 7e9cc58..43e9abf 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -31,6 +31,10 @@ func main() { config.WebsocketURL = serverAddr config.Compress = true + h := &handler{ + seenSeqs: make(map[int64]struct{}), + } + scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) c, err := client.NewClient(config, logger, scheduler) @@ -38,7 +42,7 @@ func main() { log.Fatalf("failed to create client: %v", err) } - cursor := time.Now().Add(90 * -time.Minute).UnixMicro() + cursor := time.Now().Add(5 * -time.Minute).UnixMicro() // Every 5 seconds print the events read and bytes read and average event size go func() { diff --git a/go.mod b/go.mod index f4a48be..e3c42bc 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -101,8 +102,6 @@ require ( github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect github.com/whyrusleeping/cbor-gen v0.1.3-0.20240904181319-8dc02b38228c // indirect github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect - gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b // indirect - gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/otel/trace v1.21.0 // indirect diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index 27c8ebe..3357b90 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -144,10 +144,9 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE // Emit identity update e := models.Event{ - Did: xe.RepoIdentity.Did, - Kind: models.EventKindIdentity, - EventType: models.EventIdentity, - Identity: xe.RepoIdentity, + Did: xe.RepoIdentity.Did, + Kind: models.EventKindIdentity, + Identity: xe.RepoIdentity, } // Send to the sequencer c.buf <- &e @@ -168,10 +167,9 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE // Emit account update e := models.Event{ - Did: xe.RepoAccount.Did, - Kind: models.EventKindAccount, - EventType: models.EventAccount, - Account: xe.RepoAccount, + Did: xe.RepoAccount.Did, + Kind: models.EventKindAccount, + Account: xe.RepoAccount, } // Send to the sequencer c.buf <- &e @@ -234,9 +232,8 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub span.SetAttributes(attribute.String("event_kind", op.Action)) e := models.Event{ - Did: evt.Repo, - EventType: models.EventCommit, - Kind: models.EventKindCommit, + Did: evt.Repo, + Kind: models.EventKindCommit, } switch ek { @@ -273,7 +270,6 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub e.Commit = &models.Commit{ Rev: evt.Rev, Operation: models.CommitOperationCreate, - OpType: models.CommitCreateRecord, Collection: collection, RKey: rkey, Record: recJSON, @@ -312,7 +308,6 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub e.Commit = &models.Commit{ Rev: evt.Rev, Operation: models.CommitOperationUpdate, - OpType: models.CommitUpdateRecord, Collection: collection, RKey: rkey, Record: recJSON, @@ -323,7 +318,6 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub e.Commit = &models.Commit{ Rev: evt.Rev, Operation: models.CommitOperationDelete, - OpType: models.CommitDeleteRecord, Collection: collection, RKey: rkey, } diff --git a/pkg/consumer/persist.go b/pkg/consumer/persist.go index 4d2bbb6..f4b08ba 100644 --- a/pkg/consumer/persist.go +++ b/pkg/consumer/persist.go @@ -94,7 +94,7 @@ func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event, asJSON, // Key structure for events in PebbleDB // {{event_time_us}}_{{repo}}_{{collection}} var key []byte - if evt.EventType == models.EventCommit && evt.Commit != nil { + if evt.Kind == models.EventKindCommit && evt.Commit != nil { key = []byte(fmt.Sprintf("%d_%s_%s", evt.TimeUS, evt.Did, evt.Commit.Collection)) } else { key = []byte(fmt.Sprintf("%d_%s", evt.TimeUS, evt.Did)) diff --git a/pkg/models/models.go b/pkg/models/models.go index 536b1af..a1a7dea 100644 --- a/pkg/models/models.go +++ b/pkg/models/models.go @@ -12,18 +12,16 @@ import ( var ZSTDDictionary []byte type Event struct { - Did string `json:"did"` - TimeUS int64 `json:"time_us"` - EventType string `json:"type"` - Kind string `json:"kind,omitempty"` - Commit *Commit `json:"commit,omitempty"` - Account *comatproto.SyncSubscribeRepos_Account `json:"account,omitempty"` - Identity *comatproto.SyncSubscribeRepos_Identity `json:"identity,omitempty"` + Did string `json:"did"` + TimeUS int64 `json:"time_us"` + Kind string `json:"kind,omitempty"` + Commit *Commit `json:"commit,omitempty"` + Account *comatproto.SyncSubscribeRepos_Account `json:"account,omitempty"` + Identity *comatproto.SyncSubscribeRepos_Identity `json:"identity,omitempty"` } type Commit struct { Rev string `json:"rev,omitempty"` - OpType string `json:"type"` Operation string `json:"operation,omitempty"` Collection string `json:"collection,omitempty"` RKey string `json:"rkey,omitempty"` @@ -32,14 +30,6 @@ type Commit struct { } var ( - EventCommit = "com" - EventAccount = "acc" - EventIdentity = "id" - - CommitCreateRecord = "c" - CommitUpdateRecord = "u" - CommitDeleteRecord = "d" - EventKindCommit = "commit" EventKindAccount = "account" EventKindIdentity = "identity" diff --git a/pkg/server/server.go b/pkg/server/server.go index 78bd318..aa2342a 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -281,7 +281,7 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes [] bytesEmitted.Add(evtSize) collection := "" - if e.EventType == models.EventCommit && e.Commit != nil { + if e.Kind == models.EventKindCommit && e.Commit != nil { collection = e.Commit.Collection }