diff --git a/bgs/bgs.go b/bgs/bgs.go index 612a6c22a..a9d24fc60 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -273,13 +273,13 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error { // TODO: this API is temporary until we formalize what we want here e.GET("/xrpc/com.atproto.sync.subscribeRepos", bgs.EventsHandler) - e.GET("/xrpc/com.atproto.sync.getCheckout", bgs.HandleComAtprotoSyncGetCheckout) - e.GET("/xrpc/com.atproto.sync.getHead", bgs.HandleComAtprotoSyncGetHead) e.GET("/xrpc/com.atproto.sync.getRecord", bgs.HandleComAtprotoSyncGetRecord) e.GET("/xrpc/com.atproto.sync.getRepo", bgs.HandleComAtprotoSyncGetRepo) e.GET("/xrpc/com.atproto.sync.getBlocks", bgs.HandleComAtprotoSyncGetBlocks) e.GET("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl) e.POST("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl) + e.GET("/xrpc/com.atproto.sync.listRepos", bgs.HandleComAtprotoSyncListRepos) + e.GET("/xrpc/com.atproto.sync.getLatestCommit", bgs.HandleComAtprotoSyncGetLatestCommit) e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", bgs.HandleComAtprotoSyncNotifyOfUpdate) e.GET("/xrpc/_health", bgs.HandleHealthCheck) diff --git a/bgs/handlers.go b/bgs/handlers.go index 68b3e27dc..dc2827178 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -3,69 +3,61 @@ package bgs import ( "bytes" "context" + "errors" "fmt" "io" "net/http" + "strconv" "strings" atproto "github.com/bluesky-social/indigo/api/atproto" comatprototypes "github.com/bluesky-social/indigo/api/atproto" + "gorm.io/gorm" + "github.com/bluesky-social/indigo/util" "github.com/bluesky-social/indigo/xrpc" + "github.com/ipfs/go-cid" "github.com/labstack/echo/v4" ) -func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context, did string) (io.Reader, error) { - /* - u, err := s.Index.LookupUserByDid(ctx, did) - if err != nil { - return nil, err +func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) { + u, err := s.Index.LookupUserByDid(ctx, did) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") } + return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") + } - c, err := cid.Decode(commit) + reqCid := cid.Undef + if commit != "" { + reqCid, err = cid.Decode(commit) if err != nil { - return nil, err - } - - // TODO: need to enable a 'write to' interface for codegenned things... - buf := new(bytes.Buffer) - if err := s.repoman.GetCheckout(ctx, u.Uid, c, buf); err != nil { - return nil, err + return nil, fmt.Errorf("failed to decode commit cid: %w", err) } + } - return buf, nil - */ - return nil, fmt.Errorf("nyi") -} - -func (s *BGS) handleComAtprotoSyncGetCommitPath(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncGetCommitPath_Output, error) { - return nil, fmt.Errorf("nyi") -} - -func (s *BGS) handleComAtprotoSyncGetHead(ctx context.Context, did string) (*comatprototypes.SyncGetHead_Output, error) { - u, err := s.Index.LookupUserByDid(ctx, did) + _, record, err := s.repoman.GetRecord(ctx, u.Uid, collection, rkey, reqCid) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get record: %w", err) } - root, err := s.repoman.GetRepoRoot(ctx, u.Uid) + buf := new(bytes.Buffer) + err = record.MarshalCBOR(buf) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to marshal record: %w", err) } - return &comatprototypes.SyncGetHead_Output{ - Root: root.String(), - }, nil -} - -func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) { - return nil, fmt.Errorf("nyi") + return buf, nil } func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) { u, err := s.Index.LookupUserByDid(ctx, did) if err != nil { - return nil, err + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") + } + return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } // TODO: stream the response @@ -139,7 +131,7 @@ func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *coma func (s *BGS) handleComAtprotoSyncGetBlob(ctx context.Context, cid string, did string) (io.Reader, error) { if s.blobs == nil { - return nil, fmt.Errorf("blob store disabled") + return nil, echo.NewHTTPError(http.StatusNotFound, "blobs not enabled on this server") } b, err := s.blobs.GetBlob(ctx, cid, did) @@ -155,9 +147,73 @@ func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context, cursor string, } func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) { - return nil, fmt.Errorf("NYI") + // Use UIDs for the cursor + var err error + c := int64(0) + if cursor != "" { + c, err = strconv.ParseInt(cursor, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid cursor: %w", err) + } + } + + users := []User{} + if err := s.db.Model(&User{}).Where("id > ?", c).Order("id").Limit(limit).Find(&users).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return &comatprototypes.SyncListRepos_Output{}, nil + } + return nil, fmt.Errorf("failed to get users: %w", err) + } + + if len(users) == 0 { + return &comatprototypes.SyncListRepos_Output{}, nil + } + + resp := &comatprototypes.SyncListRepos_Output{ + Repos: []*comatprototypes.SyncListRepos_Repo{}, + } + + for i := range users { + user := users[i] + root, err := s.repoman.GetRepoRoot(ctx, user.ID) + if err != nil { + return nil, fmt.Errorf("failed to get repo root for (%s): %w", user.Did, err) + } + + resp.Repos = append(resp.Repos, &comatprototypes.SyncListRepos_Repo{ + Did: user.Did, + Head: root.String(), + }) + } + + c += int64(len(users)) + cursor = strconv.FormatInt(c, 10) + resp.Cursor = &cursor + + return resp, nil } func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) { - return nil, fmt.Errorf("NYI") + u, err := s.Index.LookupUserByDid(ctx, did) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") + } + return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") + } + + root, err := s.repoman.GetRepoRoot(ctx, u.Uid) + if err != nil { + return nil, fmt.Errorf("failed to get repo root: %w", err) + } + + rev, err := s.repoman.GetRepoRev(ctx, u.Uid) + if err != nil { + return nil, fmt.Errorf("failed to get repo rev: %w", err) + } + + return &comatprototypes.SyncGetLatestCommit_Output{ + Cid: root.String(), + Rev: rev, + }, nil } diff --git a/bgs/stubs.go b/bgs/stubs.go index bbb8777bf..f5308d915 100644 --- a/bgs/stubs.go +++ b/bgs/stubs.go @@ -1,14 +1,22 @@ package bgs import ( + "fmt" "io" + "net/http" "strconv" comatprototypes "github.com/bluesky-social/indigo/api/atproto" + "github.com/bluesky-social/indigo/atproto/syntax" + "github.com/ipfs/go-cid" "github.com/labstack/echo/v4" "go.opentelemetry.io/otel" ) +type XRPCError struct { + Message string `json:"message"` +} + func (s *BGS) RegisterHandlersAppBsky(e *echo.Echo) error { return nil } @@ -16,8 +24,6 @@ func (s *BGS) RegisterHandlersAppBsky(e *echo.Echo) error { func (s *BGS) RegisterHandlersComAtproto(e *echo.Echo) error { e.GET("/xrpc/com.atproto.sync.getBlob", s.HandleComAtprotoSyncGetBlob) e.GET("/xrpc/com.atproto.sync.getBlocks", s.HandleComAtprotoSyncGetBlocks) - e.GET("/xrpc/com.atproto.sync.getCheckout", s.HandleComAtprotoSyncGetCheckout) - e.GET("/xrpc/com.atproto.sync.getHead", s.HandleComAtprotoSyncGetHead) e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.HandleComAtprotoSyncGetLatestCommit) e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord) e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo) @@ -31,12 +37,23 @@ func (s *BGS) RegisterHandlersComAtproto(e *echo.Echo) error { func (s *BGS) HandleComAtprotoSyncGetBlob(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetBlob") defer span.End() - cid := c.QueryParam("cid") + bCid := c.QueryParam("cid") did := c.QueryParam("did") + + _, err := cid.Parse(bCid) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid cid: %s", bCid)}) + } + + _, err = syntax.ParseDID(did) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) + } + var out io.Reader var handleErr error // func (s *BGS) handleComAtprotoSyncGetBlob(ctx context.Context,cid string,did string) (io.Reader, error) - out, handleErr = s.handleComAtprotoSyncGetBlob(ctx, cid, did) + out, handleErr = s.handleComAtprotoSyncGetBlob(ctx, bCid, did) if handleErr != nil { return handleErr } @@ -49,6 +66,18 @@ func (s *BGS) HandleComAtprotoSyncGetBlocks(c echo.Context) error { cids := c.QueryParams()["cids"] did := c.QueryParam("did") + _, err := syntax.ParseDID(did) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) + } + + for _, cd := range cids { + _, err = cid.Parse(cd) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid cid: %s", cd)}) + } + } + var out io.Reader var handleErr error // func (s *BGS) handleComAtprotoSyncGetBlocks(ctx context.Context,cids []string,did string) (io.Reader, error) @@ -59,38 +88,16 @@ func (s *BGS) HandleComAtprotoSyncGetBlocks(c echo.Context) error { return c.Stream(200, "application/vnd.ipld.car", out) } -func (s *BGS) HandleComAtprotoSyncGetCheckout(c echo.Context) error { - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetCheckout") +func (s *BGS) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error { + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetLatestCommit") defer span.End() did := c.QueryParam("did") - var out io.Reader - var handleErr error - // func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context,did string) (io.Reader, error) - out, handleErr = s.handleComAtprotoSyncGetCheckout(ctx, did) - if handleErr != nil { - return handleErr - } - return c.Stream(200, "application/vnd.ipld.car", out) -} -func (s *BGS) HandleComAtprotoSyncGetHead(c echo.Context) error { - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead") - defer span.End() - did := c.QueryParam("did") - var out *comatprototypes.SyncGetHead_Output - var handleErr error - // func (s *BGS) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error) - out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did) - if handleErr != nil { - return handleErr + _, err := syntax.ParseDID(did) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) } - return c.JSON(200, out) -} -func (s *BGS) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error { - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetLatestCommit") - defer span.End() - did := c.QueryParam("did") var out *comatprototypes.SyncGetLatestCommit_Output var handleErr error // func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context,did string) (*comatprototypes.SyncGetLatestCommit_Output, error) @@ -108,6 +115,29 @@ func (s *BGS) HandleComAtprotoSyncGetRecord(c echo.Context) error { commit := c.QueryParam("commit") did := c.QueryParam("did") rkey := c.QueryParam("rkey") + + _, err := syntax.ParseRecordKey(rkey) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid rkey: %s", rkey)}) + } + + _, err = syntax.ParseNSID(collection) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid collection: %s", collection)}) + } + + _, err = syntax.ParseDID(did) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) + } + + if commit != "" { + _, err = cid.Parse(commit) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid commit: %s", commit)}) + } + } + var out io.Reader var handleErr error // func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context,collection string,commit string,did string,rkey string) (io.Reader, error) @@ -123,6 +153,12 @@ func (s *BGS) HandleComAtprotoSyncGetRepo(c echo.Context) error { defer span.End() did := c.QueryParam("did") since := c.QueryParam("since") + + _, err := syntax.ParseDID(did) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) + } + var out io.Reader var handleErr error // func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context,did string,since string) (io.Reader, error) @@ -144,11 +180,17 @@ func (s *BGS) HandleComAtprotoSyncListBlobs(c echo.Context) error { var err error limit, err = strconv.Atoi(p) if err != nil { - return err + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", p)}) } } else { limit = 500 } + + _, err := syntax.ParseDID(did) + if err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) + } + since := c.QueryParam("since") var out *comatprototypes.SyncListBlobs_Output var handleErr error @@ -170,7 +212,7 @@ func (s *BGS) HandleComAtprotoSyncListRepos(c echo.Context) error { var err error limit, err = strconv.Atoi(p) if err != nil { - return err + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", p)}) } } else { limit = 500 @@ -191,7 +233,7 @@ func (s *BGS) HandleComAtprotoSyncNotifyOfUpdate(c echo.Context) error { var body comatprototypes.SyncNotifyOfUpdate_Input if err := c.Bind(&body); err != nil { - return err + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid body: %s", err)}) } var handleErr error // func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context,body *comatprototypes.SyncNotifyOfUpdate_Input) error @@ -208,7 +250,7 @@ func (s *BGS) HandleComAtprotoSyncRequestCrawl(c echo.Context) error { var body comatprototypes.SyncRequestCrawl_Input if err := c.Bind(&body); err != nil { - return err + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid body: %s", err)}) } var handleErr error // func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context,body *comatprototypes.SyncRequestCrawl_Input) error