diff --git a/api/atproto/admindefs.go b/api/atproto/admindefs.go index 083401de0..f001a40cc 100644 --- a/api/atproto/admindefs.go +++ b/api/atproto/admindefs.go @@ -20,10 +20,12 @@ type AdminDefs_ActionReversal struct { // AdminDefs_ActionView is a "actionView" in the com.atproto.admin.defs schema. type AdminDefs_ActionView struct { - Action *string `json:"action" cborgen:"action"` - CreateLabelVals []string `json:"createLabelVals,omitempty" cborgen:"createLabelVals,omitempty"` - CreatedAt string `json:"createdAt" cborgen:"createdAt"` - CreatedBy string `json:"createdBy" cborgen:"createdBy"` + Action *string `json:"action" cborgen:"action"` + CreateLabelVals []string `json:"createLabelVals,omitempty" cborgen:"createLabelVals,omitempty"` + CreatedAt string `json:"createdAt" cborgen:"createdAt"` + CreatedBy string `json:"createdBy" cborgen:"createdBy"` + // durationInHours: Indicates how long this action was meant to be in effect before automatically expiring. + DurationInHours *int64 `json:"durationInHours,omitempty" cborgen:"durationInHours,omitempty"` Id int64 `json:"id" cborgen:"id"` NegateLabelVals []string `json:"negateLabelVals,omitempty" cborgen:"negateLabelVals,omitempty"` Reason string `json:"reason" cborgen:"reason"` @@ -36,15 +38,19 @@ type AdminDefs_ActionView struct { // AdminDefs_ActionViewCurrent is a "actionViewCurrent" in the com.atproto.admin.defs schema. type AdminDefs_ActionViewCurrent struct { Action *string `json:"action" cborgen:"action"` - Id int64 `json:"id" cborgen:"id"` + // durationInHours: Indicates how long this action was meant to be in effect before automatically expiring. + DurationInHours *int64 `json:"durationInHours,omitempty" cborgen:"durationInHours,omitempty"` + Id int64 `json:"id" cborgen:"id"` } // AdminDefs_ActionViewDetail is a "actionViewDetail" in the com.atproto.admin.defs schema. type AdminDefs_ActionViewDetail struct { - Action *string `json:"action" cborgen:"action"` - CreateLabelVals []string `json:"createLabelVals,omitempty" cborgen:"createLabelVals,omitempty"` - CreatedAt string `json:"createdAt" cborgen:"createdAt"` - CreatedBy string `json:"createdBy" cborgen:"createdBy"` + Action *string `json:"action" cborgen:"action"` + CreateLabelVals []string `json:"createLabelVals,omitempty" cborgen:"createLabelVals,omitempty"` + CreatedAt string `json:"createdAt" cborgen:"createdAt"` + CreatedBy string `json:"createdBy" cborgen:"createdBy"` + // durationInHours: Indicates how long this action was meant to be in effect before automatically expiring. + DurationInHours *int64 `json:"durationInHours,omitempty" cborgen:"durationInHours,omitempty"` Id int64 `json:"id" cborgen:"id"` NegateLabelVals []string `json:"negateLabelVals,omitempty" cborgen:"negateLabelVals,omitempty"` Reason string `json:"reason" cborgen:"reason"` diff --git a/api/atproto/admintakeModerationAction.go b/api/atproto/admintakeModerationAction.go index 2678f9e73..d95f88e85 100644 --- a/api/atproto/admintakeModerationAction.go +++ b/api/atproto/admintakeModerationAction.go @@ -15,9 +15,11 @@ import ( // AdminTakeModerationAction_Input is the input argument to a com.atproto.admin.takeModerationAction call. type AdminTakeModerationAction_Input struct { - Action string `json:"action" cborgen:"action"` - CreateLabelVals []string `json:"createLabelVals,omitempty" cborgen:"createLabelVals,omitempty"` - CreatedBy string `json:"createdBy" cborgen:"createdBy"` + Action string `json:"action" cborgen:"action"` + CreateLabelVals []string `json:"createLabelVals,omitempty" cborgen:"createLabelVals,omitempty"` + CreatedBy string `json:"createdBy" cborgen:"createdBy"` + // durationInHours: Indicates how long this action was meant to be in effect before automatically expiring. + DurationInHours *int64 `json:"durationInHours,omitempty" cborgen:"durationInHours,omitempty"` NegateLabelVals []string `json:"negateLabelVals,omitempty" cborgen:"negateLabelVals,omitempty"` Reason string `json:"reason" cborgen:"reason"` Subject *AdminTakeModerationAction_Input_Subject `json:"subject" cborgen:"subject"` diff --git a/api/atproto/cbor_gen.go b/api/atproto/cbor_gen.go index 460020988..455bea188 100644 --- a/api/atproto/cbor_gen.go +++ b/api/atproto/cbor_gen.go @@ -194,7 +194,7 @@ func (t *SyncSubscribeRepos_Commit) MarshalCBOR(w io.Writer) error { cw := cbg.NewCborWriter(w) - if _, err := cw.Write([]byte{170}); err != nil { + if _, err := cw.Write([]byte{172}); err != nil { return err } @@ -223,6 +223,29 @@ func (t *SyncSubscribeRepos_Commit) MarshalCBOR(w io.Writer) error { } } + // t.Rev (string) (string) + if len("rev") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"rev\" was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("rev"))); err != nil { + return err + } + if _, err := cw.WriteString(string("rev")); err != nil { + return err + } + + if len(t.Rev) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Rev was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Rev))); err != nil { + return err + } + if _, err := cw.WriteString(string(t.Rev)); err != nil { + return err + } + // t.Seq (int64) (int64) if len("seq") > cbg.MaxLength { return xerrors.Errorf("Value in field \"seq\" was too long") @@ -332,6 +355,35 @@ func (t *SyncSubscribeRepos_Commit) MarshalCBOR(w io.Writer) error { } } + // t.Since (string) (string) + if len("since") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"since\" was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("since"))); err != nil { + return err + } + if _, err := cw.WriteString(string("since")); err != nil { + return err + } + + if t.Since == nil { + if _, err := cw.Write(cbg.CborNull); err != nil { + return err + } + } else { + if len(*t.Since) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Since was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.Since))); err != nil { + return err + } + if _, err := cw.WriteString(string(*t.Since)); err != nil { + return err + } + } + // t.Blocks (util.LexBytes) (slice) if len("blocks") > cbg.MaxLength { return xerrors.Errorf("Value in field \"blocks\" was too long") @@ -474,6 +526,17 @@ func (t *SyncSubscribeRepos_Commit) UnmarshalCBOR(r io.Reader) (err error) { t.Ops[i] = &v } + // t.Rev (string) (string) + case "rev": + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.Rev = string(sval) + } // t.Seq (int64) (int64) case "seq": { @@ -572,6 +635,27 @@ func (t *SyncSubscribeRepos_Commit) UnmarshalCBOR(r io.Reader) (err error) { t.Blobs[i] = v } + // t.Since (string) (string) + case "since": + + { + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.Since = (*string)(&sval) + } + } // t.Blocks (util.LexBytes) (slice) case "blocks": @@ -1606,10 +1690,10 @@ func (t *LabelDefs_SelfLabels) MarshalCBOR(w io.Writer) error { return err } - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("com.atproto.label.defs"))); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("com.atproto.label.defs#selfLabels"))); err != nil { return err } - if _, err := cw.WriteString(string("com.atproto.label.defs")); err != nil { + if _, err := cw.WriteString(string("com.atproto.label.defs#selfLabels")); err != nil { return err } diff --git a/api/atproto/repostrongRef.go b/api/atproto/repostrongRef.go index 6c16ec210..232cec613 100644 --- a/api/atproto/repostrongRef.go +++ b/api/atproto/repostrongRef.go @@ -13,7 +13,7 @@ func init() { } // RepoStrongRef is a "main" in the com.atproto.repo.strongRef schema. // RECORDTYPE: RepoStrongRef type RepoStrongRef struct { - LexiconTypeID string `json:"$type,const=com.atproto.repo.strongRef#main,omitempty" cborgen:"$type,const=com.atproto.repo.strongRef#main,omitempty"` + LexiconTypeID string `json:"$type,const=com.atproto.repo.strongRef,omitempty" cborgen:"$type,const=com.atproto.repo.strongRef,omitempty"` Cid string `json:"cid" cborgen:"cid"` Uri string `json:"uri" cborgen:"uri"` } diff --git a/api/atproto/syncgetCheckout.go b/api/atproto/syncgetCheckout.go index ea82b9860..301b7cfcd 100644 --- a/api/atproto/syncgetCheckout.go +++ b/api/atproto/syncgetCheckout.go @@ -13,14 +13,12 @@ import ( // SyncGetCheckout calls the XRPC method "com.atproto.sync.getCheckout". // -// commit: The commit to get the checkout from. Defaults to current HEAD. // did: The DID of the repo. -func SyncGetCheckout(ctx context.Context, c *xrpc.Client, commit string, did string) ([]byte, error) { +func SyncGetCheckout(ctx context.Context, c *xrpc.Client, did string) ([]byte, error) { buf := new(bytes.Buffer) params := map[string]interface{}{ - "commit": commit, - "did": did, + "did": did, } if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.getCheckout", params, nil, buf); err != nil { return nil, err diff --git a/api/atproto/syncgetLatestCommit.go b/api/atproto/syncgetLatestCommit.go new file mode 100644 index 000000000..76631f2a9 --- /dev/null +++ b/api/atproto/syncgetLatestCommit.go @@ -0,0 +1,33 @@ +// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. + +package atproto + +// schema: com.atproto.sync.getLatestCommit + +import ( + "context" + + "github.com/bluesky-social/indigo/xrpc" +) + +// SyncGetLatestCommit_Output is the output of a com.atproto.sync.getLatestCommit call. +type SyncGetLatestCommit_Output struct { + Cid string `json:"cid" cborgen:"cid"` + Rev string `json:"rev" cborgen:"rev"` +} + +// SyncGetLatestCommit calls the XRPC method "com.atproto.sync.getLatestCommit". +// +// did: The DID of the repo. +func SyncGetLatestCommit(ctx context.Context, c *xrpc.Client, did string) (*SyncGetLatestCommit_Output, error) { + var out SyncGetLatestCommit_Output + + params := map[string]interface{}{ + "did": did, + } + if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.getLatestCommit", params, nil, &out); err != nil { + return nil, err + } + + return &out, nil +} diff --git a/api/atproto/syncgetRepo.go b/api/atproto/syncgetRepo.go index 495d885db..576670a92 100644 --- a/api/atproto/syncgetRepo.go +++ b/api/atproto/syncgetRepo.go @@ -14,15 +14,13 @@ import ( // SyncGetRepo calls the XRPC method "com.atproto.sync.getRepo". // // did: The DID of the repo. -// earliest: The earliest commit in the commit range (not inclusive) -// latest: The latest commit in the commit range (inclusive) -func SyncGetRepo(ctx context.Context, c *xrpc.Client, did string, earliest string, latest string) ([]byte, error) { +// since: The revision of the repo to catch up from. +func SyncGetRepo(ctx context.Context, c *xrpc.Client, did string, since string) ([]byte, error) { buf := new(bytes.Buffer) params := map[string]interface{}{ - "did": did, - "earliest": earliest, - "latest": latest, + "did": did, + "since": since, } if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.getRepo", params, nil, buf); err != nil { return nil, err diff --git a/api/atproto/synclistBlobs.go b/api/atproto/synclistBlobs.go index d082a2e29..7f27dea96 100644 --- a/api/atproto/synclistBlobs.go +++ b/api/atproto/synclistBlobs.go @@ -12,21 +12,22 @@ import ( // SyncListBlobs_Output is the output of a com.atproto.sync.listBlobs call. type SyncListBlobs_Output struct { - Cids []string `json:"cids" cborgen:"cids"` + Cids []string `json:"cids" cborgen:"cids"` + Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"` } // SyncListBlobs calls the XRPC method "com.atproto.sync.listBlobs". // // did: The DID of the repo. -// earliest: The earliest commit to start from -// latest: The most recent commit -func SyncListBlobs(ctx context.Context, c *xrpc.Client, did string, earliest string, latest string) (*SyncListBlobs_Output, error) { +// since: Optional revision of the repo to list blobs since +func SyncListBlobs(ctx context.Context, c *xrpc.Client, cursor string, did string, limit int64, since string) (*SyncListBlobs_Output, error) { var out SyncListBlobs_Output params := map[string]interface{}{ - "did": did, - "earliest": earliest, - "latest": latest, + "cursor": cursor, + "did": did, + "limit": limit, + "since": since, } if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.listBlobs", params, nil, &out); err != nil { return nil, err diff --git a/api/atproto/syncsubscribeRepos.go b/api/atproto/syncsubscribeRepos.go index 56971754d..47a014bb9 100644 --- a/api/atproto/syncsubscribeRepos.go +++ b/api/atproto/syncsubscribeRepos.go @@ -18,9 +18,13 @@ type SyncSubscribeRepos_Commit struct { Prev *util.LexLink `json:"prev" cborgen:"prev"` Rebase bool `json:"rebase" cborgen:"rebase"` Repo string `json:"repo" cborgen:"repo"` - Seq int64 `json:"seq" cborgen:"seq"` - Time string `json:"time" cborgen:"time"` - TooBig bool `json:"tooBig" cborgen:"tooBig"` + // rev: The rev of the emitted commit + Rev string `json:"rev" cborgen:"rev"` + Seq int64 `json:"seq" cborgen:"seq"` + // since: The rev of the last emitted commit from this repo + Since *string `json:"since" cborgen:"since"` + Time string `json:"time" cborgen:"time"` + TooBig bool `json:"tooBig" cborgen:"tooBig"` } // SyncSubscribeRepos_Handle is a "handle" in the com.atproto.sync.subscribeRepos schema. @@ -46,6 +50,8 @@ type SyncSubscribeRepos_Migrate struct { } // SyncSubscribeRepos_RepoOp is a "repoOp" in the com.atproto.sync.subscribeRepos schema. +// +// A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null. type SyncSubscribeRepos_RepoOp struct { Action string `json:"action" cborgen:"action"` Cid *util.LexLink `json:"cid" cborgen:"cid"` diff --git a/api/atproto/tempupgradeRepoVersion.go b/api/atproto/tempupgradeRepoVersion.go new file mode 100644 index 000000000..c28c904ed --- /dev/null +++ b/api/atproto/tempupgradeRepoVersion.go @@ -0,0 +1,25 @@ +// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. + +package atproto + +// schema: com.atproto.temp.upgradeRepoVersion + +import ( + "context" + + "github.com/bluesky-social/indigo/xrpc" +) + +// TempUpgradeRepoVersion_Input is the input argument to a com.atproto.temp.upgradeRepoVersion call. +type TempUpgradeRepoVersion_Input struct { + Did string `json:"did" cborgen:"did"` +} + +// TempUpgradeRepoVersion calls the XRPC method "com.atproto.temp.upgradeRepoVersion". +func TempUpgradeRepoVersion(ctx context.Context, c *xrpc.Client, input *TempUpgradeRepoVersion_Input) error { + if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.temp.upgradeRepoVersion", nil, input, nil); err != nil { + return err + } + + return nil +} diff --git a/api/bsky/cbor_gen.go b/api/bsky/cbor_gen.go index adf70c71c..191758a35 100644 --- a/api/bsky/cbor_gen.go +++ b/api/bsky/cbor_gen.go @@ -2867,10 +2867,10 @@ func (t *RichtextFacet_Link) MarshalCBOR(w io.Writer) error { return err } - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.richtext.facet"))); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.richtext.facet#link"))); err != nil { return err } - if _, err := cw.WriteString(string("app.bsky.richtext.facet")); err != nil { + if _, err := cw.WriteString(string("app.bsky.richtext.facet#link")); err != nil { return err } return nil @@ -2992,10 +2992,10 @@ func (t *RichtextFacet_Mention) MarshalCBOR(w io.Writer) error { return err } - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.richtext.facet"))); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.richtext.facet#mention"))); err != nil { return err } - if _, err := cw.WriteString(string("app.bsky.richtext.facet")); err != nil { + if _, err := cw.WriteString(string("app.bsky.richtext.facet#mention")); err != nil { return err } return nil @@ -3280,10 +3280,10 @@ func (t *FeedDefs_NotFoundPost) MarshalCBOR(w io.Writer) error { return err } - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.feed.defs"))); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.feed.defs#notFoundPost"))); err != nil { return err } - if _, err := cw.WriteString(string("app.bsky.feed.defs")); err != nil { + if _, err := cw.WriteString(string("app.bsky.feed.defs#notFoundPost")); err != nil { return err } diff --git a/api/bsky/embedexternal.go b/api/bsky/embedexternal.go index d3ee97b2b..12a6722a8 100644 --- a/api/bsky/embedexternal.go +++ b/api/bsky/embedexternal.go @@ -13,7 +13,7 @@ func init() { } // EmbedExternal is a "main" in the app.bsky.embed.external schema. // RECORDTYPE: EmbedExternal type EmbedExternal struct { - LexiconTypeID string `json:"$type,const=app.bsky.embed.external#main" cborgen:"$type,const=app.bsky.embed.external#main"` + LexiconTypeID string `json:"$type,const=app.bsky.embed.external" cborgen:"$type,const=app.bsky.embed.external"` External *EmbedExternal_External `json:"external" cborgen:"external"` } diff --git a/api/bsky/embedimages.go b/api/bsky/embedimages.go index 86409af72..927f3ff7c 100644 --- a/api/bsky/embedimages.go +++ b/api/bsky/embedimages.go @@ -13,7 +13,7 @@ func init() { } // EmbedImages is a "main" in the app.bsky.embed.images schema. // RECORDTYPE: EmbedImages type EmbedImages struct { - LexiconTypeID string `json:"$type,const=app.bsky.embed.images#main" cborgen:"$type,const=app.bsky.embed.images#main"` + LexiconTypeID string `json:"$type,const=app.bsky.embed.images" cborgen:"$type,const=app.bsky.embed.images"` Images []*EmbedImages_Image `json:"images" cborgen:"images"` } diff --git a/api/bsky/embedrecord.go b/api/bsky/embedrecord.go index baeb812d9..269368837 100644 --- a/api/bsky/embedrecord.go +++ b/api/bsky/embedrecord.go @@ -17,7 +17,7 @@ func init() { } // EmbedRecord is a "main" in the app.bsky.embed.record schema. // RECORDTYPE: EmbedRecord type EmbedRecord struct { - LexiconTypeID string `json:"$type,const=app.bsky.embed.record#main" cborgen:"$type,const=app.bsky.embed.record#main"` + LexiconTypeID string `json:"$type,const=app.bsky.embed.record" cborgen:"$type,const=app.bsky.embed.record"` Record *comatprototypes.RepoStrongRef `json:"record" cborgen:"record"` } diff --git a/api/bsky/embedrecordWithMedia.go b/api/bsky/embedrecordWithMedia.go index 414941567..9d3e0b11f 100644 --- a/api/bsky/embedrecordWithMedia.go +++ b/api/bsky/embedrecordWithMedia.go @@ -19,7 +19,7 @@ func init() { } // EmbedRecordWithMedia is a "main" in the app.bsky.embed.recordWithMedia schema. // RECORDTYPE: EmbedRecordWithMedia type EmbedRecordWithMedia struct { - LexiconTypeID string `json:"$type,const=app.bsky.embed.recordWithMedia#main" cborgen:"$type,const=app.bsky.embed.recordWithMedia#main"` + LexiconTypeID string `json:"$type,const=app.bsky.embed.recordWithMedia" cborgen:"$type,const=app.bsky.embed.recordWithMedia"` Media *EmbedRecordWithMedia_Media `json:"media" cborgen:"media"` Record *EmbedRecord `json:"record" cborgen:"record"` } diff --git a/api/bsky/feedgetActorLikes.go b/api/bsky/feedgetActorLikes.go new file mode 100644 index 000000000..e7bae6265 --- /dev/null +++ b/api/bsky/feedgetActorLikes.go @@ -0,0 +1,33 @@ +// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. + +package bsky + +// schema: app.bsky.feed.getActorLikes + +import ( + "context" + + "github.com/bluesky-social/indigo/xrpc" +) + +// FeedGetActorLikes_Output is the output of a app.bsky.feed.getActorLikes call. +type FeedGetActorLikes_Output struct { + Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"` + Feed []*FeedDefs_FeedViewPost `json:"feed" cborgen:"feed"` +} + +// FeedGetActorLikes calls the XRPC method "app.bsky.feed.getActorLikes". +func FeedGetActorLikes(ctx context.Context, c *xrpc.Client, actor string, cursor string, limit int64) (*FeedGetActorLikes_Output, error) { + var out FeedGetActorLikes_Output + + params := map[string]interface{}{ + "actor": actor, + "cursor": cursor, + "limit": limit, + } + if err := c.Do(ctx, xrpc.Query, "", "app.bsky.feed.getActorLikes", params, nil, &out); err != nil { + return nil, err + } + + return &out, nil +} diff --git a/api/bsky/notificationregisterPush.go b/api/bsky/notificationregisterPush.go new file mode 100644 index 000000000..15dc5da33 --- /dev/null +++ b/api/bsky/notificationregisterPush.go @@ -0,0 +1,28 @@ +// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. + +package bsky + +// schema: app.bsky.notification.registerPush + +import ( + "context" + + "github.com/bluesky-social/indigo/xrpc" +) + +// NotificationRegisterPush_Input is the input argument to a app.bsky.notification.registerPush call. +type NotificationRegisterPush_Input struct { + AppId string `json:"appId" cborgen:"appId"` + Platform string `json:"platform" cborgen:"platform"` + ServiceDid string `json:"serviceDid" cborgen:"serviceDid"` + Token string `json:"token" cborgen:"token"` +} + +// NotificationRegisterPush calls the XRPC method "app.bsky.notification.registerPush". +func NotificationRegisterPush(ctx context.Context, c *xrpc.Client, input *NotificationRegisterPush_Input) error { + if err := c.Do(ctx, xrpc.Procedure, "application/json", "app.bsky.notification.registerPush", nil, input, nil); err != nil { + return err + } + + return nil +} diff --git a/api/extra.go b/api/extra.go index a4afa5626..db1013554 100644 --- a/api/extra.go +++ b/api/extra.go @@ -14,6 +14,7 @@ import ( "github.com/bluesky-social/indigo/did" "github.com/bluesky-social/indigo/xrpc" + arc "github.com/hashicorp/golang-lru/arc/v2" logging "github.com/ipfs/go-log" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" otel "go.opentelemetry.io/otel" @@ -75,8 +76,26 @@ type HandleResolver interface { ResolveHandleToDid(ctx context.Context, handle string) (string, error) } +type failCacheItem struct { + err error + count int + expiresAt time.Time +} + type ProdHandleResolver struct { - ReqMod func(*http.Request, string) error + ReqMod func(*http.Request, string) error + FailCache *arc.ARCCache[string, *failCacheItem] +} + +func NewProdHandleResolver(failureCacheSize int) (*ProdHandleResolver, error) { + failureCache, err := arc.NewARC[string, *failCacheItem](failureCacheSize) + if err != nil { + return nil, err + } + + return &ProdHandleResolver{ + FailCache: failureCache, + }, nil } func (dr *ProdHandleResolver) ResolveHandleToDid(ctx context.Context, handle string) (string, error) { @@ -86,6 +105,18 @@ func (dr *ProdHandleResolver) ResolveHandleToDid(ctx context.Context, handle str ctx, span := otel.Tracer("resolver").Start(ctx, "ResolveHandleToDid") defer span.End() + var cachedFailureCount int + + if dr.FailCache != nil { + if item, ok := dr.FailCache.Get(handle); ok { + cachedFailureCount = item.count + if item.expiresAt.After(time.Now()) { + return "", item.err + } + dr.FailCache.Remove(handle) + } + } + var wkres, dnsres string var wkerr, dnserr error @@ -117,7 +148,28 @@ func (dr *ProdHandleResolver) ResolveHandleToDid(ctx context.Context, handle str return wkres, nil } - return "", errors.Join(fmt.Errorf("no did record found for handle %q", handle), dnserr, wkerr) + err := errors.Join(fmt.Errorf("no did record found for handle %q", handle), dnserr, wkerr) + + if dr.FailCache != nil { + cachedFailureCount++ + expireAt := time.Now().Add(time.Millisecond * 100) + if cachedFailureCount > 1 { + // exponential backoff + expireAt = time.Now().Add(time.Millisecond * 100 * time.Duration(cachedFailureCount*cachedFailureCount)) + // Clamp to one hour + if expireAt.After(time.Now().Add(time.Hour)) { + expireAt = time.Now().Add(time.Hour) + } + } + + dr.FailCache.Add(handle, &failCacheItem{ + err: err, + expiresAt: expireAt, + count: cachedFailureCount, + }) + } + + return "", err } func (dr *ProdHandleResolver) resolveWellKnown(ctx context.Context, handle string) (string, error) { @@ -164,7 +216,6 @@ func (dr *ProdHandleResolver) resolveWellKnown(ctx context.Context, handle strin } func (dr *ProdHandleResolver) resolveDNS(ctx context.Context, handle string) (string, error) { - res, err := net.LookupTXT("_atproto." + handle) if err != nil { return "", fmt.Errorf("handle lookup failed: %w", err) diff --git a/bgs/bgs.go b/bgs/bgs.go index 4c99d82e4..4f0edea3f 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -100,8 +100,7 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm events: evtman, didr: didr, blobs: blobs, - - ssl: ssl, + ssl: ssl, consumersLk: sync.RWMutex{}, consumers: make(map[uint64]*SocketConsumer), @@ -275,7 +274,6 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error { e.GET("/xrpc/com.atproto.sync.subscribeRepos", bgs.EventsHandler) e.GET("/xrpc/com.atproto.sync.getCheckout", bgs.HandleComAtprotoSyncGetCheckout) - e.GET("/xrpc/com.atproto.sync.getCommitPath", bgs.HandleComAtprotoSyncGetCommitPath) e.GET("/xrpc/com.atproto.sync.getHead", bgs.HandleComAtprotoSyncGetHead) e.GET("/xrpc/com.atproto.sync.getRecord", bgs.HandleComAtprotoSyncGetRecord) e.GET("/xrpc/com.atproto.sync.getRepo", bgs.HandleComAtprotoSyncGetRepo) @@ -755,7 +753,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt) } - if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, (*cid.Cid)(evt.Prev), evt.Blocks, evt.Ops); err != nil { + if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil { log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String()) if errors.Is(err, carstore.ErrRepoBaseMismatch) { @@ -769,15 +767,6 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt) } - if errors.Is(err, carstore.ErrRepoFork) { - log.Errorw("detected repo fork", "from", stringLink(evt.Prev), "host", host.Host, "repo", u.Did) - - span.SetAttributes(attribute.Bool("catchup_queue", true)) - span.SetAttributes(attribute.Bool("fork", true)) - - return fmt.Errorf("cannot process repo fork") - } - return fmt.Errorf("handle user event failed: %w", err) } diff --git a/bgs/handlers.go b/bgs/handlers.go index 38b7b7444..68b3e27dc 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -12,11 +12,10 @@ import ( comatprototypes "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/util" "github.com/bluesky-social/indigo/xrpc" - "github.com/ipfs/go-cid" "github.com/labstack/echo/v4" ) -func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context, commit string, did string) (io.Reader, error) { +func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context, did string) (io.Reader, error) { /* u, err := s.Index.LookupUserByDid(ctx, did) if err != nil { @@ -63,35 +62,16 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri return nil, fmt.Errorf("nyi") } -func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, earliest string, latest string) (io.Reader, error) { +func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) { u, err := s.Index.LookupUserByDid(ctx, did) if err != nil { return nil, err } - var earlyCid, lateCid cid.Cid - if earliest != "" { - c, err := cid.Decode(earliest) - if err != nil { - return nil, err - } - - earlyCid = c - } - - if latest != "" { - c, err := cid.Decode(latest) - if err != nil { - return nil, err - } - - lateCid = c - } - // TODO: stream the response buf := new(bytes.Buffer) - if err := s.repoman.ReadRepo(ctx, u.Uid, earlyCid, lateCid, buf); err != nil { - return nil, err + if err := s.repoman.ReadRepo(ctx, u.Uid, since, buf); err != nil { + return nil, fmt.Errorf("failed to read repo: %w", err) } return buf, nil @@ -101,7 +81,8 @@ func (s *BGS) handleComAtprotoSyncGetBlocks(ctx context.Context, cids []string, return nil, fmt.Errorf("NYI") } -func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, host string) error { +func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatprototypes.SyncRequestCrawl_Input) error { + host := body.Hostname if host == "" { return fmt.Errorf("must pass valid hostname") } @@ -151,7 +132,7 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, host string) return s.slurper.SubscribeToPds(ctx, norm, true) } -func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, hostname string) error { +func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error { // TODO: return nil } @@ -169,10 +150,14 @@ func (s *BGS) handleComAtprotoSyncGetBlob(ctx context.Context, cid string, did s return bytes.NewReader(b), nil } -func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncListBlobs_Output, error) { +func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context, cursor string, did string, limit int, since string) (*comatprototypes.SyncListBlobs_Output, error) { return nil, fmt.Errorf("NYI") } func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) { return nil, fmt.Errorf("NYI") } + +func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) { + return nil, fmt.Errorf("NYI") +} diff --git a/bgs/stubs.go b/bgs/stubs.go index 376a3666d..bbb8777bf 100644 --- a/bgs/stubs.go +++ b/bgs/stubs.go @@ -1,7 +1,6 @@ package bgs import ( - "fmt" "io" "strconv" @@ -18,14 +17,14 @@ func (s *BGS) RegisterHandlersComAtproto(e *echo.Echo) error { e.GET("/xrpc/com.atproto.sync.getBlob", s.HandleComAtprotoSyncGetBlob) e.GET("/xrpc/com.atproto.sync.getBlocks", s.HandleComAtprotoSyncGetBlocks) e.GET("/xrpc/com.atproto.sync.getCheckout", s.HandleComAtprotoSyncGetCheckout) - e.GET("/xrpc/com.atproto.sync.getCommitPath", s.HandleComAtprotoSyncGetCommitPath) e.GET("/xrpc/com.atproto.sync.getHead", s.HandleComAtprotoSyncGetHead) + e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.HandleComAtprotoSyncGetLatestCommit) e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord) e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo) e.GET("/xrpc/com.atproto.sync.listBlobs", s.HandleComAtprotoSyncListBlobs) e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos) - e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate) - e.GET("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) + e.POST("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate) + e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) return nil } @@ -63,42 +62,39 @@ func (s *BGS) HandleComAtprotoSyncGetBlocks(c echo.Context) error { func (s *BGS) HandleComAtprotoSyncGetCheckout(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetCheckout") defer span.End() - commit := c.QueryParam("commit") did := c.QueryParam("did") var out io.Reader var handleErr error - // func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context,commit string,did string) (io.Reader, error) - out, handleErr = s.handleComAtprotoSyncGetCheckout(ctx, commit, did) + // func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context,did string) (io.Reader, error) + out, handleErr = s.handleComAtprotoSyncGetCheckout(ctx, did) if handleErr != nil { return handleErr } return c.Stream(200, "application/vnd.ipld.car", out) } -func (s *BGS) HandleComAtprotoSyncGetCommitPath(c echo.Context) error { - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetCommitPath") +func (s *BGS) HandleComAtprotoSyncGetHead(c echo.Context) error { + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead") defer span.End() did := c.QueryParam("did") - earliest := c.QueryParam("earliest") - latest := c.QueryParam("latest") - var out *comatprototypes.SyncGetCommitPath_Output + var out *comatprototypes.SyncGetHead_Output var handleErr error - // func (s *BGS) handleComAtprotoSyncGetCommitPath(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncGetCommitPath_Output, error) - out, handleErr = s.handleComAtprotoSyncGetCommitPath(ctx, did, earliest, latest) + // func (s *BGS) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error) + out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did) if handleErr != nil { return handleErr } return c.JSON(200, out) } -func (s *BGS) HandleComAtprotoSyncGetHead(c echo.Context) error { - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead") +func (s *BGS) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error { + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetLatestCommit") defer span.End() did := c.QueryParam("did") - var out *comatprototypes.SyncGetHead_Output + var out *comatprototypes.SyncGetLatestCommit_Output var handleErr error - // func (s *BGS) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error) - out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did) + // func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context,did string) (*comatprototypes.SyncGetLatestCommit_Output, error) + out, handleErr = s.handleComAtprotoSyncGetLatestCommit(ctx, did) if handleErr != nil { return handleErr } @@ -126,12 +122,11 @@ func (s *BGS) HandleComAtprotoSyncGetRepo(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRepo") defer span.End() did := c.QueryParam("did") - earliest := c.QueryParam("earliest") - latest := c.QueryParam("latest") + since := c.QueryParam("since") var out io.Reader var handleErr error - // func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context,did string,earliest string,latest string) (io.Reader, error) - out, handleErr = s.handleComAtprotoSyncGetRepo(ctx, did, earliest, latest) + // func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context,did string,since string) (io.Reader, error) + out, handleErr = s.handleComAtprotoSyncGetRepo(ctx, did, since) if handleErr != nil { return handleErr } @@ -141,13 +136,24 @@ func (s *BGS) HandleComAtprotoSyncGetRepo(c echo.Context) error { func (s *BGS) HandleComAtprotoSyncListBlobs(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListBlobs") defer span.End() + cursor := c.QueryParam("cursor") did := c.QueryParam("did") - earliest := c.QueryParam("earliest") - latest := c.QueryParam("latest") + + var limit int + if p := c.QueryParam("limit"); p != "" { + var err error + limit, err = strconv.Atoi(p) + if err != nil { + return err + } + } else { + limit = 500 + } + since := c.QueryParam("since") var out *comatprototypes.SyncListBlobs_Output var handleErr error - // func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncListBlobs_Output, error) - out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, did, earliest, latest) + // func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context,cursor string,did string,limit int,since string) (*comatprototypes.SyncListBlobs_Output, error) + out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, cursor, did, limit, since) if handleErr != nil { return handleErr } @@ -182,10 +188,14 @@ func (s *BGS) HandleComAtprotoSyncListRepos(c echo.Context) error { func (s *BGS) HandleComAtprotoSyncNotifyOfUpdate(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncNotifyOfUpdate") defer span.End() - hostname := c.QueryParam("hostname") + + var body comatprototypes.SyncNotifyOfUpdate_Input + if err := c.Bind(&body); err != nil { + return err + } var handleErr error - // func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context,hostname string) error - handleErr = s.handleComAtprotoSyncNotifyOfUpdate(ctx, hostname) + // func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context,body *comatprototypes.SyncNotifyOfUpdate_Input) error + handleErr = s.handleComAtprotoSyncNotifyOfUpdate(ctx, &body) if handleErr != nil { return handleErr } @@ -196,23 +206,13 @@ func (s *BGS) HandleComAtprotoSyncRequestCrawl(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncRequestCrawl") defer span.End() - var hostname string - switch c.Request().Method { - case "GET": - hostname = c.QueryParam("hostname") - case "POST": - var m map[string]string - if err := c.Bind(&m); err != nil { - return err - } - - hostname = m["hostname"] - default: - return fmt.Errorf("invalid method for handler") + var body comatprototypes.SyncRequestCrawl_Input + if err := c.Bind(&body); err != nil { + return err } var handleErr error - // func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context,hostname string) error - handleErr = s.handleComAtprotoSyncRequestCrawl(ctx, hostname) + // func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context,body *comatprototypes.SyncRequestCrawl_Input) error + handleErr = s.handleComAtprotoSyncRequestCrawl(ctx, &body) if handleErr != nil { return handleErr } diff --git a/carstore/bs.go b/carstore/bs.go index 1b9bdb9a8..ebbb4d621 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -74,6 +74,7 @@ type CarShard struct { Path string Usr models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"` Rebase bool + Rev string } type blockRef struct { @@ -243,6 +244,7 @@ type DeltaSession struct { rmcids map[cid.Cid]bool base blockstore.Blockstore user models.Uid + baseCid cid.Cid seq int readonly bool cs *CarStore @@ -292,9 +294,7 @@ func (cs *CarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShar var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head") -var ErrRepoFork = fmt.Errorf("repo fork detected") - -func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, prev *cid.Cid) (*DeltaSession, error) { +func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") defer span.End() @@ -305,19 +305,8 @@ func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, prev * return nil, err } - if prev != nil { - if lastShard.Root.CID != *prev { - fork, err := cs.checkFork(ctx, user, *prev) - if err != nil { - return nil, fmt.Errorf("failed to check carstore base mismatch for fork condition: %w", err) - } - - if fork { - return nil, fmt.Errorf("fork at %s: %w", prev.String(), ErrRepoFork) - } - - return nil, fmt.Errorf("mismatch: %s != %s: %w", lastShard.Root.CID, prev.String(), ErrRepoBaseMismatch) - } + if since != nil && *since != lastShard.Rev { + return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) } return &DeltaSession{ @@ -329,9 +318,10 @@ func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, prev * prefetch: true, cache: make(map[cid.Cid]blockformat.Block), }, - user: user, - cs: cs, - seq: lastShard.Seq + 1, + user: user, + baseCid: lastShard.Root.CID, + cs: cs, + seq: lastShard.Seq + 1, }, nil } @@ -349,38 +339,25 @@ func (cs *CarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { }, nil } -func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, earlyCid, lateCid cid.Cid, incremental bool, w io.Writer) error { +func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") defer span.End() - var lateSeq, earlySeq int - - if earlyCid.Defined() { + var earlySeq int + if sinceRev != "" { var untilShard CarShard - if err := cs.meta.First(&untilShard, "root = ? AND usr = ?", models.DbCID{earlyCid}, user).Error; err != nil { + if err := cs.meta.Where("rev >= ? AND usr = ?", sinceRev, user).Order("rev").First(&untilShard).Error; err != nil { return fmt.Errorf("finding early shard: %w", err) } earlySeq = untilShard.Seq } - if lateCid.Defined() { - var fromShard CarShard - if err := cs.meta.First(&fromShard, "root = ? AND usr = ?", models.DbCID{lateCid}, user).Error; err != nil { - return fmt.Errorf("finding late shard: %w", err) - } - lateSeq = fromShard.Seq - } - - q := cs.meta.Order("seq desc").Where("usr = ? AND seq > ?", user, earlySeq) - if lateCid.Defined() { - q = q.Where("seq <= ?", lateSeq) - } var shards []CarShard - if err := q.Find(&shards).Error; err != nil { + if err := cs.meta.Order("seq desc").Where("usr = ? AND seq >= ?", user, earlySeq).Find(&shards).Error; err != nil { return err } - if !incremental && earlyCid.Defined() { + if !incremental && earlySeq > 0 { // have to do it the ugly way return fmt.Errorf("nyi") } @@ -463,6 +440,10 @@ func (cs *CarStore) writeBlockFromShard(ctx context.Context, sh *CarShard, w io. var _ blockstore.Blockstore = (*DeltaSession)(nil) +func (ds *DeltaSession) BaseCid() cid.Cid { + return ds.baseCid +} + func (ds *DeltaSession) Put(ctx context.Context, b blockformat.Block) error { if ds.readonly { return fmt.Errorf("cannot write to readonly deltaSession") @@ -563,8 +544,8 @@ func (cs *CarStore) deleteShardFile(ctx context.Context, sh *CarShard) error { // CloseWithRoot writes all new blocks in a car file to the writer with the // given cid as the 'root' -func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid) ([]byte, error) { - return ds.closeWithRoot(ctx, root, false) +func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error) { + return ds.closeWithRoot(ctx, root, rev, false) } func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { @@ -585,7 +566,7 @@ func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { return hnw, nil } -func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rebase bool) ([]byte, error) { +func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rev string, rebase bool) ([]byte, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot") defer span.End() @@ -640,6 +621,7 @@ func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rebase Seq: ds.seq, Path: path, Usr: ds.user, + Rev: rev, } if err := ds.putShard(ctx, &shard, brefs); err != nil { @@ -733,8 +715,8 @@ func createInBatches(ctx context.Context, tx *gorm.DB, data []map[string]any, ba return nil } -func (ds *DeltaSession) CloseAsRebase(ctx context.Context, root cid.Cid) error { - _, err := ds.closeWithRoot(ctx, root, true) +func (ds *DeltaSession) CloseAsRebase(ctx context.Context, root cid.Cid, rev string) error { + _, err := ds.closeWithRoot(ctx, root, rev, true) if err != nil { return err } @@ -855,7 +837,7 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n return dropset, nil } -func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, prev *cid.Cid, carslice []byte) (cid.Cid, *DeltaSession, error) { +func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") defer span.End() @@ -868,9 +850,9 @@ func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, prev *cid.C return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) } - ds, err := cs.NewDeltaSession(ctx, uid, prev) + ds, err := cs.NewDeltaSession(ctx, uid, since) if err != nil { - return cid.Undef, nil, err + return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) } var cids []cid.Cid @@ -890,14 +872,9 @@ func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, prev *cid.C } } - base := cid.Undef - if prev != nil { - base = *prev - } - - rmcids, err := BlockDiff(ctx, ds, base, cids) + rmcids, err := BlockDiff(ctx, ds, ds.baseCid, cids) if err != nil { - return cid.Undef, nil, err + return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s): %w", ds.baseCid, err) } ds.rmcids = rmcids @@ -917,6 +894,18 @@ func (cs *CarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.C return lastShard.Root.CID, nil } +func (cs *CarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { + lastShard, err := cs.getLastShard(ctx, user) + if err != nil { + return "", err + } + if lastShard.ID == 0 { + return "", nil + } + + return lastShard.Rev, nil +} + type UserStat struct { Seq int Root string diff --git a/carstore/repo_test.go b/carstore/repo_test.go index 2eb4c5b41..6b5629a12 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -83,18 +83,18 @@ func TestBasicOperation(t *testing.T) { t.Fatal(err) } - ncid, err := setupRepo(ctx, ds) + ncid, rev, err := setupRepo(ctx, ds) if err != nil { t.Fatal(err) } - if _, err := ds.CloseWithRoot(ctx, ncid); err != nil { + if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { t.Fatal(err) } head := ncid for i := 0; i < 10; i++ { - ds, err := cs.NewDeltaSession(ctx, 1, &head) + ds, err := cs.NewDeltaSession(ctx, 1, &rev) if err != nil { t.Fatal(err) } @@ -111,12 +111,14 @@ func TestBasicOperation(t *testing.T) { } kmgr := &util.FakeKeyManager{} - nroot, err := rr.Commit(ctx, kmgr.SignForUser) + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) if err != nil { t.Fatal(err) } - if _, err := ds.CloseWithRoot(ctx, nroot); err != nil { + rev = nrev + + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { t.Fatal(err) } @@ -124,7 +126,7 @@ func TestBasicOperation(t *testing.T) { } buf := new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 1, cid.Undef, cid.Undef, true, buf); err != nil { + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { t.Fatal(err) } @@ -132,22 +134,22 @@ func TestBasicOperation(t *testing.T) { } -func setupRepo(ctx context.Context, bs blockstore.Blockstore) (cid.Cid, error) { +func setupRepo(ctx context.Context, bs blockstore.Blockstore) (cid.Cid, string, error) { nr := repo.NewRepo(ctx, "did:foo", bs) if _, _, err := nr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ Text: fmt.Sprintf("hey look its a tweet %s", time.Now()), }); err != nil { - return cid.Undef, err + return cid.Undef, "", err } kmgr := &util.FakeKeyManager{} - ncid, err := nr.Commit(ctx, kmgr.SignForUser) + ncid, rev, err := nr.Commit(ctx, kmgr.SignForUser) if err != nil { - return cid.Undef, fmt.Errorf("commit failed: %w", err) + return cid.Undef, "", fmt.Errorf("commit failed: %w", err) } - return ncid, nil + return ncid, rev, nil } func BenchmarkRepoWritesCarstore(b *testing.B) { @@ -159,24 +161,24 @@ func BenchmarkRepoWritesCarstore(b *testing.B) { } defer cleanup() - ds, err := cs.NewDeltaSession(ctx, 1, &cid.Undef) + ds, err := cs.NewDeltaSession(ctx, 1, nil) if err != nil { b.Fatal(err) } - ncid, err := setupRepo(ctx, ds) + ncid, rev, err := setupRepo(ctx, ds) if err != nil { b.Fatal(err) } - if _, err := ds.CloseWithRoot(ctx, ncid); err != nil { + if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { b.Fatal(err) } head := ncid b.ResetTimer() for i := 0; i < b.N; i++ { - ds, err := cs.NewDeltaSession(ctx, 1, &head) + ds, err := cs.NewDeltaSession(ctx, 1, &rev) if err != nil { b.Fatal(err) } @@ -193,12 +195,14 @@ func BenchmarkRepoWritesCarstore(b *testing.B) { } kmgr := &util.FakeKeyManager{} - nroot, err := rr.Commit(ctx, kmgr.SignForUser) + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) if err != nil { b.Fatal(err) } - if _, err := ds.CloseWithRoot(ctx, nroot); err != nil { + rev = nrev + + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { b.Fatal(err) } @@ -215,7 +219,7 @@ func BenchmarkRepoWritesFlatfs(b *testing.B) { } defer cleanup() - ncid, err := setupRepo(ctx, bs) + ncid, _, err := setupRepo(ctx, bs) if err != nil { b.Fatal(err) } @@ -236,7 +240,7 @@ func BenchmarkRepoWritesFlatfs(b *testing.B) { } kmgr := &util.FakeKeyManager{} - nroot, err := rr.Commit(ctx, kmgr.SignForUser) + nroot, _, err := rr.Commit(ctx, kmgr.SignForUser) if err != nil { b.Fatal(err) } @@ -253,7 +257,7 @@ func BenchmarkRepoWritesSqlite(b *testing.B) { b.Fatal(err) } - ncid, err := setupRepo(ctx, bs) + ncid, _, err := setupRepo(ctx, bs) if err != nil { b.Fatal(err) } @@ -274,7 +278,7 @@ func BenchmarkRepoWritesSqlite(b *testing.B) { } kmgr := &util.FakeKeyManager{} - nroot, err := rr.Commit(ctx, kmgr.SignForUser) + nroot, _, err := rr.Commit(ctx, kmgr.SignForUser) if err != nil { b.Fatal(err) } diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 7ed47bca8..f38f98dc3 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "path/filepath" + "strings" "syscall" "time" @@ -24,6 +25,7 @@ import ( "github.com/bluesky-social/indigo/xrpc" _ "go.uber.org/automaxprocs" + "net/http" _ "net/http/pprof" _ "github.com/joho/godotenv/autoload" @@ -295,7 +297,20 @@ func Bigsky(cctx *cli.Context) error { blobstore = &blobs.DiskBlobStore{bsdir} } - var hr api.HandleResolver = &api.ProdHandleResolver{} + prodHR, err := api.NewProdHandleResolver(100_000) + if err != nil { + return fmt.Errorf("failed to set up handle resolver: %w", err) + } + if rlskip != "" { + prodHR.ReqMod = func(req *http.Request, host string) error { + if strings.HasSuffix(host, ".bsky.social") { + req.Header.Set("x-ratelimit-bypass", rlskip) + } + return nil + } + } + + var hr api.HandleResolver = prodHR if cctx.StringSlice("handle-resolver-hosts") != nil { hr = &api.TestHandleResolver{ TrialHosts: cctx.StringSlice("handle-resolver-hosts"), diff --git a/cmd/gosky/admin.go b/cmd/gosky/admin.go index 799814d53..2c7d30605 100644 --- a/cmd/gosky/admin.go +++ b/cmd/gosky/admin.go @@ -459,9 +459,19 @@ var disableInvitesCmd = &cli.Command{ handle = resp } - return atproto.AdminDisableAccountInvites(ctx, xrpcc, &atproto.AdminDisableAccountInvites_Input{ + if err := atproto.AdminDisableAccountInvites(ctx, xrpcc, &atproto.AdminDisableAccountInvites_Input{ Account: handle, - }) + }); err != nil { + return err + } + + if err := atproto.AdminDisableInviteCodes(ctx, xrpcc, &atproto.AdminDisableInviteCodes_Input{ + Accounts: []string{handle}, + }); err != nil { + return err + } + + return nil }, } diff --git a/cmd/gosky/main.go b/cmd/gosky/main.go index c843a7a63..dfea3c01e 100644 --- a/cmd/gosky/main.go +++ b/cmd/gosky/main.go @@ -347,7 +347,7 @@ var syncGetRepoCmd = &cli.Command{ ctx := context.TODO() - repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "", "") + repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "") if err != nil { return err } @@ -647,7 +647,7 @@ var listAllPostsCmd = &cli.Command{ arg = xrpcc.Auth.Did } - rrb, err := comatproto.SyncGetRepo(ctx, xrpcc, arg, "", "") + rrb, err := comatproto.SyncGetRepo(ctx, xrpcc, arg, "") if err != nil { return err } @@ -1168,7 +1168,7 @@ var getRecordCmd = &cli.Command{ return err } - rrb, err := comatproto.SyncGetRepo(ctx, xrpcc, rfi, "", "") + rrb, err := comatproto.SyncGetRepo(ctx, xrpcc, rfi, "") if err != nil { return err } @@ -1287,13 +1287,20 @@ var createInviteCmd = &cli.Command{ } } - _, err = comatproto.ServerCreateInviteCodes(context.TODO(), xrpcc, &comatproto.ServerCreateInviteCodes_Input{ - UseCount: int64(count), - ForAccounts: dids, - CodeCount: int64(num), - }) - if err != nil { - return err + for n := 0; n < len(dids); n += 500 { + slice := dids + if len(slice) > 500 { + slice = slice[:500] + } + + _, err = comatproto.ServerCreateInviteCodes(context.TODO(), xrpcc, &comatproto.ServerCreateInviteCodes_Input{ + UseCount: int64(count), + ForAccounts: slice, + CodeCount: int64(num), + }) + if err != nil { + return err + } } return nil diff --git a/cmd/supercollider/main.go b/cmd/supercollider/main.go index f0e222f23..295ef0a4e 100644 --- a/cmd/supercollider/main.go +++ b/cmd/supercollider/main.go @@ -618,7 +618,6 @@ func (s *Server) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) { Time: time.Now().Format(util.ISO8601), Ops: outops, TooBig: toobig, - Rebase: evt.Rebase, }, PrivUid: evt.User, }); err != nil { diff --git a/events/dbpersist.go b/events/dbpersist.go index 4654a7814..f94833c5d 100644 --- a/events/dbpersist.go +++ b/events/dbpersist.go @@ -67,6 +67,8 @@ type DbPersistence struct { type RepoEventRecord struct { Seq uint `gorm:"primarykey"` + Rev string + Since *string Commit *models.DbCID Prev *models.DbCID NewHandle *string // NewHandle is only set if this is a handle change event @@ -276,6 +278,8 @@ func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatprot Blobs: blobs, Time: t, Rebase: evt.Rebase, + Rev: evt.Rev, + Since: evt.Since, } opsb, err := json.Marshal(evt.Ops) @@ -493,6 +497,8 @@ func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord) Blobs: blobCIDs, Rebase: rer.Rebase, Ops: ops, + Rev: rer.Rev, + Since: rer.Since, } cs, err := p.readCarSlice(ctx, rer) @@ -511,13 +517,8 @@ func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord) func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) { - var early cid.Cid - if rer.Prev != nil && !rer.Rebase { - early = rer.Prev.CID - } - buf := new(bytes.Buffer) - if err := p.cs.ReadUserCar(ctx, rer.Repo, early, rer.Commit.CID, true, buf); err != nil { + if err := p.cs.ReadUserCar(ctx, rer.Repo, rer.Rev, true, buf); err != nil { return nil, err } diff --git a/go.mod b/go.mod index 429b57255..2630f8a7d 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-retryablehttp v0.7.2 github.com/hashicorp/golang-lru v0.5.4 + github.com/hashicorp/golang-lru/arc/v2 v2.0.6 github.com/hashicorp/golang-lru/v2 v2.0.6 github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2 github.com/ipfs/go-block-format v0.1.2 diff --git a/go.sum b/go.sum index ce5ec3c9d..239061263 100644 --- a/go.sum +++ b/go.sum @@ -235,6 +235,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/arc/v2 v2.0.6 h1:4NU7uP5vSoK6TbaMj3NtY478TTAWLso/vL1gpNrInHg= +github.com/hashicorp/golang-lru/arc/v2 v2.0.6/go.mod h1:cfdDIX05DWvYV6/shsxDfa/OVcRieOt+q4FnM8x+Xno= github.com/hashicorp/golang-lru/v2 v2.0.6 h1:3xi/Cafd1NaoEnS/yDssIiuVeDVywU0QdFGl3aQaQHM= github.com/hashicorp/golang-lru/v2 v2.0.6/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ= diff --git a/indexer/indexer.go b/indexer/indexer.go index da52cb70a..25bebb379 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -136,23 +136,18 @@ func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) toobig = true } - if evt.Rebase { - if err := ix.events.HandleRebase(ctx, evt.User); err != nil { - log.Errorf("failed to handle rebase in events manager: %s", err) - } - } - log.Debugw("Sending event", "did", did) if err := ix.events.AddEvent(ctx, &events.XRPCStreamEvent{ RepoCommit: &comatproto.SyncSubscribeRepos_Commit{ Repo: did, Prev: (*lexutil.LexLink)(evt.OldRoot), Blocks: slice, + Rev: evt.Rev, + Since: evt.Since, Commit: lexutil.LexLink(evt.NewRoot), Time: time.Now().Format(util.ISO8601), Ops: outops, TooBig: toobig, - Rebase: evt.Rebase, }, PrivUid: evt.User, }); err != nil { @@ -165,16 +160,16 @@ func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) func (ix *Indexer) handleRepoOp(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { switch op.Kind { case repomgr.EvtKindCreateRecord: - if err := ix.crawlRecordReferences(ctx, op); err != nil { - return err - } - if ix.doAggregations { _, err := ix.handleRecordCreate(ctx, evt, op, true) if err != nil { return fmt.Errorf("handle recordCreate: %w", err) } } + if err := ix.crawlRecordReferences(ctx, op); err != nil { + return err + } + case repomgr.EvtKindDeleteRecord: if ix.doAggregations { if err := ix.handleRecordDelete(ctx, evt, op, true); err != nil { @@ -194,6 +189,295 @@ func (ix *Indexer) handleRepoOp(ctx context.Context, evt *repomgr.RepoEvent, op return nil } +func (ix *Indexer) crawlAtUriRef(ctx context.Context, uri string) error { + puri, err := util.ParseAtUri(uri) + if err != nil { + return err + } else { + _, err := ix.GetUserOrMissing(ctx, puri.Did) + if err != nil { + return err + } + } + return nil +} +func (ix *Indexer) crawlRecordReferences(ctx context.Context, op *repomgr.RepoOp) error { + ctx, span := otel.Tracer("indexer").Start(ctx, "crawlRecordReferences") + defer span.End() + + switch rec := op.Record.(type) { + case *bsky.FeedPost: + for _, e := range rec.Entities { + if e.Type == "mention" { + _, err := ix.GetUserOrMissing(ctx, e.Value) + if err != nil { + log.Infow("failed to parse user mention", "ref", e.Value, "err", err) + } + } + } + + if rec.Reply != nil { + if rec.Reply.Parent != nil { + if err := ix.crawlAtUriRef(ctx, rec.Reply.Parent.Uri); err != nil { + log.Infow("failed to crawl reply parent", "cid", op.RecCid, "replyuri", rec.Reply.Parent.Uri, "err", err) + } + } + + if rec.Reply.Root != nil { + if err := ix.crawlAtUriRef(ctx, rec.Reply.Root.Uri); err != nil { + log.Infow("failed to crawl reply root", "cid", op.RecCid, "rooturi", rec.Reply.Root.Uri, "err", err) + } + } + } + + return nil + case *bsky.FeedRepost: + if rec.Subject != nil { + if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil { + log.Infow("failed to crawl repost subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err) + } + } + return nil + case *bsky.FeedLike: + if rec.Subject != nil { + if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil { + log.Infow("failed to crawl vote subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err) + } + } + return nil + case *bsky.GraphFollow: + _, err := ix.GetUserOrMissing(ctx, rec.Subject) + if err != nil { + log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) + } + return nil + case *bsky.GraphBlock: + _, err := ix.GetUserOrMissing(ctx, rec.Subject) + if err != nil { + log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) + } + return nil + case *bsky.ActorProfile: + return nil + default: + log.Warnf("unrecognized record type: %T", op.Record) + return nil + } +} + +func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*models.ActorInfo, error) { + ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing") + defer span.End() + + ai, err := ix.LookupUserByDid(ctx, did) + if err == nil { + return ai, nil + } + + if !isNotFound(err) { + return nil, err + } + + // unknown user... create it and send it off to the crawler + return ix.createMissingUserRecord(ctx, did) +} + +func (ix *Indexer) createMissingUserRecord(ctx context.Context, did string) (*models.ActorInfo, error) { + ctx, span := otel.Tracer("indexer").Start(ctx, "createMissingUserRecord") + defer span.End() + + ai, err := ix.CreateExternalUser(ctx, did) + if err != nil { + return nil, err + } + + if err := ix.addUserToCrawler(ctx, ai); err != nil { + return nil, fmt.Errorf("failed to add unknown user to crawler: %w", err) + } + + return ai, nil +} + +func (ix *Indexer) addUserToCrawler(ctx context.Context, ai *models.ActorInfo) error { + log.Infow("Sending user to crawler: ", "did", ai.Did) + if ix.Crawler == nil { + return nil + } + + return ix.Crawler.Crawl(ctx, ai) +} + +func (ix *Indexer) DidForUser(ctx context.Context, uid models.Uid) (string, error) { + var ai models.ActorInfo + if err := ix.db.First(&ai, "uid = ?", uid).Error; err != nil { + return "", err + } + + return ai.Did, nil +} + +func (ix *Indexer) LookupUser(ctx context.Context, id models.Uid) (*models.ActorInfo, error) { + var ai models.ActorInfo + if err := ix.db.First(&ai, "uid = ?", id).Error; err != nil { + return nil, err + } + + return &ai, nil +} + +func (ix *Indexer) LookupUserByDid(ctx context.Context, did string) (*models.ActorInfo, error) { + var ai models.ActorInfo + if err := ix.db.Find(&ai, "did = ?", did).Error; err != nil { + return nil, err + } + + if ai.ID == 0 { + return nil, gorm.ErrRecordNotFound + } + + return &ai, nil +} + +func (ix *Indexer) LookupUserByHandle(ctx context.Context, handle string) (*models.ActorInfo, error) { + var ai models.ActorInfo + if err := ix.db.Find(&ai, "handle = ?", handle).Error; err != nil { + return nil, err + } + + if ai.ID == 0 { + return nil, gorm.ErrRecordNotFound + } + + return &ai, nil +} + +func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { + ai := op.ActorInfo + + if err := ix.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "uid"}}, + UpdateAll: true, + }).Create(&models.ActorInfo{ + Uid: evt.User, + Handle: ai.Handle, + Did: ai.Did, + DisplayName: ai.DisplayName, + Type: ai.Type, + PDS: evt.PDS, + }).Error; err != nil { + return fmt.Errorf("initializing new actor info: %w", err) + } + + if err := ix.db.Create(&models.FollowRecord{ + Follower: evt.User, + Target: evt.User, + }).Error; err != nil { + return err + } + + return nil +} + +func isNotFound(err error) bool { + if errors.Is(err, gorm.ErrRecordNotFound) { + return true + } + + return false +} + +// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way? +func (ix *Indexer) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error { + ctx, span := otel.Tracer("indexer").Start(ctx, "FetchAndIndexRepo") + defer span.End() + + span.SetAttributes(attribute.Int("catchup", len(job.catchup))) + + ai := job.act + + var pds models.PDS + if err := ix.db.First(&pds, "id = ?", ai.PDS).Error; err != nil { + return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err) + } + + rev, err := ix.repomgr.GetRepoRev(ctx, ai.Uid) + if err != nil && !isNotFound(err) { + return fmt.Errorf("failed to get repo root: %w", err) + } + + if !(job.initScrape || len(job.catchup) == 0) { + first := job.catchup[0] + if first.evt.Since == nil || rev == *first.evt.Since { + for _, j := range job.catchup { + if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, j.evt.Since, j.evt.Rev, j.evt.Blocks, j.evt.Ops); err != nil { + // TODO: if we fail here, we should probably fall back to a repo re-sync + return fmt.Errorf("post rebase catchup failed: %w", err) + } + } + + return nil + } + } + + var host string + if pds.SSL { + host = "https://" + pds.Host + } else { + host = "http://" + pds.Host + } + c := &xrpc.Client{ + Host: host, + } + + ix.ApplyPDSClientSettings(c) + + if rev == "" { + span.SetAttributes(attribute.Bool("full", true)) + } + + limiter := ix.GetLimiter(pds.ID) + if limiter == nil { + limiter = rate.NewLimiter(rate.Limit(pds.CrawlRateLimit), 1) + ix.SetLimiter(pds.ID, limiter) + } + + // Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos + limiter.Wait(ctx) + + log.Infow("SyncGetRepo", "did", ai.Did, "user", ai.Handle, "since", rev) + // TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us + repo, err := comatproto.SyncGetRepo(ctx, c, ai.Did, rev) + if err != nil { + return fmt.Errorf("failed to fetch repo: %w", err) + } + + // this process will send individual indexing events back to the indexer, doing a 'fast forward' of the users entire history + // we probably want alternative ways of doing this for 'very large' or 'very old' repos, but this works for now + if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil { + span.RecordError(err) + return fmt.Errorf("importing fetched repo (curRev: %s): %w", rev, err) + } + + // TODO: this is currently doing too much work, allowing us to ignore the catchup events we've gotten + // need to do 'just enough' work... + + return nil +} + +func (ix *Indexer) GetPost(ctx context.Context, uri string) (*models.FeedPost, error) { + puri, err := util.ParseAtUri(uri) + if err != nil { + return nil, err + } + + var post models.FeedPost + if err := ix.db.First(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(models.ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil { + return nil, err + } + + return &post, nil +} + func (ix *Indexer) handleRecordDelete(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) error { log.Infow("record delete event", "collection", op.Collection) @@ -227,9 +511,9 @@ func (ix *Indexer) handleRecordDelete(ctx context.Context, evt *repomgr.RepoEven log.Warn("TODO: remove notifications on delete") /* - if err := ix.notifman.RemoveRepost(ctx, fp.Author, rr.ID, evt.User); err != nil { - return nil, err - } + if err := ix.notifman.RemoveRepost(ctx, fp.Author, rr.ID, evt.User); err != nil { + return nil, err + } */ case "app.bsky.feed.vote": @@ -311,104 +595,28 @@ func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEven Post: fp.ID, Reposter: evt.User, Author: fp.Author, - RecCid: op.RecCid.String(), - Rkey: op.Rkey, - } - if err := ix.db.Create(&rr).Error; err != nil { - return nil, err - } - - if err := ix.notifman.AddRepost(ctx, fp.Author, rr.ID, evt.User); err != nil { - return nil, err - } - - case *bsky.FeedLike: - return nil, ix.handleRecordCreateFeedLike(ctx, rec, evt, op) - case *bsky.GraphFollow: - return out, ix.handleRecordCreateGraphFollow(ctx, rec, evt, op) - case *bsky.ActorProfile: - log.Infof("TODO: got actor profile record creation, need to do something with this") - default: - return nil, fmt.Errorf("unrecognized record type: %T", rec) - } - - return out, nil -} - -func (ix *Indexer) crawlAtUriRef(ctx context.Context, uri string) error { - puri, err := util.ParseAtUri(uri) - if err != nil { - return err - } else { - _, err := ix.GetUserOrMissing(ctx, puri.Did) - if err != nil { - return err - } - } - return nil -} -func (ix *Indexer) crawlRecordReferences(ctx context.Context, op *repomgr.RepoOp) error { - ctx, span := otel.Tracer("indexer").Start(ctx, "crawlRecordReferences") - defer span.End() - - switch rec := op.Record.(type) { - case *bsky.FeedPost: - for _, e := range rec.Entities { - if e.Type == "mention" { - _, err := ix.GetUserOrMissing(ctx, e.Value) - if err != nil { - log.Infow("failed to parse user mention", "ref", e.Value, "err", err) - } - } - } - - if rec.Reply != nil { - if rec.Reply.Parent != nil { - if err := ix.crawlAtUriRef(ctx, rec.Reply.Parent.Uri); err != nil { - log.Infow("failed to crawl reply parent", "cid", op.RecCid, "replyuri", rec.Reply.Parent.Uri, "err", err) - } - } - - if rec.Reply.Root != nil { - if err := ix.crawlAtUriRef(ctx, rec.Reply.Root.Uri); err != nil { - log.Infow("failed to crawl reply root", "cid", op.RecCid, "rooturi", rec.Reply.Root.Uri, "err", err) - } - } + RecCid: op.RecCid.String(), + Rkey: op.Rkey, + } + if err := ix.db.Create(&rr).Error; err != nil { + return nil, err } - return nil - case *bsky.FeedRepost: - if rec.Subject != nil { - if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil { - log.Infow("failed to crawl repost subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err) - } + if err := ix.notifman.AddRepost(ctx, fp.Author, rr.ID, evt.User); err != nil { + return nil, err } - return nil + case *bsky.FeedLike: - if rec.Subject != nil { - if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil { - log.Infow("failed to crawl vote subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err) - } - } - return nil + return nil, ix.handleRecordCreateFeedLike(ctx, rec, evt, op) case *bsky.GraphFollow: - _, err := ix.GetUserOrMissing(ctx, rec.Subject) - if err != nil { - log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) - } - return nil - case *bsky.GraphBlock: - _, err := ix.GetUserOrMissing(ctx, rec.Subject) - if err != nil { - log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) - } - return nil + return out, ix.handleRecordCreateGraphFollow(ctx, rec, evt, op) case *bsky.ActorProfile: - return nil + log.Infof("TODO: got actor profile record creation, need to do something with this") default: - log.Warnf("unrecognized record type: %T", op.Record) - return nil + return nil, fmt.Errorf("unrecognized record type: %T", rec) } + + return out, nil } func (ix *Indexer) handleRecordCreateFeedLike(ctx context.Context, rec *bsky.FeedLike, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { @@ -658,23 +866,6 @@ func (ix *Indexer) handleRecordCreateFeedPost(ctx context.Context, user models.U return nil } -func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*models.ActorInfo, error) { - ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing") - defer span.End() - - ai, err := ix.LookupUserByDid(ctx, did) - if err == nil { - return ai, nil - } - - if !isNotFound(err) { - return nil, err - } - - // unknown user... create it and send it off to the crawler - return ix.createMissingUserRecord(ctx, did) -} - func (ix *Indexer) createMissingPostRecord(ctx context.Context, puri *util.ParsedUri) (*models.FeedPost, error) { log.Warn("creating missing post record") ai, err := ix.GetUserOrMissing(ctx, puri.Did) @@ -694,75 +885,6 @@ func (ix *Indexer) createMissingPostRecord(ctx context.Context, puri *util.Parse return &fp, nil } -func (ix *Indexer) createMissingUserRecord(ctx context.Context, did string) (*models.ActorInfo, error) { - ctx, span := otel.Tracer("indexer").Start(ctx, "createMissingUserRecord") - defer span.End() - - ai, err := ix.CreateExternalUser(ctx, did) - if err != nil { - return nil, err - } - - if err := ix.addUserToCrawler(ctx, ai); err != nil { - return nil, fmt.Errorf("failed to add unknown user to crawler: %w", err) - } - - return ai, nil -} - -func (ix *Indexer) addUserToCrawler(ctx context.Context, ai *models.ActorInfo) error { - log.Infow("Sending user to crawler: ", "did", ai.Did) - if ix.Crawler == nil { - return nil - } - - return ix.Crawler.Crawl(ctx, ai) -} - -func (ix *Indexer) DidForUser(ctx context.Context, uid models.Uid) (string, error) { - var ai models.ActorInfo - if err := ix.db.First(&ai, "uid = ?", uid).Error; err != nil { - return "", err - } - - return ai.Did, nil -} - -func (ix *Indexer) LookupUser(ctx context.Context, id models.Uid) (*models.ActorInfo, error) { - var ai models.ActorInfo - if err := ix.db.First(&ai, "uid = ?", id).Error; err != nil { - return nil, err - } - - return &ai, nil -} - -func (ix *Indexer) LookupUserByDid(ctx context.Context, did string) (*models.ActorInfo, error) { - var ai models.ActorInfo - if err := ix.db.Find(&ai, "did = ?", did).Error; err != nil { - return nil, err - } - - if ai.ID == 0 { - return nil, gorm.ErrRecordNotFound - } - - return &ai, nil -} - -func (ix *Indexer) LookupUserByHandle(ctx context.Context, handle string) (*models.ActorInfo, error) { - var ai models.ActorInfo - if err := ix.db.Find(&ai, "handle = ?", handle).Error; err != nil { - return nil, err - } - - if ai.ID == 0 { - return nil, gorm.ErrRecordNotFound - } - - return &ai, nil -} - func (ix *Indexer) addNewPostNotification(ctx context.Context, post *bsky.FeedPost, fp *models.FeedPost, mentions []*models.ActorInfo) error { if post.Reply != nil { replyto, err := ix.GetPost(ctx, post.Reply.Parent.Uri) @@ -788,160 +910,3 @@ func (ix *Indexer) addNewPostNotification(ctx context.Context, post *bsky.FeedPo func (ix *Indexer) addNewVoteNotification(ctx context.Context, postauthor models.Uid, vr *models.VoteRecord) error { return ix.notifman.AddUpVote(ctx, vr.Voter, vr.Post, vr.ID, postauthor) } - -func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { - ai := op.ActorInfo - - if err := ix.db.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "uid"}}, - UpdateAll: true, - }).Create(&models.ActorInfo{ - Uid: evt.User, - Handle: ai.Handle, - Did: ai.Did, - DisplayName: ai.DisplayName, - Type: ai.Type, - PDS: evt.PDS, - }).Error; err != nil { - return fmt.Errorf("initializing new actor info: %w", err) - } - - if err := ix.db.Create(&models.FollowRecord{ - Follower: evt.User, - Target: evt.User, - }).Error; err != nil { - return err - } - - return nil -} - -func isNotFound(err error) bool { - if errors.Is(err, gorm.ErrRecordNotFound) { - return true - } - - return false -} - -func (ix *Indexer) GetPost(ctx context.Context, uri string) (*models.FeedPost, error) { - puri, err := util.ParseAtUri(uri) - if err != nil { - return nil, err - } - - var post models.FeedPost - if err := ix.db.First(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(models.ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil { - return nil, err - } - - return &post, nil -} - -// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way? -func (ix *Indexer) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error { - ctx, span := otel.Tracer("indexer").Start(ctx, "FetchAndIndexRepo") - defer span.End() - - span.SetAttributes(attribute.Int("catchup", len(job.catchup))) - - ai := job.act - - var pds models.PDS - if err := ix.db.First(&pds, "id = ?", ai.PDS).Error; err != nil { - return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err) - } - - curHead, err := ix.repomgr.GetRepoRoot(ctx, ai.Uid) - if err != nil && !isNotFound(err) { - return fmt.Errorf("failed to get repo root: %w", err) - } - - var rebase *comatproto.SyncSubscribeRepos_Commit - var rebaseIx int - for i, j := range job.catchup { - if j.evt.Rebase { - rebase = j.evt - rebaseIx = i - break - } - } - - if rebase != nil { - if err := ix.repomgr.HandleRebase(ctx, ai.PDS, ai.Uid, ai.Did, (*cid.Cid)(rebase.Prev), (cid.Cid)(rebase.Commit), rebase.Blocks); err != nil { - return fmt.Errorf("handling rebase: %w", err) - } - // now process the rest of the catchup events - // these are all events that got received *after* the rebase, but - // before we could start processing it. - // That means these should be the next operations that get cleanly - // applied after the rebase - for _, j := range job.catchup[rebaseIx+1:] { - if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, (*cid.Cid)(j.evt.Prev), j.evt.Blocks, j.evt.Ops); err != nil { - return fmt.Errorf("post rebase catchup failed: %w", err) - } - } - return nil - } - - if !(job.initScrape || len(job.catchup) == 0) { - first := job.catchup[0] - if first.evt.Prev == nil || curHead == (cid.Cid)(*first.evt.Prev) { - for _, j := range job.catchup { - if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, (*cid.Cid)(j.evt.Prev), j.evt.Blocks, j.evt.Ops); err != nil { - // TODO: if we fail here, we should probably fall back to a repo re-sync - return fmt.Errorf("post rebase catchup failed: %w", err) - } - } - - return nil - } - } - - var host string - if pds.SSL { - host = "https://" + pds.Host - } else { - host = "http://" + pds.Host - } - c := &xrpc.Client{ - Host: host, - } - - ix.ApplyPDSClientSettings(c) - - var from string - if curHead.Defined() { - from = curHead.String() - } else { - span.SetAttributes(attribute.Bool("full", true)) - } - - limiter := ix.GetLimiter(pds.ID) - if limiter == nil { - limiter = rate.NewLimiter(rate.Limit(pds.CrawlRateLimit), 1) - ix.SetLimiter(pds.ID, limiter) - } - - // Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos - limiter.Wait(ctx) - - log.Infow("SyncGetRepo", "did", ai.Did, "user", ai.Handle, "from", from) - // TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us - repo, err := comatproto.SyncGetRepo(ctx, c, ai.Did, from, "") - if err != nil { - return fmt.Errorf("failed to fetch repo: %w", err) - } - - // this process will send individual indexing events back to the indexer, doing a 'fast forward' of the users entire history - // we probably want alternative ways of doing this for 'very large' or 'very old' repos, but this works for now - if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), curHead); err != nil { - span.RecordError(err) - return fmt.Errorf("importing fetched repo (curHead: %s): %w", from, err) - } - - // TODO: this is currently doing too much work, allowing us to ignore the catchup events we've gotten - // need to do 'just enough' work... - - return nil -} diff --git a/lex/gen.go b/lex/gen.go index 8b0005fd3..1777a336d 100644 --- a/lex/gen.go +++ b/lex/gen.go @@ -1211,7 +1211,7 @@ func (ts *TypeSchema) writeTypeDefinition(name string, w io.Writer) error { omit = ",omitempty" } cval := ts.id - if ts.defName != "" { + if ts.defName != "" && ts.defName != "main" { cval += "#" + ts.defName } pf("\tLexiconTypeID string `json:\"$type,const=%s%s\" cborgen:\"$type,const=%s%s\"`\n", cval, omit, cval, omit) diff --git a/pds/handlers.go b/pds/handlers.go index cc5451d76..b99eba162 100644 --- a/pds/handlers.go +++ b/pds/handlers.go @@ -565,7 +565,7 @@ func (s *Server) handleComAtprotoSyncUpdateRepo(ctx context.Context, r io.Reader panic("not yet implemented") } -func (s *Server) handleComAtprotoSyncGetCheckout(ctx context.Context, commit string, did string) (io.Reader, error) { +func (s *Server) handleComAtprotoSyncGetCheckout(ctx context.Context, did string) (io.Reader, error) { panic("not yet implemented") } @@ -593,34 +593,14 @@ func (s *Server) handleComAtprotoSyncGetRecord(ctx context.Context, collection s panic("not yet implemented") } -func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context, did string, earliest, latest string) (io.Reader, error) { - var earlyCid cid.Cid - if earliest != "" { - cc, err := cid.Decode(earliest) - if err != nil { - return nil, err - } - - earlyCid = cc - } - - var lateCid cid.Cid - if latest != "" { - cc, err := cid.Decode(latest) - if err != nil { - return nil, err - } - - lateCid = cc - } - +func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) { targetUser, err := s.lookupUser(ctx, did) if err != nil { return nil, err } buf := new(bytes.Buffer) - if err := s.repoman.ReadRepo(ctx, targetUser.ID, earlyCid, lateCid, buf); err != nil { + if err := s.repoman.ReadRepo(ctx, targetUser.ID, since, buf); err != nil { return nil, err } @@ -681,7 +661,7 @@ func (s *Server) handleComAtprotoSyncGetBlob(ctx context.Context, cid string, di panic("nyi") } -func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncListBlobs_Output, error) { +func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context, cursor string, did string, limit int, since string) (*comatprototypes.SyncListBlobs_Output, error) { panic("nyi") } @@ -798,15 +778,6 @@ func (s *Server) handleComAtprotoAdminEnableAccountInvites(ctx context.Context, panic("nyi") } -func (s *Server) handleComAtprotoRepoRebaseRepo(ctx context.Context, body *comatprototypes.RepoRebaseRepo_Input) error { - u, err := s.getUser(ctx) - if err != nil { - return err - } - - return s.repoman.DoRebase(ctx, u.ID) -} - func (s *Server) handleAppBskyFeedDescribeFeedGenerator(ctx context.Context) (*appbskytypes.FeedDescribeFeedGenerator_Output, error) { panic("nyi") } @@ -843,3 +814,19 @@ func (s *Server) handleComAtprotoAdminRebaseRepo(ctx context.Context, body *coma func (s *Server) handleComAtprotoAdminSendEmail(ctx context.Context, body *comatprototypes.AdminSendEmail_Input) (*comatprototypes.AdminSendEmail_Output, error) { panic("nyi") } + +func (s *Server) handleAppBskyFeedGetActorLikes(ctx context.Context, actor string, cursor string, limit int) (*appbskytypes.FeedGetActorLikes_Output, error) { + panic("nyi") +} + +func (s *Server) handleAppBskyNotificationRegisterPush(ctx context.Context, body *appbskytypes.NotificationRegisterPush_Input) error { + panic("nyi") +} + +func (s *Server) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) { + panic("nyi") +} + +func (s *Server) handleComAtprotoTempUpgradeRepoVersion(ctx context.Context, body *comatprototypes.TempUpgradeRepoVersion_Input) error { + panic("nyi") +} diff --git a/pds/server.go b/pds/server.go index 59107fde4..b9f98a09b 100644 --- a/pds/server.go +++ b/pds/server.go @@ -142,7 +142,7 @@ func (s *Server) handleFedEvent(ctx context.Context, host *Peering, env *events. u.ID = subj.Uid } - return s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, (*cid.Cid)(evt.Prev), evt.Blocks, evt.Ops) + return s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops) default: return fmt.Errorf("invalid fed event") } @@ -338,7 +338,7 @@ func (s *Server) RunAPIWithListener(listen net.Listener) error { } e.HTTPErrorHandler = func(err error, ctx echo.Context) { - fmt.Printf("HANDLER ERROR: (%s) %s\n", ctx.Path(), err) + fmt.Printf("PDS HANDLER ERROR: (%s) %s\n", ctx.Path(), err) // TODO: need to properly figure out where http error codes for error // types get decided. This spot is reasonable, but maybe a bit weird. diff --git a/pds/stubs.go b/pds/stubs.go index 0a870be65..a8431f264 100644 --- a/pds/stubs.go +++ b/pds/stubs.go @@ -20,6 +20,7 @@ func (s *Server) RegisterHandlersAppBsky(e *echo.Echo) error { e.GET("/xrpc/app.bsky.actor.searchActorsTypeahead", s.HandleAppBskyActorSearchActorsTypeahead) e.GET("/xrpc/app.bsky.feed.describeFeedGenerator", s.HandleAppBskyFeedDescribeFeedGenerator) e.GET("/xrpc/app.bsky.feed.getActorFeeds", s.HandleAppBskyFeedGetActorFeeds) + e.GET("/xrpc/app.bsky.feed.getActorLikes", s.HandleAppBskyFeedGetActorLikes) e.GET("/xrpc/app.bsky.feed.getAuthorFeed", s.HandleAppBskyFeedGetAuthorFeed) e.GET("/xrpc/app.bsky.feed.getFeed", s.HandleAppBskyFeedGetFeed) e.GET("/xrpc/app.bsky.feed.getFeedGenerator", s.HandleAppBskyFeedGetFeedGenerator) @@ -43,6 +44,7 @@ func (s *Server) RegisterHandlersAppBsky(e *echo.Echo) error { e.POST("/xrpc/app.bsky.graph.unmuteActorList", s.HandleAppBskyGraphUnmuteActorList) e.GET("/xrpc/app.bsky.notification.getUnreadCount", s.HandleAppBskyNotificationGetUnreadCount) e.GET("/xrpc/app.bsky.notification.listNotifications", s.HandleAppBskyNotificationListNotifications) + e.POST("/xrpc/app.bsky.notification.registerPush", s.HandleAppBskyNotificationRegisterPush) e.POST("/xrpc/app.bsky.notification.updateSeen", s.HandleAppBskyNotificationUpdateSeen) e.POST("/xrpc/app.bsky.unspecced.applyLabels", s.HandleAppBskyUnspeccedApplyLabels) e.GET("/xrpc/app.bsky.unspecced.getPopular", s.HandleAppBskyUnspeccedGetPopular) @@ -225,6 +227,32 @@ func (s *Server) HandleAppBskyFeedGetActorFeeds(c echo.Context) error { return c.JSON(200, out) } +func (s *Server) HandleAppBskyFeedGetActorLikes(c echo.Context) error { + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleAppBskyFeedGetActorLikes") + defer span.End() + actor := c.QueryParam("actor") + cursor := c.QueryParam("cursor") + + var limit int + if p := c.QueryParam("limit"); p != "" { + var err error + limit, err = strconv.Atoi(p) + if err != nil { + return err + } + } else { + limit = 50 + } + var out *appbskytypes.FeedGetActorLikes_Output + var handleErr error + // func (s *Server) handleAppBskyFeedGetActorLikes(ctx context.Context,actor string,cursor string,limit int) (*appbskytypes.FeedGetActorLikes_Output, error) + out, handleErr = s.handleAppBskyFeedGetActorLikes(ctx, actor, cursor, limit) + if handleErr != nil { + return handleErr + } + return c.JSON(200, out) +} + func (s *Server) HandleAppBskyFeedGetAuthorFeed(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleAppBskyFeedGetAuthorFeed") defer span.End() @@ -751,6 +779,23 @@ func (s *Server) HandleAppBskyNotificationListNotifications(c echo.Context) erro return c.JSON(200, out) } +func (s *Server) HandleAppBskyNotificationRegisterPush(c echo.Context) error { + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleAppBskyNotificationRegisterPush") + defer span.End() + + var body appbskytypes.NotificationRegisterPush_Input + if err := c.Bind(&body); err != nil { + return err + } + var handleErr error + // func (s *Server) handleAppBskyNotificationRegisterPush(ctx context.Context,body *appbskytypes.NotificationRegisterPush_Input) error + handleErr = s.handleAppBskyNotificationRegisterPush(ctx, &body) + if handleErr != nil { + return handleErr + } + return nil +} + func (s *Server) HandleAppBskyNotificationUpdateSeen(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleAppBskyNotificationUpdateSeen") defer span.End() @@ -883,7 +928,6 @@ func (s *Server) RegisterHandlersComAtproto(e *echo.Echo) error { e.GET("/xrpc/com.atproto.admin.getModerationReports", s.HandleComAtprotoAdminGetModerationReports) e.GET("/xrpc/com.atproto.admin.getRecord", s.HandleComAtprotoAdminGetRecord) e.GET("/xrpc/com.atproto.admin.getRepo", s.HandleComAtprotoAdminGetRepo) - e.POST("/xrpc/com.atproto.admin.rebaseRepo", s.HandleComAtprotoAdminRebaseRepo) e.POST("/xrpc/com.atproto.admin.resolveModerationReports", s.HandleComAtprotoAdminResolveModerationReports) e.POST("/xrpc/com.atproto.admin.reverseModerationAction", s.HandleComAtprotoAdminReverseModerationAction) e.GET("/xrpc/com.atproto.admin.searchRepos", s.HandleComAtprotoAdminSearchRepos) @@ -902,7 +946,6 @@ func (s *Server) RegisterHandlersComAtproto(e *echo.Echo) error { e.GET("/xrpc/com.atproto.repo.getRecord", s.HandleComAtprotoRepoGetRecord) e.GET("/xrpc/com.atproto.repo.listRecords", s.HandleComAtprotoRepoListRecords) e.POST("/xrpc/com.atproto.repo.putRecord", s.HandleComAtprotoRepoPutRecord) - e.POST("/xrpc/com.atproto.repo.rebaseRepo", s.HandleComAtprotoRepoRebaseRepo) e.POST("/xrpc/com.atproto.repo.uploadBlob", s.HandleComAtprotoRepoUploadBlob) e.POST("/xrpc/com.atproto.server.createAccount", s.HandleComAtprotoServerCreateAccount) e.POST("/xrpc/com.atproto.server.createAppPassword", s.HandleComAtprotoServerCreateAppPassword) @@ -923,14 +966,15 @@ func (s *Server) RegisterHandlersComAtproto(e *echo.Echo) error { e.GET("/xrpc/com.atproto.sync.getBlob", s.HandleComAtprotoSyncGetBlob) e.GET("/xrpc/com.atproto.sync.getBlocks", s.HandleComAtprotoSyncGetBlocks) e.GET("/xrpc/com.atproto.sync.getCheckout", s.HandleComAtprotoSyncGetCheckout) - e.GET("/xrpc/com.atproto.sync.getCommitPath", s.HandleComAtprotoSyncGetCommitPath) e.GET("/xrpc/com.atproto.sync.getHead", s.HandleComAtprotoSyncGetHead) + e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.HandleComAtprotoSyncGetLatestCommit) e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord) e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo) e.GET("/xrpc/com.atproto.sync.listBlobs", s.HandleComAtprotoSyncListBlobs) e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos) e.POST("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate) e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) + e.POST("/xrpc/com.atproto.temp.upgradeRepoVersion", s.HandleComAtprotoTempUpgradeRepoVersion) return nil } @@ -1152,23 +1196,6 @@ func (s *Server) HandleComAtprotoAdminGetRepo(c echo.Context) error { return c.JSON(200, out) } -func (s *Server) HandleComAtprotoAdminRebaseRepo(c echo.Context) error { - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoAdminRebaseRepo") - defer span.End() - - var body comatprototypes.AdminRebaseRepo_Input - if err := c.Bind(&body); err != nil { - return err - } - var handleErr error - // func (s *Server) handleComAtprotoAdminRebaseRepo(ctx context.Context,body *comatprototypes.AdminRebaseRepo_Input) error - handleErr = s.handleComAtprotoAdminRebaseRepo(ctx, &body) - if handleErr != nil { - return handleErr - } - return nil -} - func (s *Server) HandleComAtprotoAdminResolveModerationReports(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoAdminResolveModerationReports") defer span.End() @@ -1519,23 +1546,6 @@ func (s *Server) HandleComAtprotoRepoPutRecord(c echo.Context) error { return c.JSON(200, out) } -func (s *Server) HandleComAtprotoRepoRebaseRepo(c echo.Context) error { - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoRepoRebaseRepo") - defer span.End() - - var body comatprototypes.RepoRebaseRepo_Input - if err := c.Bind(&body); err != nil { - return err - } - var handleErr error - // func (s *Server) handleComAtprotoRepoRebaseRepo(ctx context.Context,body *comatprototypes.RepoRebaseRepo_Input) error - handleErr = s.handleComAtprotoRepoRebaseRepo(ctx, &body) - if handleErr != nil { - return handleErr - } - return nil -} - func (s *Server) HandleComAtprotoRepoUploadBlob(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoRepoUploadBlob") defer span.End() @@ -1854,42 +1864,39 @@ func (s *Server) HandleComAtprotoSyncGetBlocks(c echo.Context) error { func (s *Server) HandleComAtprotoSyncGetCheckout(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetCheckout") defer span.End() - commit := c.QueryParam("commit") did := c.QueryParam("did") var out io.Reader var handleErr error - // func (s *Server) handleComAtprotoSyncGetCheckout(ctx context.Context,commit string,did string) (io.Reader, error) - out, handleErr = s.handleComAtprotoSyncGetCheckout(ctx, commit, did) + // func (s *Server) handleComAtprotoSyncGetCheckout(ctx context.Context,did string) (io.Reader, error) + out, handleErr = s.handleComAtprotoSyncGetCheckout(ctx, did) if handleErr != nil { return handleErr } return c.Stream(200, "application/vnd.ipld.car", out) } -func (s *Server) HandleComAtprotoSyncGetCommitPath(c echo.Context) error { - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetCommitPath") +func (s *Server) HandleComAtprotoSyncGetHead(c echo.Context) error { + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead") defer span.End() did := c.QueryParam("did") - earliest := c.QueryParam("earliest") - latest := c.QueryParam("latest") - var out *comatprototypes.SyncGetCommitPath_Output + var out *comatprototypes.SyncGetHead_Output var handleErr error - // func (s *Server) handleComAtprotoSyncGetCommitPath(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncGetCommitPath_Output, error) - out, handleErr = s.handleComAtprotoSyncGetCommitPath(ctx, did, earliest, latest) + // func (s *Server) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error) + out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did) if handleErr != nil { return handleErr } return c.JSON(200, out) } -func (s *Server) HandleComAtprotoSyncGetHead(c echo.Context) error { - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead") +func (s *Server) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error { + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetLatestCommit") defer span.End() did := c.QueryParam("did") - var out *comatprototypes.SyncGetHead_Output + var out *comatprototypes.SyncGetLatestCommit_Output var handleErr error - // func (s *Server) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error) - out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did) + // func (s *Server) handleComAtprotoSyncGetLatestCommit(ctx context.Context,did string) (*comatprototypes.SyncGetLatestCommit_Output, error) + out, handleErr = s.handleComAtprotoSyncGetLatestCommit(ctx, did) if handleErr != nil { return handleErr } @@ -1917,12 +1924,11 @@ func (s *Server) HandleComAtprotoSyncGetRepo(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRepo") defer span.End() did := c.QueryParam("did") - earliest := c.QueryParam("earliest") - latest := c.QueryParam("latest") + since := c.QueryParam("since") var out io.Reader var handleErr error - // func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context,did string,earliest string,latest string) (io.Reader, error) - out, handleErr = s.handleComAtprotoSyncGetRepo(ctx, did, earliest, latest) + // func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context,did string,since string) (io.Reader, error) + out, handleErr = s.handleComAtprotoSyncGetRepo(ctx, did, since) if handleErr != nil { return handleErr } @@ -1932,13 +1938,24 @@ func (s *Server) HandleComAtprotoSyncGetRepo(c echo.Context) error { func (s *Server) HandleComAtprotoSyncListBlobs(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListBlobs") defer span.End() + cursor := c.QueryParam("cursor") did := c.QueryParam("did") - earliest := c.QueryParam("earliest") - latest := c.QueryParam("latest") + + var limit int + if p := c.QueryParam("limit"); p != "" { + var err error + limit, err = strconv.Atoi(p) + if err != nil { + return err + } + } else { + limit = 500 + } + since := c.QueryParam("since") var out *comatprototypes.SyncListBlobs_Output var handleErr error - // func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncListBlobs_Output, error) - out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, did, earliest, latest) + // func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context,cursor string,did string,limit int,since string) (*comatprototypes.SyncListBlobs_Output, error) + out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, cursor, did, limit, since) if handleErr != nil { return handleErr } @@ -2003,3 +2020,20 @@ func (s *Server) HandleComAtprotoSyncRequestCrawl(c echo.Context) error { } return nil } + +func (s *Server) HandleComAtprotoTempUpgradeRepoVersion(c echo.Context) error { + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoTempUpgradeRepoVersion") + defer span.End() + + var body comatprototypes.TempUpgradeRepoVersion_Input + if err := c.Bind(&body); err != nil { + return err + } + var handleErr error + // func (s *Server) handleComAtprotoTempUpgradeRepoVersion(ctx context.Context,body *comatprototypes.TempUpgradeRepoVersion_Input) error + handleErr = s.handleComAtprotoTempUpgradeRepoVersion(ctx, &body) + if handleErr != nil { + return handleErr + } + return nil +} diff --git a/repo/cbor_gen.go b/repo/cbor_gen.go index f845029f0..f2629fd93 100644 --- a/repo/cbor_gen.go +++ b/repo/cbor_gen.go @@ -68,7 +68,7 @@ func (t *SignedCommit) MarshalCBOR(w io.Writer) error { if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("rev"))); err != nil { return err } - if _, err := io.WriteString(w, string("rev")); err != nil { + if _, err := cw.WriteString(string("rev")); err != nil { return err } @@ -79,7 +79,7 @@ func (t *SignedCommit) MarshalCBOR(w io.Writer) error { if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Rev))); err != nil { return err } - if _, err := io.WriteString(w, string(t.Rev)); err != nil { + if _, err := cw.WriteString(string(t.Rev)); err != nil { return err } } @@ -373,7 +373,7 @@ func (t *UnsignedCommit) MarshalCBOR(w io.Writer) error { if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("rev"))); err != nil { return err } - if _, err := io.WriteString(w, string("rev")); err != nil { + if _, err := cw.WriteString(string("rev")); err != nil { return err } @@ -384,7 +384,7 @@ func (t *UnsignedCommit) MarshalCBOR(w io.Writer) error { if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Rev))); err != nil { return err } - if _, err := io.WriteString(w, string(t.Rev)); err != nil { + if _, err := cw.WriteString(string(t.Rev)); err != nil { return err } } diff --git a/repo/repo.go b/repo/repo.go index 0df9ab523..af6605af6 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -253,18 +253,18 @@ func (r *Repo) Truncate() { } // creates and writes a new SignedCommit for this repo, with `prev` pointing to old value -func (r *Repo) Commit(ctx context.Context, signer func(context.Context, string, []byte) ([]byte, error)) (cid.Cid, error) { +func (r *Repo) Commit(ctx context.Context, signer func(context.Context, string, []byte) ([]byte, error)) (cid.Cid, string, error) { ctx, span := otel.Tracer("repo").Start(ctx, "Commit") defer span.End() t, err := r.getMst(ctx) if err != nil { - return cid.Undef, err + return cid.Undef, "", err } rcid, err := t.GetPointer(ctx) if err != nil { - return cid.Undef, err + return cid.Undef, "", err } ncom := UnsignedCommit{ @@ -276,11 +276,11 @@ func (r *Repo) Commit(ctx context.Context, signer func(context.Context, string, sb, err := ncom.BytesForSigning() if err != nil { - return cid.Undef, fmt.Errorf("failed to serialize commit: %w", err) + return cid.Undef, "", fmt.Errorf("failed to serialize commit: %w", err) } sig, err := signer(ctx, ncom.Did, sb) if err != nil { - return cid.Undef, fmt.Errorf("failed to sign root: %w", err) + return cid.Undef, "", fmt.Errorf("failed to sign root: %w", err) } nsc := SignedCommit{ @@ -294,13 +294,13 @@ func (r *Repo) Commit(ctx context.Context, signer func(context.Context, string, nsccid, err := r.cst.Put(ctx, &nsc) if err != nil { - return cid.Undef, err + return cid.Undef, "", err } r.sc = nsc r.dirty = false - return nsccid, nil + return nsccid, nsc.Rev, nil } func (r *Repo) getMst(ctx context.Context) (*mst.MerkleSearchTree, error) { diff --git a/repomgr/ingest_test.go b/repomgr/ingest_test.go index 98737283a..4296cd949 100644 --- a/repomgr/ingest_test.go +++ b/repomgr/ingest_test.go @@ -64,7 +64,7 @@ func TestLoadNewRepo(t *testing.T) { defer fi.Close() ctx := context.TODO() - if err := repoman.ImportNewRepo(ctx, 2, "", fi, cid.Undef); err != nil { + if err := repoman.ImportNewRepo(ctx, 2, "", fi, nil); err != nil { t.Fatal(err) } } @@ -116,10 +116,10 @@ func TestIngestWithGap(t *testing.T) { } cs2 := testCarstore(t, dir2) + var since *string ctx := context.TODO() - var prev *cid.Cid for i := 0; i < 5; i++ { - slice, head, tid := doPost(t, cs2, did, prev, i) + slice, _, nrev, tid := doPost(t, cs2, did, since, i) ops := []*atproto.SyncSubscribeRepos_RepoOp{ { @@ -128,32 +128,30 @@ func TestIngestWithGap(t *testing.T) { }, } - if err := repoman.HandleExternalUserEvent(ctx, 1, 1, did, prev, slice, ops); err != nil { + if err := repoman.HandleExternalUserEvent(ctx, 1, 1, did, since, nrev, slice, ops); err != nil { t.Fatal(err) } - prev = &head + since = &nrev } - latest := *prev - // now do a few outside of the standard event stream flow for i := 0; i < 5; i++ { - _, head, _ := doPost(t, cs2, did, prev, i) - prev = &head + _, _, nrev, _ := doPost(t, cs2, did, since, i) + since = &nrev } buf := new(bytes.Buffer) - if err := cs2.ReadUserCar(ctx, 1, latest, *prev, true, buf); err != nil { + if err := cs2.ReadUserCar(ctx, 1, "", true, buf); err != nil { t.Fatal(err) } - if err := repoman.ImportNewRepo(ctx, 1, did, buf, latest); err != nil { + if err := repoman.ImportNewRepo(ctx, 1, did, buf, nil); err != nil { t.Fatal(err) } } -func doPost(t *testing.T, cs *carstore.CarStore, did string, prev *cid.Cid, postid int) ([]byte, cid.Cid, string) { +func doPost(t *testing.T, cs *carstore.CarStore, did string, prev *string, postid int) ([]byte, cid.Cid, string, string) { ctx := context.TODO() ds, err := cs.NewDeltaSession(ctx, 1, prev) if err != nil { @@ -169,65 +167,17 @@ func doPost(t *testing.T, cs *carstore.CarStore, did string, prev *cid.Cid, post t.Fatal(err) } - root, err := r.Commit(ctx, func(context.Context, string, []byte) ([]byte, error) { return nil, nil }) + root, nrev, err := r.Commit(ctx, func(context.Context, string, []byte) ([]byte, error) { return nil, nil }) if err != nil { t.Fatal(err) } - slice, err := ds.CloseWithRoot(ctx, root) + slice, err := ds.CloseWithRoot(ctx, root, nrev) if err != nil { t.Fatal(err) } - return slice, root, tid -} - -func TestRebase(t *testing.T) { - dir, err := os.MkdirTemp("", "integtest") - if err != nil { - t.Fatal(err) - } - - maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite"))) - if err != nil { - t.Fatal(err) - } - maindb.AutoMigrate(models.ActorInfo{}) - - did := "did:plc:beepboop" - maindb.Create(&models.ActorInfo{ - Did: did, - Uid: 1, - }) - - cs := testCarstore(t, dir) - - repoman := NewRepoManager(cs, &util.FakeKeyManager{}) - - ctx := context.TODO() - if err := repoman.InitNewActor(ctx, 1, "hello.world", "did:plc:foobar", "", "", ""); err != nil { - t.Fatal(err) - } - - for i := 0; i < 5; i++ { - _, _, err := repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ - Text: fmt.Sprintf("hello friend %d", i), - }) - if err != nil { - t.Fatal(err) - } - } - - if err := repoman.DoRebase(ctx, 1); err != nil { - t.Fatal(err) - } - - _, _, err = repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ - Text: "after the rebase", - }) - if err != nil { - t.Fatal(err) - } + return slice, root, nrev, tid } func TestDuplicateRecord(t *testing.T) { diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index 8c0bd1a9e..eb403d62d 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -16,7 +16,6 @@ import ( "github.com/bluesky-social/indigo/models" "github.com/bluesky-social/indigo/mst" "github.com/bluesky-social/indigo/repo" - "github.com/bluesky-social/indigo/util" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -70,10 +69,11 @@ type RepoEvent struct { User models.Uid OldRoot *cid.Cid NewRoot cid.Cid + Since *string + Rev string RepoSlice []byte PDS uint Ops []RepoOp - Rebase bool } type RepoOp struct { @@ -146,16 +146,18 @@ func (rm *RepoManager) CreateRecord(ctx context.Context, user models.Uid, collec unlock := rm.lockUser(ctx, user) defer unlock() - head, err := rm.cs.GetUserRepoHead(ctx, user) + rev, err := rm.cs.GetUserRepoRev(ctx, user) if err != nil { return "", cid.Undef, err } - ds, err := rm.cs.NewDeltaSession(ctx, user, &head) + ds, err := rm.cs.NewDeltaSession(ctx, user, &rev) if err != nil { return "", cid.Undef, err } + head := ds.BaseCid() + r, err := repo.OpenRepo(ctx, ds, head, true) if err != nil { return "", cid.Undef, err @@ -166,12 +168,12 @@ func (rm *RepoManager) CreateRecord(ctx context.Context, user models.Uid, collec return "", cid.Undef, err } - nroot, err := r.Commit(ctx, rm.kmgr.SignForUser) + nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) if err != nil { return "", cid.Undef, err } - rslice, err := ds.CloseWithRoot(ctx, nroot) + rslice, err := ds.CloseWithRoot(ctx, nroot, nrev) if err != nil { return "", cid.Undef, fmt.Errorf("close with root: %w", err) } @@ -186,6 +188,8 @@ func (rm *RepoManager) CreateRecord(ctx context.Context, user models.Uid, collec User: user, OldRoot: oldroot, NewRoot: nroot, + Rev: nrev, + Since: &rev, Ops: []RepoOp{{ Kind: EvtKindCreateRecord, Collection: collection, @@ -207,16 +211,17 @@ func (rm *RepoManager) UpdateRecord(ctx context.Context, user models.Uid, collec unlock := rm.lockUser(ctx, user) defer unlock() - head, err := rm.cs.GetUserRepoHead(ctx, user) + rev, err := rm.cs.GetUserRepoRev(ctx, user) if err != nil { return cid.Undef, err } - ds, err := rm.cs.NewDeltaSession(ctx, user, &head) + ds, err := rm.cs.NewDeltaSession(ctx, user, &rev) if err != nil { return cid.Undef, err } + head := ds.BaseCid() r, err := repo.OpenRepo(ctx, ds, head, true) if err != nil { return cid.Undef, err @@ -228,12 +233,12 @@ func (rm *RepoManager) UpdateRecord(ctx context.Context, user models.Uid, collec return cid.Undef, err } - nroot, err := r.Commit(ctx, rm.kmgr.SignForUser) + nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) if err != nil { return cid.Undef, err } - rslice, err := ds.CloseWithRoot(ctx, nroot) + rslice, err := ds.CloseWithRoot(ctx, nroot, nrev) if err != nil { return cid.Undef, fmt.Errorf("close with root: %w", err) } @@ -248,6 +253,8 @@ func (rm *RepoManager) UpdateRecord(ctx context.Context, user models.Uid, collec User: user, OldRoot: oldroot, NewRoot: nroot, + Rev: nrev, + Since: &rev, Ops: []RepoOp{{ Kind: EvtKindUpdateRecord, Collection: collection, @@ -269,16 +276,17 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec unlock := rm.lockUser(ctx, user) defer unlock() - head, err := rm.cs.GetUserRepoHead(ctx, user) + rev, err := rm.cs.GetUserRepoRev(ctx, user) if err != nil { return err } - ds, err := rm.cs.NewDeltaSession(ctx, user, &head) + ds, err := rm.cs.NewDeltaSession(ctx, user, &rev) if err != nil { return err } + head := ds.BaseCid() r, err := repo.OpenRepo(ctx, ds, head, true) if err != nil { return err @@ -289,12 +297,12 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec return err } - nroot, err := r.Commit(ctx, rm.kmgr.SignForUser) + nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) if err != nil { return err } - rslice, err := ds.CloseWithRoot(ctx, nroot) + rslice, err := ds.CloseWithRoot(ctx, nroot, nrev) if err != nil { return fmt.Errorf("close with root: %w", err) } @@ -309,6 +317,8 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec User: user, OldRoot: oldroot, NewRoot: nroot, + Rev: nrev, + Since: &rev, Ops: []RepoOp{{ Kind: EvtKindDeleteRecord, Collection: collection, @@ -350,12 +360,12 @@ func (rm *RepoManager) InitNewActor(ctx context.Context, user models.Uid, handle return fmt.Errorf("setting initial actor profile: %w", err) } - root, err := r.Commit(ctx, rm.kmgr.SignForUser) + root, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) if err != nil { return fmt.Errorf("committing repo for actor init: %w", err) } - rslice, err := ds.CloseWithRoot(ctx, root) + rslice, err := ds.CloseWithRoot(ctx, root, nrev) if err != nil { return fmt.Errorf("close with root: %w", err) } @@ -364,6 +374,7 @@ func (rm *RepoManager) InitNewActor(ctx context.Context, user models.Uid, handle rm.events(ctx, &RepoEvent{ User: user, NewRoot: root, + Rev: nrev, Ops: []RepoOp{{ Kind: EvtKindCreateRecord, Collection: "app.bsky.actor.profile", @@ -384,8 +395,15 @@ func (rm *RepoManager) GetRepoRoot(ctx context.Context, user models.Uid) (cid.Ci return rm.cs.GetUserRepoHead(ctx, user) } -func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, earlyCid, lateCid cid.Cid, w io.Writer) error { - return rm.cs.ReadUserCar(ctx, user, earlyCid, lateCid, true, w) +func (rm *RepoManager) GetRepoRev(ctx context.Context, user models.Uid) (string, error) { + unlock := rm.lockUser(ctx, user) + defer unlock() + + return rm.cs.GetUserRepoRev(ctx, user) +} + +func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, since string, w io.Writer) error { + return rm.cs.ReadUserCar(ctx, user, since, true, w) } func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collection string, rkey string, maybeCid cid.Cid) (cid.Cid, cbg.CBORMarshaler, error) { @@ -445,153 +463,6 @@ func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.Ac return ap, nil } -var ErrUncleanRebase = fmt.Errorf("unclean rebase") - -func (rm *RepoManager) HandleRebase(ctx context.Context, pdsid uint, uid models.Uid, did string, prev *cid.Cid, commit cid.Cid, carslice []byte) error { - ctx, span := otel.Tracer("repoman").Start(ctx, "HandleRebase") - defer span.End() - - log.Infow("HandleRebase", "pds", pdsid, "uid", uid, "commit", commit) - - unlock := rm.lockUser(ctx, uid) - defer unlock() - - ro, err := rm.cs.ReadOnlySession(uid) - if err != nil { - return err - } - - head, err := rm.cs.GetUserRepoHead(ctx, uid) - if err != nil { - return err - } - - // TODO: do we allow prev to be nil in any case here? - if prev != nil { - if *prev != head { - log.Warnw("rebase 'prev' value did not match our latest head for repo", "did", did, "rprev", prev.String(), "lprev", head.String()) - } - } - - currepo, err := repo.OpenRepo(ctx, ro, head, true) - if err != nil { - return err - } - - olddc := currepo.DataCid() - - root, ds, err := rm.cs.ImportSlice(ctx, uid, nil, carslice) - if err != nil { - return fmt.Errorf("importing external carslice: %w", err) - } - - r, err := repo.OpenRepo(ctx, ds, root, true) - if err != nil { - return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err) - } - - if r.DataCid() != olddc { - return ErrUncleanRebase - } - - if err := rm.CheckRepoSig(ctx, r, did); err != nil { - return err - } - - // TODO: this is moderately expensive and currently results in the users - // entire repo being held in memory - if err := r.CopyDataTo(ctx, ds); err != nil { - return err - } - - if err := ds.CloseAsRebase(ctx, root); err != nil { - return fmt.Errorf("finalizing rebase: %w", err) - } - - if rm.events != nil { - rm.events(ctx, &RepoEvent{ - User: uid, - OldRoot: prev, - NewRoot: root, - Ops: nil, - RepoSlice: carslice, - PDS: pdsid, - Rebase: true, - }) - } - - return nil -} - -func (rm *RepoManager) DoRebase(ctx context.Context, uid models.Uid) error { - ctx, span := otel.Tracer("repoman").Start(ctx, "DoRebase") - defer span.End() - - log.Infow("DoRebase", "uid", uid) - - unlock := rm.lockUser(ctx, uid) - defer unlock() - - ds, err := rm.cs.NewDeltaSession(ctx, uid, nil) - if err != nil { - return err - } - - head, err := rm.cs.GetUserRepoHead(ctx, uid) - if err != nil { - return err - } - - r, err := repo.OpenRepo(ctx, ds, head, true) - if err != nil { - return err - } - - r.Truncate() - - nroot, err := r.Commit(ctx, rm.kmgr.SignForUser) - if err != nil { - return err - } - - if err := r.CopyDataTo(ctx, ds); err != nil { - return err - } - - if err := ds.CloseAsRebase(ctx, nroot); err != nil { - return fmt.Errorf("finalizing rebase: %w", err) - } - - // outbound car slice should just be the new signed root - buf := new(bytes.Buffer) - if _, err := carstore.WriteCarHeader(buf, nroot); err != nil { - return err - } - - robj, err := ds.Get(ctx, nroot) - if err != nil { - return err - } - _, err = carstore.LdWrite(buf, robj.Cid().Bytes(), robj.RawData()) - if err != nil { - return err - } - - if rm.events != nil { - rm.events(ctx, &RepoEvent{ - User: uid, - OldRoot: &head, - NewRoot: nroot, - Ops: nil, - RepoSlice: buf.Bytes(), - PDS: 0, - Rebase: true, - }) - } - - return nil -} - func (rm *RepoManager) CheckRepoSig(ctx context.Context, r *repo.Repo, expdid string) error { ctx, span := otel.Tracer("repoman").Start(ctx, "CheckRepoSig") defer span.End() @@ -615,16 +486,16 @@ func (rm *RepoManager) CheckRepoSig(ctx context.Context, r *repo.Repo, expdid st return nil } -func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid models.Uid, did string, prev *cid.Cid, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error { +func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error { ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent") defer span.End() - log.Infow("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "prev", prev) + log.Infow("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev) unlock := rm.lockUser(ctx, uid) defer unlock() - root, ds, err := rm.cs.ImportSlice(ctx, uid, prev, carslice) + root, ds, err := rm.cs.ImportSlice(ctx, uid, since, carslice) if err != nil { return fmt.Errorf("importing external carslice: %w", err) } @@ -684,16 +555,18 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, } } - rslice, err := ds.CloseWithRoot(ctx, root) + rslice, err := ds.CloseWithRoot(ctx, root, nrev) if err != nil { return fmt.Errorf("close with root: %w", err) } if rm.events != nil { rm.events(ctx, &RepoEvent{ - User: uid, - OldRoot: prev, + User: uid, + //OldRoot: prev, NewRoot: root, + Rev: nrev, + Since: since, Ops: evtops, RepoSlice: rslice, PDS: pdsid, @@ -714,16 +587,17 @@ func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes [ unlock := rm.lockUser(ctx, user) defer unlock() - head, err := rm.cs.GetUserRepoHead(ctx, user) + rev, err := rm.cs.GetUserRepoRev(ctx, user) if err != nil { return err } - ds, err := rm.cs.NewDeltaSession(ctx, user, &head) + ds, err := rm.cs.NewDeltaSession(ctx, user, &rev) if err != nil { return err } + head := ds.BaseCid() r, err := repo.OpenRepo(ctx, ds, head, true) if err != nil { return err @@ -786,12 +660,12 @@ func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes [ } } - nroot, err := r.Commit(ctx, rm.kmgr.SignForUser) + nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) if err != nil { return err } - rslice, err := ds.CloseWithRoot(ctx, nroot) + rslice, err := ds.CloseWithRoot(ctx, nroot, nrev) if err != nil { return fmt.Errorf("close with root: %w", err) } @@ -807,6 +681,8 @@ func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes [ OldRoot: oldroot, NewRoot: nroot, RepoSlice: rslice, + Rev: nrev, + Since: &rev, Ops: ops, }) } @@ -814,25 +690,30 @@ func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes [ return nil } -func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoDid string, r io.Reader, oldest cid.Cid) error { +func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoDid string, r io.Reader, rev *string) error { ctx, span := otel.Tracer("repoman").Start(ctx, "ImportNewRepo") defer span.End() unlock := rm.lockUser(ctx, user) defer unlock() - head, err := rm.cs.GetUserRepoHead(ctx, user) + currev, err := rm.cs.GetUserRepoRev(ctx, user) + if err != nil { + return err + } + + curhead, err := rm.cs.GetUserRepoHead(ctx, user) if err != nil { return err } - if head != oldest { + if rev != nil && *rev != currev { // TODO: we could probably just deal with this return fmt.Errorf("ImportNewRepo called with incorrect base") } - err = rm.processNewRepo(ctx, user, r, head, func(ctx context.Context, old, nu cid.Cid, finish func(context.Context) ([]byte, error), bs blockstore.Blockstore) error { - r, err := repo.OpenRepo(ctx, bs, nu, true) + err = rm.processNewRepo(ctx, user, r, rev, func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error { + r, err := repo.OpenRepo(ctx, bs, root, true) if err != nil { return fmt.Errorf("opening new repo: %w", err) } @@ -848,7 +729,7 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD return fmt.Errorf("new user signature check failed: %w", err) } - diffops, err := r.DiffSince(ctx, old) + diffops, err := r.DiffSince(ctx, curhead) if err != nil { return fmt.Errorf("diff trees: %w", err) } @@ -865,21 +746,18 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD } } - slice, err := finish(ctx) + slice, err := finish(ctx, scom.Rev) if err != nil { return err } - var oldroot *cid.Cid - if old.Defined() { - oldroot = &old - } - if rm.events != nil { rm.events(ctx, &RepoEvent{ - User: user, - OldRoot: oldroot, - NewRoot: nu, + User: user, + //OldRoot: oldroot, + NewRoot: root, + Rev: scom.Rev, + Since: &currev, RepoSlice: slice, Ops: ops, }) @@ -888,7 +766,7 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD return nil }) if err != nil { - return fmt.Errorf("process new repo (current head: %s): %w:", head, err) + return fmt.Errorf("process new repo (current rev: %s): %w:", currev, err) } return nil @@ -944,7 +822,7 @@ func processOp(ctx context.Context, bs blockstore.Blockstore, op *mst.DiffOp) (* } } -func (rm *RepoManager) processNewRepo(ctx context.Context, user models.Uid, r io.Reader, until cid.Cid, cb func(ctx context.Context, old, nu cid.Cid, finish func(context.Context) ([]byte, error), bs blockstore.Blockstore) error) error { +func (rm *RepoManager) processNewRepo(ctx context.Context, user models.Uid, r io.Reader, rev *string, cb func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error) error { ctx, span := otel.Tracer("repoman").Start(ctx, "processNewRepo") defer span.End() @@ -973,97 +851,40 @@ func (rm *RepoManager) processNewRepo(ctx context.Context, user models.Uid, r io } } - head := &carr.Header.Roots[0] - - var commits []cid.Cid - for head != nil && *head != until { - commits = append(commits, *head) - rep, err := repo.OpenRepo(ctx, membs, *head, true) - if err != nil { - return fmt.Errorf("opening repo for backwalk (%d commits, until: %s, head: %s, carRoot: %s): %w", len(commits), until, *head, carr.Header.Roots[0], err) - } - - prev, err := rep.PrevCommit(ctx) - if err != nil { - return fmt.Errorf("prevCommit: %w", err) - } - - head = prev - } - - if until.Defined() && (head == nil || *head != until) { - // TODO: this shouldnt be happening, but i've seen some log messages - // suggest that it might. Leaving this here to discover any cases where - // it does. - log.Errorw("reached end of walkback without finding our 'until' commit", - "until", until, - "root", carr.Header.Roots[0], - "commits", len(commits), - "head", head, - "user", user, - ) - } - - // now we need to generate repo slices for each commit - seen := make(map[cid.Cid]bool) - if until.Defined() { - seen[until] = true + root := carr.Header.Roots[0] + // TODO: if there are blocks that get convergently recreated throughout + // the repos lifecycle, this will end up erroneously not including + // them. We should compute the set of blocks needed to read any repo + // ops that happened in the commit and use that for our 'output' blocks + cids, err := walkTree(ctx, seen, root, membs, true) + if err != nil { + return fmt.Errorf("walkTree: %w", err) } - cbs := membs - if until.Defined() { - bs, err := rm.cs.ReadOnlySession(user) - if err != nil { - return err - } - - // TODO: we technically only need this for the 'next' commit to diff against our current head. - cbs = util.NewReadThroughBstore(bs, membs) + ds, err := rm.cs.NewDeltaSession(ctx, user, rev) + if err != nil { + return fmt.Errorf("opening delta session: %w", err) } - prev := until - for i := len(commits) - 1; i >= 0; i-- { - root := commits[i] - // TODO: if there are blocks that get convergently recreated throughout - // the repos lifecycle, this will end up erroneously not including - // them. We should compute the set of blocks needed to read any repo - // ops that happened in the commit and use that for our 'output' blocks - cids, err := walkTree(ctx, seen, root, membs, true) - if err != nil { - return fmt.Errorf("walkTree: %w", err) - } - - var prevptr *cid.Cid - if prev.Defined() { - prevptr = &prev - } - ds, err := rm.cs.NewDeltaSession(ctx, user, prevptr) + for _, c := range cids { + blk, err := membs.Get(ctx, c) if err != nil { - return fmt.Errorf("opening delta session (%d / %d): %w", i, len(commits)-1, err) + return fmt.Errorf("copying walked cids to carstore: %w", err) } - for _, c := range cids { - blk, err := membs.Get(ctx, c) - if err != nil { - return fmt.Errorf("copying walked cids to carstore: %w", err) - } - - if err := ds.Put(ctx, blk); err != nil { - return err - } - } - - finish := func(ctx context.Context) ([]byte, error) { - return ds.CloseWithRoot(ctx, root) + if err := ds.Put(ctx, blk); err != nil { + return err } + } - if err := cb(ctx, prev, root, finish, cbs); err != nil { - return fmt.Errorf("cb errored (%d/%d) root: %s, prev: %s: %w", i, len(commits)-1, root, prev, err) - } + finish := func(ctx context.Context, nrev string) ([]byte, error) { + return ds.CloseWithRoot(ctx, root, nrev) + } - prev = root + if err := cb(ctx, root, finish, membs); err != nil { + return fmt.Errorf("cb errored root: %s, rev: %s: %w", root, *rev, err) } return nil diff --git a/search/firehose.go b/search/firehose.go index e1b5a5cb9..53d48564c 100644 --- a/search/firehose.go +++ b/search/firehose.go @@ -239,7 +239,7 @@ func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, } func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { - repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "", evt.Commit.String()) + repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "") if err != nil { return err } diff --git a/testing/car_did_repro_test.go b/testing/car_did_repro_test.go index 4acea816a..700eb5638 100644 --- a/testing/car_did_repro_test.go +++ b/testing/car_did_repro_test.go @@ -81,7 +81,7 @@ func deepReproduceRepo(t *testing.T, carPath, docPath string) { // verify MST tree reproduced kmgr := &util.FakeKeyManager{} - _, err = secondRepo.Commit(ctx, kmgr.SignForUser) + _, _, err = secondRepo.Commit(ctx, kmgr.SignForUser) if err != nil { t.Fatal(err) } diff --git a/testing/integ_test.go b/testing/integ_test.go index fc73a47df..4ba4dfd4a 100644 --- a/testing/integ_test.go +++ b/testing/integ_test.go @@ -332,128 +332,6 @@ func TestBGSTakedown(t *testing.T) { assert.Equal(alice.did, last.RepoCommit.Repo) } -func TestRebase(t *testing.T) { - if testing.Short() { - t.Skip("skipping BGS test in 'short' test mode") - } - assert := assert.New(t) - didr := TestPLC(t) - p1 := MustSetupPDS(t, ".tpds", didr) - p1.Run(t) - - b1 := MustSetupBGS(t, didr) - b1.Run(t) - - b1.tr.TrialHosts = []string{p1.RawHost()} - - p1.RequestScraping(t, b1) - - time.Sleep(time.Millisecond * 50) - - bob := p1.MustNewUser(t, "bob.tpds") - - bob.Post(t, "cats for cats") - bob.Post(t, "i am the king of the world") - bob.Post(t, "the name is bob") - bob.Post(t, "why cant i eat pie") - - time.Sleep(time.Millisecond * 100) - - evts1 := b1.Events(t, 0) - defer evts1.Cancel() - - preRebaseEvts := evts1.WaitFor(5) - fmt.Println(preRebaseEvts) - - bob.DoRebase(t) - - rbevt := evts1.Next() - assert.Equal(true, rbevt.RepoCommit.Rebase) - - sc := commitFromSlice(t, rbevt.RepoCommit.Blocks, (cid.Cid)(rbevt.RepoCommit.Commit)) - assert.Nil(sc.Prev) - - lev := preRebaseEvts[4] - oldsc := commitFromSlice(t, lev.RepoCommit.Blocks, (cid.Cid)(lev.RepoCommit.Commit)) - - assert.Equal(sc.Data, oldsc.Data) - - evts2 := b1.Events(t, 0) - afterEvts := evts2.WaitFor(1) - assert.Equal(true, afterEvts[0].RepoCommit.Rebase) -} - -func TestRebaseMulti(t *testing.T) { - if testing.Short() { - t.Skip("skipping BGS test in 'short' test mode") - } - assert := assert.New(t) - didr := TestPLC(t) - p1 := MustSetupPDS(t, ".tpds", didr) - p1.Run(t) - - b1 := MustSetupBGS(t, didr) - b1.Run(t) - - b1.tr.TrialHosts = []string{p1.RawHost()} - - p1.RequestScraping(t, b1) - - esgenesis := b1.Events(t, 0) - - time.Sleep(time.Millisecond * 50) - - bob := p1.MustNewUser(t, "bob.tpds") - - for i := 0; i < 10; i++ { - bob.Post(t, fmt.Sprintf("this is bobs post %d", i)) - } - - // wait for 11 events, the first one is the actor creation - firsten := esgenesis.WaitFor(11) - _ = firsten - - fmt.Println("REBASE ONE") - bob.DoRebase(t) - - var posts []*atproto.RepoStrongRef - for i := 0; i < 10; i++ { - ref := bob.Post(t, fmt.Sprintf("this is bobs post after rebase %d", i)) - posts = append(posts, ref) - } - - time.Sleep(time.Millisecond * 50) - - evts1 := b1.Events(t, 0) - defer evts1.Cancel() - - all := evts1.WaitFor(11) - - assert.Equal(true, all[0].RepoCommit.Rebase) - assert.Equal(int64(12), all[0].RepoCommit.Seq) - assert.Equal(posts[0].Cid, all[1].RepoCommit.Ops[0].Cid.String()) - - // and another one! - fmt.Println("REBASE TWO") - bob.DoRebase(t) - - var posts2 []*atproto.RepoStrongRef - for i := 0; i < 15; i++ { - ref := bob.Post(t, fmt.Sprintf("this is bobs post after second rebase %d", i)) - posts2 = append(posts2, ref) - } - - time.Sleep(time.Millisecond * 50) - - evts2 := b1.Events(t, 0) - defer evts2.Cancel() - - all = evts2.WaitFor(16) - - assert.Equal(true, all[0].RepoCommit.Rebase) - assert.Equal(posts2[0].Cid, all[1].RepoCommit.Ops[0].Cid.String()) -} - func jsonPrint(v any) { b, _ := json.Marshal(v) fmt.Println(string(b)) diff --git a/testing/testdata/greenground.repo.car b/testing/testdata/greenground.repo.car index f22fe03d7..b4c88d5c5 100644 Binary files a/testing/testdata/greenground.repo.car and b/testing/testdata/greenground.repo.car differ diff --git a/testing/utils.go b/testing/utils.go index b9f189c20..9a216fe41 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -810,7 +810,7 @@ func GenerateFakeRepo(r *repo.Repo, size int) (cid.Cid, error) { kmgr := &bsutil.FakeKeyManager{} - nroot, err := r.Commit(ctx, kmgr.SignForUser) + nroot, _, err := r.Commit(ctx, kmgr.SignForUser) if err != nil { return cid.Undef, err }