Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement some unimplemented Atproto Sync BGS Handlers #326

Merged
merged 9 commits into from
Sep 21, 2023
4 changes: 2 additions & 2 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,13 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {
// TODO: this API is temporary until we formalize what we want here

e.GET("/xrpc/com.atproto.sync.subscribeRepos", bgs.EventsHandler)
e.GET("/xrpc/com.atproto.sync.getCheckout", bgs.HandleComAtprotoSyncGetCheckout)
e.GET("/xrpc/com.atproto.sync.getHead", bgs.HandleComAtprotoSyncGetHead)
e.GET("/xrpc/com.atproto.sync.getRecord", bgs.HandleComAtprotoSyncGetRecord)
e.GET("/xrpc/com.atproto.sync.getRepo", bgs.HandleComAtprotoSyncGetRepo)
e.GET("/xrpc/com.atproto.sync.getBlocks", bgs.HandleComAtprotoSyncGetBlocks)
e.GET("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl)
e.POST("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl)
e.GET("/xrpc/com.atproto.sync.listRepos", bgs.HandleComAtprotoSyncListRepos)
e.GET("/xrpc/com.atproto.sync.getLatestCommit", bgs.HandleComAtprotoSyncGetLatestCommit)
e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", bgs.HandleComAtprotoSyncNotifyOfUpdate)
e.GET("/xrpc/_health", bgs.HandleHealthCheck)

Expand Down
132 changes: 94 additions & 38 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,69 +3,61 @@ package bgs
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"

atproto "github.com/bluesky-social/indigo/api/atproto"
comatprototypes "github.com/bluesky-social/indigo/api/atproto"
"gorm.io/gorm"

"github.com/bluesky-social/indigo/util"
"github.com/bluesky-social/indigo/xrpc"
"github.com/ipfs/go-cid"
"github.com/labstack/echo/v4"
)

func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context, did string) (io.Reader, error) {
/*
u, err := s.Index.LookupUserByDid(ctx, did)
if err != nil {
return nil, err
func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) {
u, err := s.Index.LookupUserByDid(ctx, did)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
}
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

c, err := cid.Decode(commit)
reqCid := cid.Undef
if commit != "" {
reqCid, err = cid.Decode(commit)
if err != nil {
return nil, err
}

// TODO: need to enable a 'write to' interface for codegenned things...
buf := new(bytes.Buffer)
if err := s.repoman.GetCheckout(ctx, u.Uid, c, buf); err != nil {
return nil, err
return nil, fmt.Errorf("failed to decode commit cid: %w", err)
}
}

return buf, nil
*/
return nil, fmt.Errorf("nyi")
}

func (s *BGS) handleComAtprotoSyncGetCommitPath(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncGetCommitPath_Output, error) {
return nil, fmt.Errorf("nyi")
}

func (s *BGS) handleComAtprotoSyncGetHead(ctx context.Context, did string) (*comatprototypes.SyncGetHead_Output, error) {
u, err := s.Index.LookupUserByDid(ctx, did)
_, record, err := s.repoman.GetRecord(ctx, u.Uid, collection, rkey, reqCid)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get record: %w", err)
}

root, err := s.repoman.GetRepoRoot(ctx, u.Uid)
buf := new(bytes.Buffer)
err = record.MarshalCBOR(buf)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to marshal record: %w", err)
}

return &comatprototypes.SyncGetHead_Output{
Root: root.String(),
}, nil
}

func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) {
return nil, fmt.Errorf("nyi")
return buf, nil
}

func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) {
u, err := s.Index.LookupUserByDid(ctx, did)
if err != nil {
return nil, err
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
}
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

// TODO: stream the response
Expand Down Expand Up @@ -139,7 +131,7 @@ func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *coma

func (s *BGS) handleComAtprotoSyncGetBlob(ctx context.Context, cid string, did string) (io.Reader, error) {
if s.blobs == nil {
return nil, fmt.Errorf("blob store disabled")
return nil, echo.NewHTTPError(http.StatusNotFound, "blobs not enabled on this server")
}

b, err := s.blobs.GetBlob(ctx, cid, did)
Expand All @@ -155,9 +147,73 @@ func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context, cursor string,
}

func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) {
return nil, fmt.Errorf("NYI")
// Use UIDs for the cursor
var err error
c := int64(0)
if cursor != "" {
c, err = strconv.ParseInt(cursor, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid cursor: %w", err)
}
}

users := []User{}
if err := s.db.Model(&User{}).Where("id > ?", c).Order("id").Limit(limit).Find(&users).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return &comatprototypes.SyncListRepos_Output{}, nil
}
return nil, fmt.Errorf("failed to get users: %w", err)
}

if len(users) == 0 {
return &comatprototypes.SyncListRepos_Output{}, nil
}

resp := &comatprototypes.SyncListRepos_Output{
Repos: []*comatprototypes.SyncListRepos_Repo{},
}

for i := range users {
user := users[i]
root, err := s.repoman.GetRepoRoot(ctx, user.ID)
if err != nil {
return nil, fmt.Errorf("failed to get repo root for (%s): %w", user.Did, err)
}

resp.Repos = append(resp.Repos, &comatprototypes.SyncListRepos_Repo{
Did: user.Did,
Head: root.String(),
})
}

c += int64(len(users))
cursor = strconv.FormatInt(c, 10)
resp.Cursor = &cursor

return resp, nil
}

func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) {
return nil, fmt.Errorf("NYI")
u, err := s.Index.LookupUserByDid(ctx, did)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
}
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

root, err := s.repoman.GetRepoRoot(ctx, u.Uid)
if err != nil {
return nil, fmt.Errorf("failed to get repo root: %w", err)
}

rev, err := s.repoman.GetRepoRev(ctx, u.Uid)
if err != nil {
return nil, fmt.Errorf("failed to get repo rev: %w", err)
}

return &comatprototypes.SyncGetLatestCommit_Output{
Cid: root.String(),
Rev: rev,
}, nil
}
Loading