diff --git a/cmd/barge/client.go b/cmd/barge/client.go index d363e2c0..cd71270b 100644 --- a/cmd/barge/client.go +++ b/cmd/barge/client.go @@ -310,15 +310,15 @@ func (c *EstClient) CollectionsListDir(ctx context.Context, coluuid, path string return out, nil } -func (c *EstClient) PinAdd(ctx context.Context, root cid.Cid, name string, origins []string, meta map[string]interface{}) (*types.IpfsPinStatus, error) { +func (c *EstClient) PinAdd(ctx context.Context, root cid.Cid, name string, origins []string, meta map[string]interface{}) (*types.IpfsPinStatusResponse, error) { p := &types.IpfsPin{ - Cid: root.String(), + CID: root.String(), Name: name, Origins: origins, Meta: meta, } - var resp types.IpfsPinStatus + var resp types.IpfsPinStatusResponse _, err := c.doRequest(ctx, "POST", "/pinning/pins", p, &resp) if err != nil { return nil, err @@ -327,8 +327,8 @@ func (c *EstClient) PinAdd(ctx context.Context, root cid.Cid, name string, origi return &resp, nil } -func (c *EstClient) PinStatus(ctx context.Context, reqid string) (*types.IpfsPinStatus, error) { - var resp types.IpfsPinStatus +func (c *EstClient) PinStatus(ctx context.Context, reqid string) (*types.IpfsPinStatusResponse, error) { + var resp types.IpfsPinStatusResponse _, err := c.doRequest(ctx, "GET", "/pinning/pins/"+reqid, nil, &resp) if err != nil { return nil, err @@ -339,7 +339,7 @@ func (c *EstClient) PinStatus(ctx context.Context, reqid string) (*types.IpfsPin type listPinsResp struct { Count int - Results []*types.IpfsPinStatus + Results []*types.IpfsPinStatusResponse } func shouldRetry(err error) bool { @@ -381,31 +381,31 @@ func (c *EstClient) doRequestRetries(ctx context.Context, method, path string, b } } -func (c *EstClient) PinStatuses(ctx context.Context, reqids []string) (map[string]*types.IpfsPinStatus, error) { +func (c *EstClient) PinStatuses(ctx context.Context, reqids []string) (map[string]*types.IpfsPinStatusResponse, error) { var resp listPinsResp _, err := c.doRequestRetries(ctx, "GET", "/pinning/pins?requestid="+strings.Join(reqids, ","), nil, &resp, 5) if err != nil { return nil, err } - out := make(map[string]*types.IpfsPinStatus) + out := make(map[string]*types.IpfsPinStatusResponse) for _, res := range resp.Results { - out[res.Requestid] = res + out[res.RequestID] = res } return out, nil } -func (c *EstClient) PinStatusByCid(ctx context.Context, cids []string) (map[string]*types.IpfsPinStatus, error) { +func (c *EstClient) PinStatusByCid(ctx context.Context, cids []string) (map[string]*types.IpfsPinStatusResponse, error) { var resp listPinsResp _, err := c.doRequest(ctx, "GET", "/pinning/pins?cid="+strings.Join(cids, ","), nil, &resp) if err != nil { return nil, err } - out := make(map[string]*types.IpfsPinStatus) + out := make(map[string]*types.IpfsPinStatusResponse) for _, res := range resp.Results { - out[res.Pin.Cid] = res + out[res.Pin.CID] = res } return out, nil diff --git a/cmd/barge/main.go b/cmd/barge/main.go index 5de82b12..b66b7882 100644 --- a/cmd/barge/main.go +++ b/cmd/barge/main.go @@ -51,6 +51,7 @@ import ( cli "github.com/urfave/cli/v2" "golang.org/x/xerrors" + "github.com/application-research/estuary/pinner/types" dagsplit "github.com/application-research/estuary/util/dagsplit" ) @@ -378,7 +379,7 @@ func doAddPin(ctx context.Context, bstore blockstore.Blockstore, client *EstClie fmt.Println("failed to connect to pin delegates: ", err) } - pins := []string{st.Requestid} + pins := []string{st.RequestID} for range time.Tick(time.Second * 2) { var pinning, queued, pinned, failed int for _, p := range pins { @@ -389,13 +390,13 @@ func doAddPin(ctx context.Context, bstore blockstore.Blockstore, client *EstClie } switch status.Status { - case "pinned": + case types.PinningStatusPinned: pinned++ - case "failed": + case types.PinningStatusFailed: failed++ - case "pinning": + case types.PinningStatusPinning: pinning++ - case "queued": + case types.PinningStatusQueued: queued++ } @@ -750,7 +751,7 @@ var plumbSplitAddFileCmd = &cli.Command{ fmt.Println("failed to connect to pin delegates: ", err) } - pins = append(pins, st.Requestid) + pins = append(pins, st.RequestID) } for range time.Tick(time.Second * 2) { @@ -763,13 +764,13 @@ var plumbSplitAddFileCmd = &cli.Command{ } switch status.Status { - case "pinned": + case types.PinningStatusPinned: pinned++ - case "failed": + case types.PinningStatusFailed: failed++ - case "pinning": + case types.PinningStatusPinning: pinning++ - case "queued": + case types.PinningStatusQueued: queued++ } @@ -1206,7 +1207,7 @@ var bargeStatusCmd = &cli.Command{ if len(pins) > 0 { pin := pins[0] - if pin.Status == "pinned" { + if pin.Status == types.PinningStatusPinned { // unchanged and pinned, no need to print anything continue } @@ -1237,7 +1238,7 @@ type fileWithPin struct { Cid string Path string - Status string + Status types.PinningStatus RequestID string } @@ -1302,7 +1303,7 @@ var bargeSyncCmd = &cli.Command{ continue } - if f.Status == "pinned" { + if f.Status == types.PinningStatusPinned { // TODO: add flag to allow a forced rechecking continue } @@ -1336,12 +1337,12 @@ var bargeSyncCmd = &cli.Command{ } switch st.Status { - case "pinned": + case types.PinningStatusPinned: pinComplete = append(pinComplete, fp) if err := r.DB.Model(Pin{}).Where("id = ?", fp.PinID).UpdateColumn("status", st.Status).Error; err != nil { return err } - case "failed": + case types.PinningStatusFailed: needsNewPin = append(needsNewPin, fp) if err := r.DB.Delete(Pin{ID: fp.PinID}).Error; err != nil { return err @@ -1375,7 +1376,7 @@ var bargeSyncCmd = &cli.Command{ continue } - if pin.Status == "failed" { + if pin.Status == types.PinningStatusFailed { // dont bother recording continue } @@ -1383,7 +1384,7 @@ var bargeSyncCmd = &cli.Command{ if err := r.DB.Create(&Pin{ File: nnp.FileID, Cid: nnp.Cid, - RequestID: pin.Requestid, + RequestID: pin.RequestID, Status: pin.Status, }).Error; err != nil { return err @@ -1445,7 +1446,7 @@ var bargeSyncCmd = &cli.Command{ p := &Pin{ File: f.FileID, Cid: fcid.String(), - RequestID: resp.Requestid, + RequestID: resp.RequestID, Status: resp.Status, } @@ -1525,13 +1526,13 @@ var bargeSyncCmd = &cli.Command{ } switch status.Status { - case "pinned": + case types.PinningStatusPinned: newdone++ complete[req] = true - if err := r.DB.Model(Pin{}).Where("request_id = ?", req).UpdateColumn("status", "pinned").Error; err != nil { + if err := r.DB.Model(Pin{}).Where("request_id = ?", req).UpdateColumn("status", types.PinningStatusPinned).Error; err != nil { return err } - case "failed": + case types.PinningStatusFailed: newdone++ failed[req] = true if err := r.DB.Model(Pin{}).Where("request_id = ?", req).Delete(Pin{}).Error; err != nil { diff --git a/cmd/barge/repo.go b/cmd/barge/repo.go index f84b43ef..4707fb80 100644 --- a/cmd/barge/repo.go +++ b/cmd/barge/repo.go @@ -6,6 +6,7 @@ import ( "path/filepath" "time" + "github.com/application-research/estuary/pinner/types" flatfs "github.com/ipfs/go-ds-flatfs" leveldb "github.com/ipfs/go-ds-leveldb" "github.com/ipfs/go-filestore" @@ -50,7 +51,7 @@ type Pin struct { File uint `gorm:"index"` Cid string RequestID string `gorm:"index"` - Status string + Status types.PinningStatus } func findRepo(cctx *cli.Context) (string, error) { diff --git a/cmd/estuary-shuttle/main.go b/cmd/estuary-shuttle/main.go index 8f9d6f44..2aed94a8 100644 --- a/cmd/estuary-shuttle/main.go +++ b/cmd/estuary-shuttle/main.go @@ -6,7 +6,6 @@ import ( "encoding/json" "flag" "fmt" - "github.com/application-research/estuary/node/modules/peering" "io" "io/ioutil" "net/http" @@ -18,6 +17,9 @@ import ( "sync" "time" + "github.com/application-research/estuary/node/modules/peering" + "github.com/application-research/estuary/pinner/types" + "github.com/application-research/estuary/config" estumetrics "github.com/application-research/estuary/metrics" "github.com/application-research/estuary/util/gateway" @@ -49,7 +51,7 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" + lotusTypes "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-blockservice" @@ -157,6 +159,10 @@ func overrideSetOptions(flags []cli.Flag, cctx *cli.Context, cfg *config.Shuttle } func main() { + // set global time to UTC + utc, _ := time.LoadLocation("UTC") + time.Local = utc + logging.SetLogLevel("dt-impl", "debug") logging.SetLogLevel("shuttle", "debug") logging.SetLogLevel("paych", "debug") @@ -975,8 +981,8 @@ func (d *Shuttle) AuthRequired(level int) echo.MiddlewareFunc { log.Warnw("User not authorized", "user", u.ID, "perms", u.Perms, "required", level) return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_NOT_AUTHORIZED, + Code: http.StatusUnauthorized, + Reason: util.ERR_NOT_AUTHORIZED, } } } @@ -1118,7 +1124,8 @@ func (s *Shuttle) handleAdd(c echo.Context, u *User) error { if u.StorageDisabled || s.disableLocalAdding { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_ADDING_DISABLED, + Reason: util.ERR_CONTENT_ADDING_DISABLED, + Details: "uploading content to this node is not allowed at the moment", } } @@ -1138,7 +1145,7 @@ func (s *Shuttle) handleAdd(c echo.Context, u *User) error { if !u.FlagSplitContent() && mpf.Size > util.DefaultContentSizeLimit { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, Details: fmt.Sprintf("content size %d bytes, is over upload size limit of %d bytes, and content splitting is not enabled, please reduce the content size", mpf.Size, util.DefaultContentSizeLimit), } } @@ -1183,7 +1190,7 @@ func (s *Shuttle) handleAdd(c echo.Context, u *User) error { pin := &Pin{ Content: contid, - Cid: util.DbCID{nd.Cid()}, + Cid: util.DbCID{CID: nd.Cid()}, UserID: u.ID, Active: false, @@ -1251,7 +1258,8 @@ func (s *Shuttle) handleAddCar(c echo.Context, u *User) error { if u.StorageDisabled || s.disableLocalAdding { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_ADDING_DISABLED, + Reason: util.ERR_CONTENT_ADDING_DISABLED, + Details: "uploading content to this node is not allowed at the moment", } } @@ -1269,7 +1277,7 @@ func (s *Shuttle) handleAddCar(c echo.Context, u *User) error { if bdSize > util.DefaultContentSizeLimit { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, Details: fmt.Sprintf("content size %d bytes, is over upload size of limit %d bytes, and content splitting is not enabled, please reduce the content size", bdSize, util.DefaultContentSizeLimit), } } @@ -1322,7 +1330,7 @@ func (s *Shuttle) handleAddCar(c echo.Context, u *User) error { pin := &Pin{ Content: contid, - Cid: util.DbCID{root}, + Cid: util.DbCID{CID: root}, UserID: u.ID, Active: false, @@ -1475,7 +1483,7 @@ func (d *Shuttle) doPinning(ctx context.Context, op *pinner.PinningOperation, cb defer span.End() for _, pi := range op.Peers { - if err := d.Node.Host.Connect(ctx, pi); err != nil { + if err := d.Node.Host.Connect(ctx, *pi); err != nil { log.Warnf("failed to connect to origin node for pinning operation: %s", err) } } @@ -1633,9 +1641,9 @@ func (d *Shuttle) addDatabaseTrackingToContent(ctx context.Context, contid uint, return nil } -func (d *Shuttle) onPinStatusUpdate(cont uint, status string) { +func (d *Shuttle) onPinStatusUpdate(cont uint, location string, status types.PinningStatus) error { log.Infof("updating pin status: %d %s", cont, status) - if status == "failed" { + if status == types.PinningStatusFailed { if err := d.DB.Model(Pin{}).Where("content = ?", cont).UpdateColumns(map[string]interface{}{ "pinning": false, "active": false, @@ -1658,6 +1666,7 @@ func (d *Shuttle) onPinStatusUpdate(cont uint, status string) { log.Errorf("failed to send pin status update: %s", err) } }() + return nil } func (s *Shuttle) refreshPinQueue() error { @@ -1681,14 +1690,14 @@ func (s *Shuttle) refreshPinQueue() error { return nil } -func (s *Shuttle) addPinToQueue(p Pin, peers []peer.AddrInfo, replace uint) { +func (s *Shuttle) addPinToQueue(p Pin, peers []*peer.AddrInfo, replace uint) { op := &pinner.PinningOperation{ ContId: p.Content, UserId: p.UserID, Obj: p.Cid.CID, Peers: peers, Started: p.CreatedAt, - Status: "queued", + Status: types.PinningStatusQueued, Replace: replace, } @@ -1901,7 +1910,7 @@ func (s *Shuttle) GarbageCollect(ctx context.Context) error { count := 0 for c := range keys { - del, err := s.deleteIfNotPinned(ctx, &Object{Cid: util.DbCID{c}}) + del, err := s.deleteIfNotPinned(ctx, &Object{Cid: util.DbCID{CID: c}}) if err != nil { return err } @@ -2179,7 +2188,7 @@ func (s *Shuttle) handleImportDeal(c echo.Context, u *User) error { var cc cid.Cid var deals []*api.MarketDeal for _, id := range body.DealIDs { - deal, err := s.Api.StateMarketStorageDeal(ctx, abi.DealID(id), types.EmptyTSK) + deal, err := s.Api.StateMarketStorageDeal(ctx, abi.DealID(id), lotusTypes.EmptyTSK) if err != nil { return fmt.Errorf("getting deal info from chain: %w", err) } diff --git a/cmd/estuary-shuttle/rpc.go b/cmd/estuary-shuttle/rpc.go index 29dfd6f4..1afcbd8a 100644 --- a/cmd/estuary-shuttle/rpc.go +++ b/cmd/estuary-shuttle/rpc.go @@ -6,6 +6,7 @@ import ( "github.com/application-research/estuary/drpc" "github.com/application-research/estuary/pinner" + "github.com/application-research/estuary/pinner/types" "github.com/application-research/estuary/util" dagsplit "github.com/application-research/estuary/util/dagsplit" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -112,7 +113,7 @@ func (d *Shuttle) addPin(ctx context.Context, contid uint, data cid.Cid, user ui Params: drpc.MsgParams{ UpdatePinStatus: &drpc.UpdatePinStatus{ DBID: contid, - Status: "failed", + Status: types.PinningStatusFailed, }, }, }); err != nil { @@ -142,9 +143,8 @@ func (d *Shuttle) addPin(ctx context.Context, contid uint, data cid.Cid, user ui // good, no pin found with this content id, lets create it pin := &Pin{ Content: contid, - Cid: util.DbCID{data}, + Cid: util.DbCID{CID: data}, UserID: user, - Active: false, Pinning: true, } @@ -155,11 +155,10 @@ func (d *Shuttle) addPin(ctx context.Context, contid uint, data cid.Cid, user ui } op := &pinner.PinningOperation{ - Obj: data, - ContId: contid, - UserId: user, - Status: "queued", - + Obj: data, + ContId: contid, + UserId: user, + Status: types.PinningStatusQueued, SkipLimiter: skipLimiter, } @@ -334,11 +333,10 @@ func (d *Shuttle) handleRpcAggregateContent(ctx context.Context, cmd *drpc.Aggre } pin := &Pin{ - Content: cmd.DBID, - Cid: util.DbCID{cmd.Root}, - UserID: cmd.UserID, - Size: totalSize, - + Content: cmd.DBID, + Cid: util.DbCID{CID: cmd.Root}, + UserID: cmd.UserID, + Size: totalSize, Active: false, Pinning: true, Aggregate: true, @@ -609,7 +607,7 @@ func (s *Shuttle) handleRpcSplitContent(ctx context.Context, req *drpc.SplitCont } pin := &Pin{ - Cid: util.DbCID{c}, + Cid: util.DbCID{CID: c}, Content: contid, Active: false, Pinning: true, diff --git a/docs/docs.go b/docs/docs.go index c40d021a..0129bcb7 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1675,7 +1675,7 @@ const docTemplate = `{ "details": { "type": "string" }, - "message": { + "reason": { "type": "string" } } diff --git a/docs/swagger.json b/docs/swagger.json index 6629b6b2..a5205ee7 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1668,7 +1668,7 @@ "details": { "type": "string" }, - "message": { + "reason": { "type": "string" } } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 2a521370..451a800c 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -94,7 +94,7 @@ definitions: type: integer details: type: string - message: + reason: type: string type: object host: api.estuary.tech diff --git a/drpc/rpc.go b/drpc/rpc.go index 54da929c..0b9f3186 100644 --- a/drpc/rpc.go +++ b/drpc/rpc.go @@ -1,6 +1,7 @@ package drpc import ( + "github.com/application-research/estuary/pinner/types" "github.com/application-research/filclient" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -58,7 +59,7 @@ type AddPin struct { DBID uint UserId uint Cid cid.Cid - Peers []peer.AddrInfo + Peers []*peer.AddrInfo } const CMD_TakeContent = "TakeContent" @@ -176,7 +177,7 @@ const OP_UpdatePinStatus = "UpdatePinStatus" type UpdatePinStatus struct { DBID uint - Status string + Status types.PinningStatus } type PinObj struct { diff --git a/gc.go b/gc.go index b2218c57..082b4ca5 100644 --- a/gc.go +++ b/gc.go @@ -71,14 +71,14 @@ func (cm *ContentManager) trackingObject(c cid.Cid) (bool, error) { return count > 0, nil } -func (cm *ContentManager) RemoveContent(ctx context.Context, c uint, now bool) error { +func (cm *ContentManager) removeContent(ctx context.Context, contID uint, now bool) error { ctx, span := cm.tracer.Start(ctx, "RemoveContent") defer span.End() cm.contentLk.Lock() defer cm.contentLk.Unlock() - if err := cm.DB.Delete(&Content{}, c).Error; err != nil { + if err := cm.DB.Delete(&Content{}, contID).Error; err != nil { return fmt.Errorf("failed to delete content from db: %w", err) } @@ -86,11 +86,11 @@ func (cm *ContentManager) RemoveContent(ctx context.Context, c uint, now bool) e Object uint } - if err := cm.DB.Model(&ObjRef{}).Find(&objIds, "content = ?", c).Error; err != nil { + if err := cm.DB.Model(&ObjRef{}).Find(&objIds, "content = ?", contID).Error; err != nil { return fmt.Errorf("failed to gather referenced object IDs: %w", err) } - if err := cm.DB.Where("content = ?", c).Delete(&ObjRef{}).Error; err != nil { + if err := cm.DB.Where("content = ?", contID).Delete(&ObjRef{}).Error; err != nil { return fmt.Errorf("failed to delete related object references: %w", err) } @@ -142,7 +142,6 @@ func (cm *ContentManager) RemoveContent(ctx context.Context, c uint, now bool) e return err } } - return nil } @@ -175,7 +174,6 @@ func (cm *ContentManager) unpinContent(ctx context.Context, contid uint) error { return err } } - return nil } @@ -188,6 +186,7 @@ func (cm *ContentManager) deleteIfNotPinned(ctx context.Context, o *Object) (boo return cm.deleteIfNotPinnedLock(ctx, o) } + func (cm *ContentManager) deleteIfNotPinnedLock(ctx context.Context, o *Object) (bool, error) { ctx, span := cm.tracer.Start(ctx, "deleteIfNotPinnedLock") defer span.End() @@ -215,7 +214,6 @@ func (cm *ContentManager) clearUnreferencedObjects(ctx context.Context, objs []* Delete(Object{}).Error; err != nil { return err } - return nil } diff --git a/handlers.go b/handlers.go index 509e753c..a7eec8d7 100644 --- a/handlers.go +++ b/handlers.go @@ -73,11 +73,6 @@ import ( _ "github.com/application-research/estuary/docs" ) -// generic response models -type GenericResponse struct { - Message string `json: "Message"` -} - // @title Estuary API // @version 0.0.0 // @description This is the API for the Estuary application. @@ -361,9 +356,12 @@ func withUser(f func(echo.Context, *User) error) func(echo.Context) error { return func(c echo.Context) error { u, ok := c.Get("user").(*User) if !ok { - return fmt.Errorf("endpoint not called with proper authentication") + return &util.HttpError{ + Code: http.StatusUnauthorized, + Reason: util.ERR_INVALID_AUTH, + Details: "endpoint not called with proper authentication", + } } - return f(c, u) } } @@ -471,7 +469,10 @@ func (s *Server) handlePeeringPeersAdd(c echo.Context) error { log.Errorf("handlePeeringPeersAdd error on Decode: %s", err) return c.JSON(http.StatusBadRequest, util.PeeringPeerAddMessage{ - "Adding Peer(s) on Peering failed, the peerID is invalid: " + peerParam.ID, params}) + Message: "Adding Peer(s) on Peering failed, the peerID is invalid: " + peerParam.ID, + PeersAdd: params, + }, + ) } // validate the Addrs for each PeerID @@ -482,7 +483,10 @@ func (s *Server) handlePeeringPeersAdd(c echo.Context) error { log.Errorf("handlePeeringPeersAdd error: %s", err) return c.JSON(http.StatusBadRequest, util.PeeringPeerAddMessage{ - "Adding Peer(s) on Peering failed, the addr is invalid: " + addr, params}) + Message: "Adding Peer(s) on Peering failed, the addr is invalid: " + addr, + PeersAdd: params, + }, + ) } multiAddrs = append(multiAddrs, a) } @@ -490,8 +494,8 @@ func (s *Server) handlePeeringPeersAdd(c echo.Context) error { // Only add it here if all is valid. validPeersAddInfo = append(validPeersAddInfo, peer.AddrInfo{ - peerParamId, - multiAddrs, + ID: peerParamId, + Addrs: multiAddrs, }) } @@ -500,8 +504,7 @@ func (s *Server) handlePeeringPeersAdd(c echo.Context) error { for _, validPeerAddInfo := range validPeersAddInfo { s.Node.Peering.AddPeer(validPeerAddInfo) } - - return c.JSON(http.StatusOK, util.PeeringPeerAddMessage{"Added the following Peers on Peering", params}) + return c.JSON(http.StatusOK, util.PeeringPeerAddMessage{Message: "Added the following Peers on Peering", PeersAdd: params}) } // handlePeeringPeersRemove godoc @@ -516,16 +519,15 @@ func (s *Server) handlePeeringPeersRemove(c echo.Context) error { if err := c.Bind(¶ms); err != nil { log.Errorf("handlePeeringPeersRemove error: %s", err) return &util.HttpError{ - Code: http.StatusBadRequest, - Message: util.ERR_PEERING_PEERS_REMOVE_ERROR, + Code: http.StatusBadRequest, + Reason: util.ERR_PEERING_PEERS_REMOVE_ERROR, } } for _, peerId := range params { s.Node.Peering.RemovePeer(peerId) } - - return c.JSON(http.StatusOK, util.PeeringPeerRemoveMessage{"Removed the following Peers from Peering", params}) + return c.JSON(http.StatusOK, util.PeeringPeerRemoveMessage{Message: "Removed the following Peers from Peering", PeersRemove: params}) } // handlePeeringPeersList godoc @@ -562,11 +564,11 @@ func (s *Server) handlePeeringStart(c echo.Context) error { if err != nil { log.Errorf("handlePeeringStart error: %s", err) return &util.HttpError{ - Code: http.StatusBadRequest, - Message: util.ERR_PEERING_PEERS_START_ERROR, + Code: http.StatusBadRequest, + Reason: util.ERR_PEERING_PEERS_START_ERROR, } } - return c.JSON(http.StatusOK, GenericResponse{Message: "Peering Started."}) + return c.JSON(http.StatusOK, util.GenericResponse{Message: "Peering Started."}) } // handlePeeringStop godoc @@ -580,11 +582,11 @@ func (s *Server) handlePeeringStop(c echo.Context) error { if err != nil { log.Errorf("handlePeeringStop error: %s", err) return &util.HttpError{ - Code: http.StatusBadRequest, - Message: util.ERR_PEERING_PEERS_STOP_ERROR, + Code: http.StatusBadRequest, + Reason: util.ERR_PEERING_PEERS_STOP_ERROR, } } - return c.JSON(http.StatusOK, GenericResponse{Message: "Peering Stopped."}) + return c.JSON(http.StatusOK, util.GenericResponse{Message: "Peering Stopped."}) } // handlePeeringStatus godoc @@ -595,7 +597,7 @@ func (s *Server) handlePeeringStop(c echo.Context) error { // @Router /admin/peering/status [get] func (s *Server) handlePeeringStatus(c echo.Context) error { type StateResponse struct { - State string `json: "State"` + State string `json:"state"` } return c.JSON(http.StatusOK, StateResponse{State: ""}) } @@ -613,7 +615,8 @@ func (s *Server) handleAddIpfs(c echo.Context, u *User) error { if s.CM.contentAddingDisabled || u.StorageDisabled { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_ADDING_DISABLED, + Reason: util.ERR_CONTENT_ADDING_DISABLED, + Details: "uploading content to this node is not allowed at the moment", } } @@ -654,20 +657,21 @@ func (s *Server) handleAddIpfs(c echo.Context, u *User) error { filename = filepath.Base(colp) } - cols = []*CollectionRef{&CollectionRef{ - Collection: srchCol.ID, - Path: &path, - }} + cols = []*CollectionRef{ + { + Collection: srchCol.ID, + Path: &path, + }, + } } - var addrInfos []peer.AddrInfo + var origins []*peer.AddrInfo for _, p := range params.Peers { ai, err := peer.AddrInfoFromString(p) if err != nil { return err } - - addrInfos = append(addrInfos, *ai) + origins = append(origins, ai) } rcid, err := cid.Decode(params.Root) @@ -684,12 +688,12 @@ func (s *Server) handleAddIpfs(c echo.Context, u *User) error { return c.JSON(302, map[string]string{"message": "content with given cid already preserved"}) } } + makeDeal := true - pinstatus, err := s.CM.pinContent(ctx, u.ID, rcid, filename, cols, addrInfos, 0, nil, makeDeal) + pinstatus, err := s.CM.pinContent(ctx, u.ID, rcid, filename, cols, origins, 0, nil, makeDeal) if err != nil { return err } - return c.JSON(http.StatusAccepted, pinstatus) } @@ -709,7 +713,8 @@ func (s *Server) handleAddCar(c echo.Context, u *User) error { if s.CM.contentAddingDisabled || u.StorageDisabled || s.CM.localContentAddingDisabled { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_ADDING_DISABLED, + Reason: util.ERR_CONTENT_ADDING_DISABLED, + Details: "uploading content to this node is not allowed at the moment", } } @@ -727,7 +732,7 @@ func (s *Server) handleAddCar(c echo.Context, u *User) error { if bdSize > util.DefaultContentSizeLimit { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, Details: fmt.Sprintf("content size %d bytes, is over upload size of limit %d bytes, and content splitting is not enabled, please reduce the content size", bdSize, util.DefaultContentSizeLimit), } } @@ -831,7 +836,8 @@ func (s *Server) handleAdd(c echo.Context, u *User) error { } return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_ADDING_DISABLED, + Reason: util.ERR_CONTENT_ADDING_DISABLED, + Details: "uploading content to this node is not allowed at the moment", } } @@ -851,7 +857,7 @@ func (s *Server) handleAdd(c echo.Context, u *User) error { if !u.FlagSplitContent() && mpf.Size > s.CM.contentSizeLimit { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, Details: fmt.Sprintf("content size %d bytes, is over upload size limit of %d bytes, and content splitting is not enabled, please reduce the content size", mpf.Size, s.CM.contentSizeLimit), } } @@ -1049,7 +1055,7 @@ func (cm *ContentManager) addDatabaseTrackingToContent(ctx context.Context, cont objlk.Lock() objects = append(objects, &Object{ - Cid: util.DbCID{c}, + Cid: util.DbCID{CID: c}, Size: len(node.RawData()), }) objlk.Unlock() @@ -1072,7 +1078,7 @@ func (cm *ContentManager) addDatabaseTracking(ctx context.Context, u *User, dser defer span.End() content := &Content{ - Cid: util.DbCID{root}, + Cid: util.DbCID{CID: root}, Name: filename, Active: false, Pinning: true, @@ -1246,20 +1252,21 @@ type dealStatus struct { // @Router /content/status/{id} [get] func (s *Server) handleContentStatus(c echo.Context, u *User) error { ctx := c.Request().Context() - val, err := strconv.Atoi(c.Param("id")) + contID, err := strconv.Atoi(c.Param("id")) if err != nil { return err } var content Content - if err := s.DB.First(&content, "id = ?", val, u.ID).Error; err != nil { + if err := s.DB.First(&content, "id = ?", contID).Error; err != nil { return err } if content.UserID != u.ID { return &util.HttpError{ Code: http.StatusForbidden, - Message: util.ERR_NOT_AUTHORIZED, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "user is not owner of specified content", } } @@ -1538,8 +1545,8 @@ func (s *Server) handleMakeDeal(c echo.Context, u *User) error { if u.Perm < util.PermLevelAdmin { return util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_INVALID_AUTH, + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, } } @@ -2044,8 +2051,8 @@ func (s *Server) handleMinersSetInfo(c echo.Context, u *User) error { if !(u.Perm >= util.PermLevelAdmin || sm.Owner == u.ID) { return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_MINER_NOT_OWNED, + Code: http.StatusUnauthorized, + Reason: util.ERR_MINER_NOT_OWNED, } } @@ -2091,8 +2098,8 @@ func (s *Server) handleSuspendMiner(c echo.Context, u *User) error { if !(u.Perm >= util.PermLevelAdmin || sm.Owner == u.ID) { return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_MINER_NOT_OWNED, + Code: http.StatusUnauthorized, + Reason: util.ERR_MINER_NOT_OWNED, } } @@ -2124,8 +2131,8 @@ func (s *Server) handleUnsuspendMiner(c echo.Context, u *User) error { if !(u.Perm >= util.PermLevelAdmin || sm.Owner == u.ID) { return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_MINER_NOT_OWNED, + Code: http.StatusUnauthorized, + Reason: util.ERR_MINER_NOT_OWNED, } } @@ -2145,7 +2152,7 @@ func (s *Server) handleAdminAddMiner(c echo.Context) error { name := c.QueryParam("name") if err := s.DB.Clauses(&clause.OnConflict{UpdateAll: true}).Create(&storageMiner{ - Address: util.DbAddr{m}, + Address: util.DbAddr{Addr: m}, Name: name, }).Error; err != nil { return err @@ -2470,9 +2477,9 @@ func (s *Server) handleGetMinerStats(c echo.Context) error { } type minerDealsResp struct { - ID uint `json:"id"` - CreatedAt time.Time - UpdatedAt time.Time + ID uint `json:"id"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` Content uint `json:"content"` PropCid util.DbCID `json:"propCid"` Miner string `json:"miner"` @@ -2483,10 +2490,9 @@ type minerDealsResp struct { DTChan string `json:"dtChan"` TransferStarted time.Time `json:"transferStarted"` TransferFinished time.Time `json:"transferFinished"` - - OnChainAt time.Time `json:"onChainAt"` - SealedAt time.Time `json:"sealedAt"` - ContentCid util.DbCID `json:"contentCid"` + OnChainAt time.Time `json:"onChainAt"` + SealedAt time.Time `json:"sealedAt"` + ContentCid util.DbCID `json:"contentCid"` } // handleGetMinerDeals godoc @@ -2530,20 +2536,21 @@ type bandwidthResponse struct { // @Param content path string true "Content ID" // @Router /content/bw-usage/{content} [get] func (s *Server) handleGetContentBandwidth(c echo.Context, u *User) error { - cont, err := strconv.Atoi(c.Param("content")) + contID, err := strconv.Atoi(c.Param("content")) if err != nil { return err } var content Content - if err := s.DB.First(&content, cont).Error; err != nil { + if err := s.DB.First(&content, contID).Error; err != nil { return err } if content.UserID != u.ID { return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_NOT_AUTHORIZED, + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "user is not owner of specified content", } } @@ -2571,28 +2578,28 @@ func (s *Server) handleGetContentBandwidth(c echo.Context, u *User) error { // @Success 200 {object} string // @Router /content/aggregated/{content} [get] func (s *Server) handleGetAggregatedForContent(c echo.Context, u *User) error { - cont, err := strconv.Atoi(c.Param("content")) + contID, err := strconv.Atoi(c.Param("content")) if err != nil { return err } var content Content - if err := s.DB.First(&content, "id = ?", cont).Error; err != nil { + if err := s.DB.First(&content, "id = ?", contID).Error; err != nil { return err } if content.UserID != u.ID { return &util.HttpError{ Code: http.StatusForbidden, - Message: util.ERR_NOT_AUTHORIZED, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "user is not owner of specified content", } } var sub []Content - if err := s.DB.Find(&sub, "aggregated_in = ?", cont).Error; err != nil { + if err := s.DB.Find(&sub, "aggregated_in = ?", contID).Error; err != nil { return err } - return c.JSON(http.StatusOK, sub) } @@ -2753,8 +2760,9 @@ func (s *Server) checkTokenAuth(token string) (*User, error) { if err := s.DB.First(&authToken, "token = ?", token).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return nil, &util.HttpError{ - Code: http.StatusForbidden, - Message: util.ERR_INVALID_TOKEN, + Code: http.StatusUnauthorized, + Reason: util.ERR_INVALID_TOKEN, + Details: "api key does not exists", } } return nil, err @@ -2762,8 +2770,8 @@ func (s *Server) checkTokenAuth(token string) (*User, error) { if authToken.Expiry.Before(time.Now()) { return nil, &util.HttpError{ - Code: http.StatusForbidden, - Message: util.ERR_TOKEN_EXPIRED, + Code: http.StatusUnauthorized, + Reason: util.ERR_TOKEN_EXPIRED, Details: fmt.Sprintf("token for user %d expired %s", authToken.User, authToken.Expiry), } } @@ -2772,8 +2780,9 @@ func (s *Server) checkTokenAuth(token string) (*User, error) { if err := s.DB.First(&user, "id = ?", authToken.User).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return nil, &util.HttpError{ - Code: http.StatusForbidden, - Message: util.ERR_INVALID_TOKEN, + Code: http.StatusUnauthorized, + Reason: util.ERR_INVALID_TOKEN, + Details: "no user exists for the spicified api key", } } return nil, err @@ -2809,8 +2818,9 @@ func (s *Server) AuthRequired(level int) echo.MiddlewareFunc { log.Warnw("api key is upload only", "user", u.ID, "perm", u.Perm, "required", level) return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_NOT_AUTHORIZED, + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "api key is upload only", } } @@ -2819,11 +2829,12 @@ func (s *Server) AuthRequired(level int) echo.MiddlewareFunc { return next(c) } - log.Warnw("User not authorized", "user", u.ID, "perm", u.Perm, "required", level) + log.Warnw("user not authorized", "user", u.ID, "perm", u.Perm, "required", level) return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_NOT_AUTHORIZED, + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "user not authorized", } } } @@ -2845,26 +2856,33 @@ func (s *Server) handleRegisterUser(c echo.Context) error { if err := s.DB.First(&invite, "code = ?", reg.InviteCode).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return &util.HttpError{ - Code: http.StatusForbidden, - Message: util.ERR_INVALID_INVITE, + Code: http.StatusNotFound, + Reason: util.ERR_INVALID_INVITE, } } + return err } if invite.ClaimedBy != 0 { return &util.HttpError{ - Code: http.StatusForbidden, - Message: util.ERR_INVITE_ALREADY_USED, + Code: http.StatusBadRequest, + Reason: util.ERR_INVITE_ALREADY_USED, } } username := strings.ToLower(reg.Username) - var exist User - if err := s.DB.First(&exist, "username = ?", username).Error; err == nil { + var exist *User + if err := s.DB.First(&exist, "username = ?", username).Error; err != nil { + if !xerrors.Is(err, gorm.ErrRecordNotFound) { + return err + } + } + + if exist != nil { return &util.HttpError{ - Code: http.StatusForbidden, - Message: util.ERR_USERNAME_TAKEN, + Code: http.StatusBadRequest, + Reason: util.ERR_USERNAME_TAKEN, } } @@ -2874,13 +2892,12 @@ func (s *Server) handleRegisterUser(c echo.Context) error { PassHash: reg.PasswordHash, Perm: util.PermLevelUser, } + if err := s.DB.Create(newUser).Error; err != nil { - herr := &util.HttpError{ - Code: http.StatusForbidden, - Message: util.ERR_USER_CREATION_FAILED, + return &util.HttpError{ + Code: http.StatusInternalServerError, + Reason: util.ERR_USER_CREATION_FAILED, } - - return fmt.Errorf("user creation failed: %s: %w", err, herr) } authToken := &AuthToken{ @@ -2924,8 +2941,8 @@ func (s *Server) handleLoginUser(c echo.Context) error { if err := s.DB.First(&user, "username = ?", strings.ToLower(body.Username)).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return &util.HttpError{ - Code: http.StatusForbidden, - Message: util.ERR_USER_NOT_FOUND, + Code: http.StatusForbidden, + Reason: util.ERR_USER_NOT_FOUND, } } return err @@ -2933,8 +2950,8 @@ func (s *Server) handleLoginUser(c echo.Context) error { if user.PassHash != body.PassHash { return &util.HttpError{ - Code: http.StatusForbidden, - Message: util.ERR_INVALID_PASSWORD, + Code: http.StatusForbidden, + Reason: util.ERR_INVALID_PASSWORD, } } @@ -2981,8 +2998,8 @@ func (s *Server) handleUserChangeAddress(c echo.Context, u *User) error { log.Warnf("invalid filecoin address in change address request body: %w", err) return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: "invalid address in request body", + Code: http.StatusUnauthorized, + Reason: "invalid address in request body", } } @@ -3330,7 +3347,7 @@ func (s *Server) handleAddContentsToCollection(c echo.Context, u *User) error { } var cont Content - if err := s.DB.First(&cont, "cid = ? and user_id = ?", util.DbCID{cc}, u.ID).Error; err != nil { + if err := s.DB.First(&cont, "cid = ? and user_id = ?", util.DbCID{CID: cc}, u.ID).Error; err != nil { return fmt.Errorf("failed to find content by given cid %s: %w", cc, err) } @@ -3381,6 +3398,28 @@ func (s *Server) handleCommitCollection(c echo.Context, u *User) error { return err } + // transform listen addresses (/ip/1.2.3.4/tcp/80) into full p2p multiaddresses + // e.g. /ip/1.2.3.4/tcp/80/p2p/12D3KooWCVTKbuvrZ9ton6zma5LNhCEeZyuFtxcDzDTmWh2qPtWM + fullP2pMultiAddrs := []multiaddr.Multiaddr{} + for _, listenAddr := range s.Node.Host.Addrs() { + fullP2pAddr := fmt.Sprintf("%s/p2p/%s", listenAddr, s.Node.Host.ID()) + fullP2pMultiAddr, err := multiaddr.NewMultiaddr(fullP2pAddr) + if err != nil { + return err + } + fullP2pMultiAddrs = append(fullP2pMultiAddrs, fullP2pMultiAddr) + } + + // transform multiaddresses into AddrInfo objects + var origins []*peer.AddrInfo + for _, p := range fullP2pMultiAddrs { + ai, err := peer.AddrInfoFromP2pAddr(p) + if err != nil { + return err + } + origins = append(origins, ai) + } + bserv := blockservice.New(s.Node.Blockstore, nil) dserv := merkledag.NewDAGService(bserv) @@ -3405,37 +3444,17 @@ func (s *Server) handleCommitCollection(c echo.Context, u *User) error { // update DB with new collection CID col.CID = collectionNode.Cid().String() - err := s.DB.Model(Collection{}).Where("id = ?", col.ID).UpdateColumn("c_id", collectionNode.Cid().String()).Error - if err != nil { - return err - } - - // transform listen addresses (/ip/1.2.3.4/tcp/80) in full p2p multiaddresses - // e.g. /ip/1.2.3.4/tcp/80/p2p/12D3KooWCVTKbuvrZ9ton6zma5LNhCEeZyuFtxcDzDTmWh2qPtWM - fullP2pMultiAddrs := []multiaddr.Multiaddr{} - for _, listenAddr := range s.Node.Host.Addrs() { - fullP2pAddr := fmt.Sprintf("%s/p2p/%s", listenAddr, s.Node.Host.ID()) - fullP2pMultiAddr, err := multiaddr.NewMultiaddr(fullP2pAddr) - if err != nil { - return err - } - fullP2pMultiAddrs = append(fullP2pMultiAddrs, fullP2pMultiAddr) - } - - // transform multiaddresses into AddrInfo objects - peers, err := peer.AddrInfosFromP2pAddrs(fullP2pMultiAddrs...) - if err != nil { + if err := s.DB.Model(Collection{}).Where("id = ?", col.ID).UpdateColumn("c_id", collectionNode.Cid().String()).Error; err != nil { return err } ctx := c.Request().Context() makeDeal := false - pinstatus, err := s.CM.pinContent(ctx, u.ID, collectionNode.Cid(), collectionNode.Cid().String(), []*CollectionRef{}, peers, 0, nil, makeDeal) + pinstatus, err := s.CM.pinContent(ctx, u.ID, collectionNode.Cid(), collectionNode.Cid().String(), nil, origins, 0, nil, makeDeal) if err != nil { return err } - return c.JSON(http.StatusOK, pinstatus) } @@ -3482,7 +3501,6 @@ func (s *Server) handleGetCollectionContents(c echo.Context, u *User) error { } path := r.Path - relp, err := filepath.Rel(dir, path) if err != nil { return c.JSON(http.StatusInternalServerError, fmt.Errorf("listing CID directories is not allowed")) @@ -3550,11 +3568,9 @@ func (s *Server) handleGetCollectionContents(c echo.Context, u *User) error { Type: contentType, Size: r.Size, ContID: r.ID, - Cid: &util.DbCID{r.Cid.CID}, + Cid: &util.DbCID{CID: r.Cid.CID}, }) - } - return c.JSON(http.StatusOK, out) } @@ -3568,15 +3584,28 @@ func (s *Server) handleDeleteCollection(c echo.Context, u *User) error { coluuid := c.Param("coluuid") var col Collection - if err := s.DB.First(&col, "uuid = ? and user_id = ?", coluuid, u.ID).Error; err != nil { - return c.String(404, "Collection not found") + if err := s.DB.First(&col, "uuid = ?", coluuid).Error; err != nil { + if xerrors.Is(err, gorm.ErrRecordNotFound) { + return &util.HttpError{ + Code: http.StatusNotFound, + Reason: util.ERR_CONTENT_NOT_FOUND, + Details: fmt.Sprintf("collection with ID(%s) was not found", coluuid), + } + } + } + + if col.UserID != u.ID { + return &util.HttpError{ + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "user is not owner of specified collection", + } } if err := s.DB.Delete(&col).Error; err != nil { return err } - - return c.NoContent(200) + return c.NoContent(http.StatusOK) } func (s *Server) tracingMiddleware(next echo.HandlerFunc) echo.HandlerFunc { @@ -3789,16 +3818,15 @@ type dealMetricsInfo struct { } type metricsDealJoin struct { - CreatedAt time.Time - Failed bool - FailedAt time.Time - DealID int64 - Size int64 + CreatedAt time.Time `json:"created_at"` + Failed bool `json:"failed"` + FailedAt time.Time `json:"failed_at"` + DealID int64 `json:"deal_id"` + Size int64 `json:"size"` TransferStarted time.Time `json:"transferStarted"` TransferFinished time.Time `json:"transferFinished"` - - OnChainAt time.Time `json:"onChainAt"` - SealedAt time.Time `json:"sealedAt"` + OnChainAt time.Time `json:"onChainAt"` + SealedAt time.Time `json:"sealedAt"` } // handleMetricsDealOnChain godoc @@ -4445,7 +4473,7 @@ func (s *Server) handleAutoretrieveHeartbeat(c echo.Context) error { return err } - autoretrieve.LastConnection = time.Now().UTC() + autoretrieve.LastConnection = time.Now() if err := s.DB.Save(&autoretrieve).Error; err != nil { return err } @@ -4590,8 +4618,10 @@ func (s *Server) handleCreateContent(c echo.Context, u *User) error { return err } - if col.UserID != u.ID { - return fmt.Errorf("attempted to create content in collection %s not owned by the user (%d)", c, u.ID) + return &util.HttpError{ + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: fmt.Sprintf("attempted to create content in collection %s not owned by the user (%d)", c, u.ID), } } @@ -4671,8 +4701,8 @@ func (s *Server) handleUserClaimMiner(c echo.Context, u *User) error { if len(sigb) < 2 { return &util.HttpError{ - Code: http.StatusBadRequest, - Message: util.ERR_INVALID_INPUT, + Code: http.StatusBadRequest, + Reason: util.ERR_INVALID_INPUT, } } @@ -4690,14 +4720,14 @@ func (s *Server) handleUserClaimMiner(c echo.Context, u *User) error { if len(sm) == 0 { // This is a new miner, need to run some checks first if err := s.checkNewMiner(ctx, cmb.Miner); err != nil { - return c.JSON(400, map[string]interface{}{ + return c.JSON(http.StatusBadRequest, map[string]interface{}{ "success": false, "error": err.Error(), }) } if err := s.DB.Create(&storageMiner{ - Address: util.DbAddr{cmb.Miner}, + Address: util.DbAddr{Addr: cmb.Miner}, Name: cmb.Name, Owner: u.ID, }).Error; err != nil { @@ -4969,7 +4999,7 @@ func (s *Server) handleShuttleCreateContent(c echo.Context) error { } content := &Content{ - Cid: util.DbCID{root}, + Cid: util.DbCID{CID: root}, Name: req.Name, Active: false, Pinning: false, @@ -5003,11 +5033,10 @@ func (s *Server) withShuttleAuth() echo.MiddlewareFunc { if err := s.DB.First(&sh, "token = ?", auth).Error; err != nil { log.Warnw("Shuttle not authorized", "token", auth) return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_NOT_AUTHORIZED, + Code: http.StatusUnauthorized, + Reason: util.ERR_NOT_AUTHORIZED, } } - return next(c) } } @@ -5044,6 +5073,7 @@ func (s *Server) handleShuttleRepinAll(c echo.Context) error { return nil } +// this is required as ipfs pinning spec has strong requirements on response format func openApiMiddleware(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { err := next(c) @@ -5051,35 +5081,32 @@ func openApiMiddleware(next echo.HandlerFunc) echo.HandlerFunc { return nil } - var herr *util.HttpError - if xerrors.As(err, &herr) { - errmap := map[string]string{ - "reason": herr.Message, - } - if herr.Details != "" { - errmap["details"] = herr.Details - } - res := map[string]interface{}{ - "error": errmap, - } - return c.JSON(herr.Code, res) + var httpRespErr *util.HttpError + if xerrors.As(err, &httpRespErr) { + log.Errorf("handler error: %s", err) + return c.JSON(httpRespErr.Code, &util.HttpErrorResponse{ + Error: util.HttpError{ + Reason: httpRespErr.Reason, + Details: httpRespErr.Details, + }, + }) } var echoErr *echo.HTTPError if xerrors.As(err, &echoErr) { - return c.JSON(echoErr.Code, map[string]interface{}{ - "error": map[string]interface{}{ - "reason": echoErr.Message, + return c.JSON(echoErr.Code, &util.HttpErrorResponse{ + Error: util.HttpError{ + Reason: http.StatusText(echoErr.Code), + Details: echoErr.Message.(string), }, }) } log.Errorf("handler error: %s", err) - - // TODO: returning all errors out to the user smells potentially bad - return c.JSON(500, map[string]interface{}{ - "error": map[string]string{ - "reason": err.Error(), + return c.JSON(http.StatusInternalServerError, &util.HttpErrorResponse{ + Error: util.HttpError{ + Reason: http.StatusText(http.StatusInternalServerError), + Details: err.Error(), }, }) } @@ -5143,8 +5170,8 @@ func (s *Server) handleColfsAdd(c echo.Context, u *User) error { if col.UserID != u.ID { return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_NOT_AUTHORIZED, + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, Details: "user is not owner of specified collection", } } @@ -5156,8 +5183,8 @@ func (s *Server) handleColfsAdd(c echo.Context, u *User) error { if content.UserID != u.ID { return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_NOT_AUTHORIZED, + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, Details: "user is not owner of specified content", } } @@ -5171,14 +5198,9 @@ func (s *Server) handleColfsAdd(c echo.Context, u *User) error { path = &p } - if err := s.DB.Create(&CollectionRef{ - Collection: col.ID, - Content: content.ID, - Path: path, - }).Error; err != nil { - log.Errorf("failed to add content to requested collection: %s", err) + if err := s.DB.Create(&CollectionRef{Collection: col.ID, Content: content.ID, Path: path}).Error; err != nil { + return errors.Wrap(err, "failed to add content to requested collection") } - return c.JSON(http.StatusOK, map[string]string{}) } @@ -5210,7 +5232,6 @@ func (s *Server) handleGateway(c echo.Context) error { s.gwayHandler.ServeHTTP(c.Response().Writer, req) return nil } - return c.Redirect(307, redir) } @@ -5222,7 +5243,7 @@ func (s *Server) checkGatewayRedirect(proto string, cc cid.Cid, segs []string) ( } var cont Content - if err := s.DB.First(&cont, "cid = ? and active and not offloaded", &util.DbCID{cc}).Error; err != nil { + if err := s.DB.First(&cont, "cid = ? and active and not offloaded", &util.DbCID{CID: cc}).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return "", nil } diff --git a/main.go b/main.go index b7afa8ed..c6aded96 100644 --- a/main.go +++ b/main.go @@ -83,6 +83,8 @@ type Content struct { Pinning bool `json:"pinning"` PinMeta string `json:"pinMeta"` + Replace bool `json:"replace" gorm:"default:0"` + Origins string `json:"origins"` Failed bool `json:"failed"` @@ -131,7 +133,7 @@ func (s *Server) updateAutoretrieveIndex(tickInterval time.Duration, quit chan s defer ticker.Stop() for { - lastTickTime = time.Now().UTC().Add(-tickInterval) + lastTickTime = time.Now().Add(-tickInterval) // Find all autoretrieve servers that are online (that sent heartbeat) err := s.DB.Find(&autoretrieves, "last_connection > ?", lastTickTime).Error @@ -250,6 +252,10 @@ func overrideSetOptions(flags []cli.Flag, cctx *cli.Context, cfg *config.Estuary } func main() { + //set global time to UTC + utc, _ := time.LoadLocation("UTC") + time.Local = utc + logging.SetLogLevel("dt-impl", "debug") logging.SetLogLevel("estuary", "debug") logging.SetLogLevel("paych", "debug") @@ -574,7 +580,7 @@ func main() { } // TODO: this is an ugly self referential hack... should fix - pinmgr := pinner.NewPinManager(s.doPinning, nil, &pinner.PinManagerOpts{ + pinmgr := pinner.NewPinManager(s.doPinning, s.PinStatusFunc, &pinner.PinManagerOpts{ MaxActivePerUser: 20, }) go pinmgr.Run(50) @@ -635,6 +641,8 @@ func main() { if !cm.contentAddingDisabled { go func() { + // TODO - resume pin removal request + // wait for shuttles to reconnect // This is a bit of a hack, and theres probably a better way to // solve this. but its good enough for now diff --git a/pinner/pinmgr.go b/pinner/pinmgr.go index 66264737..0de4c66e 100644 --- a/pinner/pinmgr.go +++ b/pinner/pinmgr.go @@ -2,6 +2,7 @@ package pinner import ( "context" + "encoding/json" "fmt" "sync" "time" @@ -18,11 +19,13 @@ var log = logging.Logger("pinner") type PinFunc func(context.Context, *PinningOperation, PinProgressCB) error type PinProgressCB func(int64) -type PinStatusFunc func(uint, string) +type PinStatusFunc func(contID uint, location string, status types.PinningStatus) error func NewPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts) *PinManager { if scf == nil { - scf = func(uint, string) {} + scf = func(contID uint, location string, status types.PinningStatus) error { + return nil + } } if opts == nil { opts = DefaultOpts @@ -49,16 +52,14 @@ type PinManagerOpts struct { } type PinManager struct { - pinQueueIn chan *PinningOperation - pinQueueOut chan *PinningOperation - pinComplete chan *PinningOperation - pinQueue map[uint][]*PinningOperation - activePins map[uint]int - pinQueueLk sync.Mutex - + pinQueueIn chan *PinningOperation + pinQueueOut chan *PinningOperation + pinComplete chan *PinningOperation + pinQueue map[uint][]*PinningOperation + activePins map[uint]int + pinQueueLk sync.Mutex RunPinFunc PinFunc - StatusChangeFunc func(uint, string) - + StatusChangeFunc PinStatusFunc maxActivePerUser int } @@ -68,10 +69,10 @@ type PinManager struct { type PinningOperation struct { Obj cid.Cid Name string - Peers []peer.AddrInfo - Meta map[string]interface{} + Peers []*peer.AddrInfo + Meta string - Status string + Status types.PinningStatus UserId uint ContId uint @@ -98,7 +99,7 @@ func (po *PinningOperation) fail(err error) { po.lk.Lock() po.FetchErr = err po.EndTime = time.Now() - po.Status = "failed" + po.Status = types.PinningStatusFailed po.LastUpdate = time.Now() po.lk.Unlock() } @@ -109,10 +110,10 @@ func (po *PinningOperation) complete() { po.EndTime = time.Now() po.LastUpdate = time.Now() - po.Status = "pinned" + po.Status = types.PinningStatusPinned } -func (po *PinningOperation) SetStatus(st string) { +func (po *PinningOperation) SetStatus(st types.PinningStatus) { po.lk.Lock() defer po.lk.Unlock() @@ -120,19 +121,38 @@ func (po *PinningOperation) SetStatus(st string) { po.LastUpdate = time.Now() } -func (po *PinningOperation) PinStatus() *types.IpfsPinStatus { +func (po *PinningOperation) PinStatus() *types.IpfsPinStatusResponse { po.lk.Lock() defer po.lk.Unlock() - return &types.IpfsPinStatus{ - Requestid: fmt.Sprint(po.ContId), + meta := make(map[string]interface{}, 0) + if po.Meta != "" { + if err := json.Unmarshal([]byte(po.Meta), &meta); err != nil { + log.Warnf("content %d has invalid meta: %s", po.ContId, err) + } + } + + originStrs := make([]string, 0) + for _, o := range po.Peers { + ai, err := peer.AddrInfoToP2pAddrs(o) + if err == nil { + for _, a := range ai { + originStrs = append(originStrs, a.String()) + } + } + } + + return &types.IpfsPinStatusResponse{ + RequestID: fmt.Sprint(po.ContId), Status: po.Status, Created: po.Started, Pin: types.IpfsPin{ - Cid: po.Obj.String(), - Name: po.Name, - Meta: po.Meta, + CID: po.Obj.String(), + Name: po.Name, + Origins: originStrs, + Meta: meta, }, + Info: make(map[string]interface{}, 0), /* Ref: https://github.com/ipfs/go-pinning-service-http-client/issues/12 Info: map[string]interface{}{ "obj_fetched": po.NumFetched, @@ -149,7 +169,6 @@ func (pm *PinManager) PinQueueSize() int { for _, pq := range pm.pinQueue { count += len(pq) } - return count } @@ -157,7 +176,6 @@ func (pm *PinManager) Add(op *PinningOperation) { go func() { pm.pinQueueIn <- op }() - } var maxTimeout = 24 * time.Hour @@ -166,8 +184,10 @@ func (pm *PinManager) doPinning(op *PinningOperation) error { ctx, cancel := context.WithTimeout(context.Background(), maxTimeout) defer cancel() - op.SetStatus("pinning") - pm.StatusChangeFunc(op.ContId, "pinning") + op.SetStatus(types.PinningStatusPinning) + if err := pm.StatusChangeFunc(op.ContId, op.Location, types.PinningStatusPinning); err != nil { + return err + } if err := pm.RunPinFunc(ctx, op, func(size int64) { op.lk.Lock() @@ -176,12 +196,13 @@ func (pm *PinManager) doPinning(op *PinningOperation) error { op.SizeFetched += size }); err != nil { op.fail(err) - pm.StatusChangeFunc(op.ContId, "failed") + if err2 := pm.StatusChangeFunc(op.ContId, op.Location, types.PinningStatusFailed); err2 != nil { + return err2 + } return errors.Wrap(err, "shuttle RunPinFunc failed") } op.complete() - pm.StatusChangeFunc(op.ContId, "pinned") - return nil + return pm.StatusChangeFunc(op.ContId, op.Location, types.PinningStatusPinned) } func (pm *PinManager) popNextPinOp() *PinningOperation { diff --git a/pinner/types/types.go b/pinner/types/types.go index 865563b5..18b0977e 100644 --- a/pinner/types/types.go +++ b/pinner/types/types.go @@ -2,18 +2,38 @@ package types import "time" +type PinningStatus string + +const ( + /* + - queued # pinning operation is waiting in the queue; additional info can be returned in info[status_details] + - pinning # pinning in progress; additional info can be returned in info[status_details] + - pinned # pinned successfully + - failed # pinning service was unable to finish pinning operation; additional info can be found in info[status_details] + */ + PinningStatusPinning PinningStatus = "pinning" + PinningStatusPinned PinningStatus = "pinned" + PinningStatusFailed PinningStatus = "failed" + PinningStatusQueued PinningStatus = "queued" +) + type IpfsPin struct { - Cid string `json:"cid"` + CID string `json:"cid"` Name string `json:"name"` Origins []string `json:"origins"` Meta map[string]interface{} `json:"meta"` } -type IpfsPinStatus struct { - Requestid string `json:"requestid"` - Status string `json:"status"` +type IpfsPinStatusResponse struct { + RequestID string `json:"requestid"` + Status PinningStatus `json:"status"` Created time.Time `json:"created"` - Pin IpfsPin `json:"pin"` Delegates []string `json:"delegates"` Info map[string]interface{} `json:"info"` + Pin IpfsPin `json:"pin"` +} + +type IpfsListPinStatusResponse struct { + Count int `json:"count"` + Results []*IpfsPinStatusResponse `json:"results"` } diff --git a/pinning.go b/pinning.go index 52d27ab6..ff9a05a0 100644 --- a/pinning.go +++ b/pinning.go @@ -19,6 +19,7 @@ import ( "github.com/ipfs/go-merkledag" "github.com/labstack/echo/v4" "github.com/libp2p/go-libp2p-core/peer" + "github.com/pkg/errors" "go.opentelemetry.io/otel/attribute" trace "go.opentelemetry.io/otel/trace" "golang.org/x/xerrors" @@ -26,44 +27,56 @@ import ( "gorm.io/gorm/clause" ) -func (cm *ContentManager) pinStatus(cont Content) (*types.IpfsPinStatus, error) { +func (cm *ContentManager) pinStatus(cont Content, origins []*peer.AddrInfo) (*types.IpfsPinStatusResponse, error) { + delegates := cm.pinDelegatesForContent(cont) + cm.pinLk.Lock() po, ok := cm.pinJobs[cont.ID] cm.pinLk.Unlock() if !ok { - var meta map[string]interface{} + meta := make(map[string]interface{}, 0) if cont.PinMeta != "" { if err := json.Unmarshal([]byte(cont.PinMeta), &meta); err != nil { log.Warnf("content %d has invalid pinmeta: %s", cont, err) } } - ps := &types.IpfsPinStatus{ - Requestid: fmt.Sprintf("%d", cont.ID), - Status: "pinning", + originStrs := make([]string, 0) + for _, o := range origins { + ai, err := peer.AddrInfoToP2pAddrs(o) + if err == nil { + for _, a := range ai { + originStrs = append(originStrs, a.String()) + } + } + } + + ps := &types.IpfsPinStatusResponse{ + RequestID: fmt.Sprintf("%d", cont.ID), + Status: types.PinningStatusPinning, Created: cont.CreatedAt, Pin: types.IpfsPin{ - Cid: cont.Cid.CID.String(), - Name: cont.Name, - Meta: meta, + CID: cont.Cid.CID.String(), + Name: cont.Name, + Meta: meta, + Origins: originStrs, }, - Delegates: cm.pinDelegatesForContent(cont), - Info: nil, // TODO: all sorts of extra info we could add... + Delegates: delegates, + Info: make(map[string]interface{}, 0), // TODO: all sorts of extra info we could add... } if cont.Active { - ps.Status = "pinned" + ps.Status = types.PinningStatusPinned } + if cont.Failed { - ps.Status = "failed" + ps.Status = types.PinningStatusFailed } - return ps, nil } status := po.PinStatus() - status.Delegates = cm.pinDelegatesForContent(cont) - + status.Delegates = delegates return status, nil } @@ -83,7 +96,7 @@ func (cm *ContentManager) pinDelegatesForContent(cont Content) []string { } if ai == nil { - log.Warnf("no address info for shuttle %s: %s", cont.Location, err) + log.Warnf("no address info for shuttle: %s", cont.Location) return nil } @@ -99,15 +112,23 @@ func (s *Server) doPinning(ctx context.Context, op *pinner.PinningOperation, cb ctx, span := s.tracer.Start(ctx, "doPinning") defer span.End() + // remove replacement async - move this out + if op.Replace > 0 { + go func() { + if err := s.CM.removeContent(ctx, op.Replace, true); err != nil { + log.Infof("failed to remove content in replacement: %d with: %d", op.Replace, op.ContId) + } + }() + } + for _, pi := range op.Peers { - if err := s.Node.Host.Connect(ctx, pi); err != nil { + if err := s.Node.Host.Connect(ctx, *pi); err != nil { log.Warnf("failed to connect to origin node for pinning operation: %s", err) } } bserv := blockservice.New(s.Node.Blockstore, s.Node.Bitswap) dserv := merkledag.NewDAGService(bserv) - dsess := merkledag.NewSession(ctx, dserv) if err := s.CM.addDatabaseTrackingToContent(ctx, op.ContId, dsess, op.Obj, cb); err != nil { @@ -118,12 +139,6 @@ func (s *Server) doPinning(ctx context.Context, op *pinner.PinningOperation, cb s.CM.ToCheck <- op.ContId } - if op.Replace > 0 { - if err := s.CM.RemoveContent(ctx, op.Replace, true); err != nil { - log.Infof("failed to remove content in replacement: %d", op.Replace) - } - } - // this provide call goes out immediately if err := s.Node.FullRT.Provide(ctx, op.Obj, true); err != nil { log.Warnf("provider broadcast failed: %s", err) @@ -133,10 +148,13 @@ func (s *Server) doPinning(ctx context.Context, op *pinner.PinningOperation, cb if err := s.Node.Provider.Provide(op.Obj); err != nil { log.Warnf("providing failed: %s", err) } - return nil } +func (s *Server) PinStatusFunc(contID uint, location string, status types.PinningStatus) error { + return s.CM.UpdatePinStatus(location, contID, status) +} + func (cm *ContentManager) refreshPinQueue() error { var toPin []Content if err := cm.DB.Find(&toPin, "active = false and pinning = true and not aggregate").Error; err != nil { @@ -152,52 +170,65 @@ func (cm *ContentManager) refreshPinQueue() error { // anyways makeDeal := true for _, c := range toPin { + var origins []*peer.AddrInfo + // when refreshing pinning queue, use content origins if available + if c.Origins != "" { + _ = json.Unmarshal([]byte(c.Origins), &origins) // no need to handle or log err, its just a nice to have + } + if c.Location == "local" { - cm.addPinToQueue(c, nil, 0, makeDeal) + cm.addPinToQueue(c, origins, 0, makeDeal) } else { - if err := cm.pinContentOnShuttle(context.TODO(), c, nil, 0, c.Location, makeDeal); err != nil { + if err := cm.pinContentOnShuttle(context.TODO(), c, origins, 0, c.Location, makeDeal); err != nil { log.Errorf("failed to send pin message to shuttle: %s", err) time.Sleep(time.Millisecond * 100) } } } - return nil } -func (cm *ContentManager) pinContent(ctx context.Context, user uint, obj cid.Cid, name string, cols []*CollectionRef, peers []peer.AddrInfo, replace uint, meta map[string]interface{}, makeDeal bool) (*types.IpfsPinStatus, error) { +func (cm *ContentManager) pinContent(ctx context.Context, user uint, obj cid.Cid, name string, cols []*CollectionRef, origins []*peer.AddrInfo, replaceID uint, meta map[string]interface{}, makeDeal bool) (*types.IpfsPinStatusResponse, error) { loc, err := cm.selectLocationForContent(ctx, obj, user) if err != nil { return nil, xerrors.Errorf("selecting location for content failed: %w", err) } - var metab string + if replaceID > 0 { + // mark as replace since it will removed and so it should not be fetched anymore + if err := cm.DB.Model(&Content{}).Where("id = ?", replaceID).Update("replace", true).Error; err != nil { + return nil, err + } + } + + var metaStr string if meta != nil { b, err := json.Marshal(meta) if err != nil { return nil, err } - metab = string(b) + metaStr = string(b) } - cont := Content{ - Cid: util.DbCID{obj}, + var originsStr string + if origins != nil { + b, err := json.Marshal(origins) + if err != nil { + return nil, err + } + originsStr = string(b) + } + cont := Content{ + Cid: util.DbCID{CID: obj}, Name: name, UserID: user, Active: false, Replication: cm.Replication, - - Pinning: true, - PinMeta: metab, - - Location: loc, - - /* - Size int64 `json:"size"` - Offloaded bool `json:"offloaded"` - */ - + Pinning: true, + PinMeta: metaStr, + Location: loc, + Origins: originsStr, } if err := cm.DB.Create(&cont).Error; err != nil { return nil, err @@ -217,17 +248,16 @@ func (cm *ContentManager) pinContent(ctx context.Context, user uint, obj cid.Cid } if loc == "local" { - cm.addPinToQueue(cont, peers, replace, makeDeal) + cm.addPinToQueue(cont, origins, replaceID, makeDeal) } else { - if err := cm.pinContentOnShuttle(ctx, cont, peers, replace, loc, makeDeal); err != nil { + if err := cm.pinContentOnShuttle(ctx, cont, origins, replaceID, loc, makeDeal); err != nil { return nil, err } } - - return cm.pinStatus(cont) + return cm.pinStatus(cont, origins) } -func (cm *ContentManager) addPinToQueue(cont Content, peers []peer.AddrInfo, replace uint, makeDeal bool) { +func (cm *ContentManager) addPinToQueue(cont Content, peers []*peer.AddrInfo, replaceID uint, makeDeal bool) { if cont.Location != "local" { log.Errorf("calling addPinToQueue on non-local content") } @@ -239,10 +269,11 @@ func (cm *ContentManager) addPinToQueue(cont Content, peers []peer.AddrInfo, rep Name: cont.Name, Peers: peers, Started: cont.CreatedAt, - Status: "queued", - Replace: replace, + Status: types.PinningStatusQueued, + Replace: replaceID, Location: cont.Location, MakeDeal: makeDeal, + Meta: cont.PinMeta, } cm.pinLk.Lock() @@ -253,7 +284,7 @@ func (cm *ContentManager) addPinToQueue(cont Content, peers []peer.AddrInfo, rep cm.pinMgr.Add(op) } -func (cm *ContentManager) pinContentOnShuttle(ctx context.Context, cont Content, peers []peer.AddrInfo, replace uint, handle string, makeDeal bool) error { +func (cm *ContentManager) pinContentOnShuttle(ctx context.Context, cont Content, peers []*peer.AddrInfo, replaceID uint, handle string, makeDeal bool) error { ctx, span := cm.tracer.Start(ctx, "pinContentOnShuttle", trace.WithAttributes( attribute.String("handle", handle), attribute.String("CID", cont.Cid.CID.String()), @@ -281,10 +312,11 @@ func (cm *ContentManager) pinContentOnShuttle(ctx context.Context, cont Content, Name: cont.Name, Peers: peers, Started: cont.CreatedAt, - Status: "queued", - Replace: replace, + Status: types.PinningStatusQueued, + Replace: replaceID, Location: handle, MakeDeal: makeDeal, + Meta: cont.PinMeta, } cm.pinLk.Lock() @@ -440,13 +472,14 @@ func (s *Server) handleListPins(e echo.Context, u *User) error { qcids := e.QueryParam("cid") qname := e.QueryParam("name") + qmatch := e.QueryParam("match") qstatus := e.QueryParam("status") qbefore := e.QueryParam("before") qafter := e.QueryParam("after") qlimit := e.QueryParam("limit") qreqids := e.QueryParam("requestid") - q := s.DB.Model(Content{}).Where("user_id = ? and not aggregate", u.ID).Order("created_at desc") + q := s.DB.Model(Content{}).Where("user_id = ? AND not aggregate AND not replace", u.ID).Order("created_at desc") if qcids != "" { var cids []util.DbCID @@ -455,14 +488,22 @@ func (s *Server) handleListPins(e echo.Context, u *User) error { if err != nil { return err } - cids = append(cids, util.DbCID{c}) + cids = append(cids, util.DbCID{CID: c}) } - q = q.Where("cid in ?", cids) } if qname != "" { - q = q.Where("name = ?", qname) + switch strings.ToLower(qmatch) { + case "ipartial": + q = q.Where("lower(name) like ?", fmt.Sprintf("%%%s%%", strings.ToLower(qname))) + case "partial": + q = q.Where("name like ?", fmt.Sprintf("%%%s%%", qname)) + case "iexact": + q = q.Where("lower(name) = ?", strings.ToLower(qname)) + default: //exact + q = q.Where("name = ?", qname) + } } if qbefore != "" { @@ -470,7 +511,6 @@ func (s *Server) handleListPins(e echo.Context, u *User) error { if err != nil { return err } - q = q.Where("created_at <= ?", beftime) } @@ -490,14 +530,12 @@ func (s *Server) handleListPins(e echo.Context, u *User) error { if err != nil { return err } - ids = append(ids, id) } - q = q.Where("id in ?", ids) } - var lim int + lim := 10 // default from spec if qlimit != "" { limit, err := strconv.Atoi(qlimit) if err != nil { @@ -506,141 +544,119 @@ func (s *Server) handleListPins(e echo.Context, u *User) error { lim = limit } - if lim == 0 { - lim = 500 - } - - var allowed map[string]bool + pinStatuses := make(map[types.PinningStatus]bool) if qstatus != "" { - allowed = make(map[string]bool) - /* - - queued # pinning operation is waiting in the queue; additional info can be returned in info[status_details] - - pinning # pinning in progress; additional info can be returned in info[status_details] - - pinned # pinned successfully - - failed # pinning service was unable to finish pinning operation; additional info can be found in info[status_details] - */ statuses := strings.Split(qstatus, ",") for _, s := range statuses { - switch s { - case "queued", "pinning", "pinned", "failed": - allowed[s] = true + ps := types.PinningStatus(s) + switch ps { + case types.PinningStatusQueued, types.PinningStatusPinning, types.PinningStatusPinned, types.PinningStatusFailed: + pinStatuses[ps] = true default: - return fmt.Errorf("unrecognized pin status in query: %q", s) + return &util.HttpError{ + Code: http.StatusBadRequest, + Reason: util.ERR_INVALID_PINNING_STATUS, + Details: fmt.Sprintf("unrecognized pin status in query: %q", s), + } } } } - // certain sets of statuses we can use the database to filter for - oq, dblimit, err := filterForStatusQuery(q, allowed) + q, err := filterForStatusQuery(q, pinStatuses) if err != nil { return err } - q = oq - if dblimit { - q = q.Limit(lim) + var count int64 + if err := q.Count(&count).Error; err != nil { + return err } + q.Limit(lim) + var contents []Content if err := q.Scan(&contents).Error; err != nil { return err } - var out []*types.IpfsPinStatus + out := make([]*types.IpfsPinStatusResponse, 0) for _, c := range contents { - if lim > 0 && len(out) >= lim { - break - } - - st, err := s.CM.pinStatus(c) + st, err := s.CM.pinStatus(c, nil) if err != nil { return err } - if allowed == nil || allowed[st.Status] { - out = append(out, st) - } + out = append(out, st) } - if len(out) == 0 { - out = make([]*types.IpfsPinStatus, 0) - } - return e.JSON(http.StatusOK, map[string]interface{}{ - "count": len(out), - "results": out, + return e.JSON(http.StatusOK, types.IpfsListPinStatusResponse{ + Count: int(count), + Results: out, }) } -func filterForStatusQuery(q *gorm.DB, statuses map[string]bool) (*gorm.DB, bool, error) { +func filterForStatusQuery(q *gorm.DB, statuses map[types.PinningStatus]bool) (*gorm.DB, error) { + // TODO maybe we should move all these statuses to a status column in contents if len(statuses) == 0 || len(statuses) == 4 { - // if not filtering by status, we return *all* pins, in that case we can use the query to limit results - return q, true, nil + return q, nil // if no status filter or all statuses are specified, return all pins } - pinned := statuses["pinned"] - failed := statuses["failed"] - pinning := statuses["pinning"] - queued := statuses["queued"] + pinned := statuses[types.PinningStatusPinned] + failed := statuses[types.PinningStatusFailed] + pinning := statuses[types.PinningStatusPinning] + queued := statuses[types.PinningStatusQueued] if len(statuses) == 1 { switch { case pinned: - return q.Where("active"), true, nil + return q.Where("active and not failed and not pinning"), nil case failed: - return q.Where("failed"), true, nil + return q.Where("failed and not active and not pinning"), nil + case pinning: + return q.Where("pinning and not active and not failed"), nil default: - return q, false, nil + return q.Where("not active and not pinning and not failed"), nil } } if len(statuses) == 2 { if pinned && failed { - return q.Where("active or failed"), true, nil + return q.Where("(active or failed) and not pinning"), nil + } + + if pinned && queued { + return q.Where("active and not failed and not pinning"), nil + } + + if pinned && pinning { + return q.Where("(active or pinning) and not failed"), nil + } + + if pinning && failed { + return q.Where("(pinning or failed) and not active"), nil } if pinning && queued { - return q.Where("not active and not failed"), true, nil + return q.Where("pinning and not active and not failed"), nil } - // fallthrough to the rest of the logic - } - var canUseDBLimit bool = true - // If the query is trying to distinguish between pinning and queued, we cannot do that solely via a database query - if (statuses["queued"] && !statuses["pinning"]) || (statuses["pinning"] && !statuses["queued"]) { - canUseDBLimit = false + if failed && queued { + return q.Where("failed and not active and not pinning"), nil + } } - if !statuses["failed"] { - q = q.Where("not failed") + if !statuses[types.PinningStatusFailed] { + return q.Where("not failed and (active or pinning)"), nil } - if !statuses["pinned"] { - q = q.Where("not active") + if !statuses[types.PinningStatusPinned] { + return q.Where("not active and (failed or pinning"), nil } - return q, canUseDBLimit, nil -} - -/* -{ - - "cid": "QmCIDToBePinned", - "name": "PreciousData.pdf", - "origins": - -[ - - "/ip4/203.0.113.142/tcp/4001/p2p/QmSourcePeerId", - "/ip4/203.0.113.114/udp/4001/quic/p2p/QmSourcePeerId" - -], -"meta": - - { - "app_id": "99986338-1113-4706-8302-4420da6158aa" - } - + if !statuses[types.PinningStatusPinning] { + return q.Where("not pinning and (active or failed"), nil + } + return q.Where("active or pinning or failed"), nil } -*/ // handleAddPin godoc // @Summary Add and pin object @@ -657,7 +673,8 @@ func (s *Server) handleAddPin(e echo.Context, u *User) error { if s.CM.contentAddingDisabled || u.StorageDisabled { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_ADDING_DISABLED, + Reason: util.ERR_CONTENT_ADDING_DISABLED, + Details: "uploading content to this node is not allowed at the moment", } } @@ -684,35 +701,34 @@ func (s *Server) handleAddPin(e echo.Context, u *User) error { colpath = &p } - cols = []*CollectionRef{&CollectionRef{ - Collection: srchCol.ID, - Path: colpath, - }} + cols = []*CollectionRef{ + { + Collection: srchCol.ID, + Path: colpath, + }, + } } - var addrInfos []peer.AddrInfo + var origins []*peer.AddrInfo for _, p := range pin.Origins { ai, err := peer.AddrInfoFromString(p) if err != nil { return err } - - addrInfos = append(addrInfos, *ai) + origins = append(origins, ai) } - obj, err := cid.Decode(pin.Cid) + obj, err := cid.Decode(pin.CID) if err != nil { return err } makeDeal := true - status, err := s.CM.pinContent(ctx, u.ID, obj, pin.Name, cols, addrInfos, 0, pin.Meta, makeDeal) + // TODO pinning should be async + status, err := s.CM.pinContent(ctx, u.ID, obj, pin.Name, cols, origins, 0, pin.Meta, makeDeal) if err != nil { return err } - - status.Pin.Meta = pin.Meta - return e.JSON(http.StatusAccepted, status) } @@ -724,21 +740,35 @@ func (s *Server) handleAddPin(e echo.Context, u *User) error { // @Param pinid path string true "cid" // @Router /pinning/pins/{pinid} [get] func (s *Server) handleGetPin(e echo.Context, u *User) error { - id, err := strconv.Atoi(e.Param("pinid")) + pinID, err := strconv.Atoi(e.Param("pinid")) if err != nil { return err } var content Content - if err := s.DB.First(&content, "id = ?", uint(id)).Error; err != nil { + if err := s.DB.First(&content, "id = ? AND not replace", pinID).Error; err != nil { + if xerrors.Is(err, gorm.ErrRecordNotFound) { + return &util.HttpError{ + Code: http.StatusNotFound, + Reason: util.ERR_CONTENT_NOT_FOUND, + Details: fmt.Sprintf("content with ID(%d) was not found", pinID), + } + } return err } - st, err := s.CM.pinStatus(content) + if content.UserID != u.ID { + return &util.HttpError{ + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "user is not owner of specified content", + } + } + + st, err := s.CM.pinStatus(content, nil) if err != nil { return err } - return e.JSON(http.StatusOK, st) } @@ -753,12 +783,12 @@ func (s *Server) handleReplacePin(e echo.Context, u *User) error { if s.CM.contentAddingDisabled || u.StorageDisabled { return &util.HttpError{ Code: http.StatusBadRequest, - Message: util.ERR_CONTENT_ADDING_DISABLED, + Reason: util.ERR_CONTENT_ADDING_DISABLED, + Details: "uploading content to this node is not allowed at the moment", } } - ctx := e.Request().Context() - id, err := strconv.Atoi(e.Param("pinid")) + pinID, err := strconv.Atoi(e.Param("pinid")) if err != nil { return err } @@ -769,37 +799,44 @@ func (s *Server) handleReplacePin(e echo.Context, u *User) error { } var content Content - if err := s.DB.First(&content, "id = ?", id).Error; err != nil { + if err := s.DB.First(&content, "id = ? AND not replace", pinID).Error; err != nil { + if xerrors.Is(err, gorm.ErrRecordNotFound) { + return &util.HttpError{ + Code: http.StatusNotFound, + Reason: util.ERR_CONTENT_NOT_FOUND, + Details: fmt.Sprintf("content with ID(%d) was not found", pinID), + } + } return err } + if content.UserID != u.ID { return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_NOT_AUTHORIZED, + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "user is not owner of specified content", } } - var addrInfos []peer.AddrInfo + var origins []*peer.AddrInfo for _, p := range pin.Origins { ai, err := peer.AddrInfoFromString(p) if err != nil { return err } - - addrInfos = append(addrInfos, *ai) + origins = append(origins, ai) } - obj, err := cid.Decode(pin.Cid) + pinCID, err := cid.Decode(pin.CID) if err != nil { return err } makeDeal := true - status, err := s.CM.pinContent(ctx, u.ID, obj, pin.Name, nil, addrInfos, uint(id), pin.Meta, makeDeal) + status, err := s.CM.pinContent(e.Request().Context(), u.ID, pinCID, pin.Name, nil, origins, uint(pinID), pin.Meta, makeDeal) if err != nil { return err } - return e.JSON(http.StatusAccepted, status) } @@ -811,55 +848,65 @@ func (s *Server) handleReplacePin(e echo.Context, u *User) error { // @Param pinid path string true "Pin ID" // @Router /pinning/pins/{pinid} [delete] func (s *Server) handleDeletePin(e echo.Context, u *User) error { - // TODO: need to cancel any in-progress pinning operation - ctx := e.Request().Context() - id, err := strconv.Atoi(e.Param("pinid")) + pinID, err := strconv.Atoi(e.Param("pinid")) if err != nil { return err } var content Content - if err := s.DB.First(&content, "id = ?", id).Error; err != nil { + if err := s.DB.First(&content, "id = ? AND not replace", pinID).Error; err != nil { + if xerrors.Is(err, gorm.ErrRecordNotFound) { + return &util.HttpError{ + Code: http.StatusNotFound, + Reason: util.ERR_CONTENT_NOT_FOUND, + Details: fmt.Sprintf("content with ID(%d) was not found", pinID), + } + } return err } + if content.UserID != u.ID { return &util.HttpError{ - Code: http.StatusUnauthorized, - Message: util.ERR_NOT_AUTHORIZED, + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "user is not owner of specified content", } } - // TODO: what if we delete a pin that was in progress? - if err := s.CM.unpinContent(ctx, uint(id)); err != nil { + // mark as replace since it will removed and so it should not be fetched anymore + if err := s.DB.Model(&Content{}).Where("id = ?", pinID).Update("replace", true).Error; err != nil { return err } + // unpin async + go func() { + if err := s.CM.unpinContent(e.Request().Context(), uint(pinID)); err != nil { + log.Errorf("could not unpinContent(%d): %s", err, pinID) + } + }() return e.NoContent(http.StatusAccepted) } -func (cm *ContentManager) UpdatePinStatus(handle string, cont uint, status string) { +func (cm *ContentManager) UpdatePinStatus(location string, contID uint, status types.PinningStatus) error { cm.pinLk.Lock() - op, ok := cm.pinJobs[cont] + op, ok := cm.pinJobs[contID] + cm.pinLk.Unlock() if !ok { - log.Warnw("got pin status update for unknown content", "content", cont, "status", status, "shuttle", handle) - return + return fmt.Errorf("got pin status update for unknown content: %d, status: %s, location: %s", contID, status, location) } - op.SetStatus(status) - if status == "failed" { + if status == types.PinningStatusFailed { var c Content - if err := cm.DB.First(&c, "id = ?", cont).Error; err != nil { - log.Errorf("failed to look up content: %s", err) - return + if err := cm.DB.First(&c, "id = ?", contID).Error; err != nil { + return errors.Wrap(err, "failed to look up content") } if c.Active { - log.Errorf("got failed pin status message from shuttle %s where content(%d) was already active, refusing to do anything", handle, cont) - return + return fmt.Errorf("got failed pin status message from location: %s where content(%d) was already active, refusing to do anything", location, contID) } - if err := cm.DB.Model(Content{}).Where("id = ?", cont).UpdateColumns(map[string]interface{}{ + if err := cm.DB.Model(Content{}).Where("id = ?", contID).UpdateColumns(map[string]interface{}{ "active": false, "pinning": false, "failed": true, @@ -867,6 +914,8 @@ func (cm *ContentManager) UpdatePinStatus(handle string, cont uint, status strin log.Errorf("failed to mark content as failed in database: %s", err) } } + op.SetStatus(status) + return nil } func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle string, pincomp *drpc.PinComplete) error { @@ -904,7 +953,7 @@ func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle stri objects := make([]*Object, 0, len(pincomp.Objects)) for _, o := range pincomp.Objects { objects = append(objects, &Object{ - Cid: util.DbCID{o.Cid}, + Cid: util.DbCID{CID: o.Cid}, Size: o.Size, }) } diff --git a/pinning_test.go b/pinning_test.go index 40f64924..ec5f2859 100644 --- a/pinning_test.go +++ b/pinning_test.go @@ -6,6 +6,7 @@ import ( "gorm.io/driver/sqlite" "gorm.io/gorm" + "github.com/application-research/estuary/pinner/types" "github.com/stretchr/testify/assert" ) @@ -19,54 +20,49 @@ func TestStatusFilterQuery(t *testing.T) { db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{DryRun: true}) assert.NoError(err) - s := map[string]bool{} + s := map[types.PinningStatus]bool{} - resp, ok, err := filterForStatusQuery(db, s) + resp, err := filterForStatusQuery(db, s) assert.NoError(err) - assert.True(ok) assert.Equal("SELECT * FROM `conts`", resp.Find([]Conts{}).Statement.SQL.String()) - s = map[string]bool{ - "failed": true, + s = map[types.PinningStatus]bool{ + types.PinningStatusFailed: true, } - resp, ok, err = filterForStatusQuery(db, s) + resp, err = filterForStatusQuery(db, s) assert.NoError(err) - assert.True(ok) - assert.Equal("SELECT * FROM `conts` WHERE failed", + assert.Equal("SELECT * FROM `conts` WHERE failed and not active and not pinning", resp.Find([]Conts{}).Statement.SQL.String()) - s = map[string]bool{ - "failed": true, - "pinned": true, + s = map[types.PinningStatus]bool{ + types.PinningStatusFailed: true, + types.PinningStatusPinned: true, } - resp, ok, err = filterForStatusQuery(db, s) + resp, err = filterForStatusQuery(db, s) assert.NoError(err) - assert.True(ok) - assert.Equal("SELECT * FROM `conts` WHERE active or failed", + assert.Equal("SELECT * FROM `conts` WHERE (active or failed) and not pinning", resp.Find([]Conts{}).Statement.SQL.String()) - s = map[string]bool{ - "pinning": true, - "pinned": true, + s = map[types.PinningStatus]bool{ + types.PinningStatusPinning: true, + types.PinningStatusPinned: true, } - resp, ok, err = filterForStatusQuery(db, s) + resp, err = filterForStatusQuery(db, s) assert.NoError(err) - assert.False(ok) - assert.Equal("SELECT * FROM `conts` WHERE not failed", + assert.Equal("SELECT * FROM `conts` WHERE (active or pinning) and not failed", resp.Find([]Conts{}).Statement.SQL.String()) - s = map[string]bool{ - "pinning": true, - "queued": true, + s = map[types.PinningStatus]bool{ + types.PinningStatusPinning: true, + types.PinningStatusQueued: true, } - resp, ok, err = filterForStatusQuery(db, s) + resp, err = filterForStatusQuery(db, s) assert.NoError(err) - assert.True(ok) - assert.Equal("SELECT * FROM `conts` WHERE not active and not failed", + assert.Equal("SELECT * FROM `conts` WHERE pinning and not active and not failed", resp.Find([]Conts{}).Statement.SQL.String()) } diff --git a/replication.go b/replication.go index ad2b59be..f3c539fc 100644 --- a/replication.go +++ b/replication.go @@ -289,7 +289,6 @@ func (cm *ContentManager) tryAddContent(cb *contentStagingZone, c Content) (bool if cb.CloseTime.Before(nowPlus) { cb.CloseTime = nowPlus } - return true, nil } diff --git a/shuttle.go b/shuttle.go index 7d440de1..cf61354c 100644 --- a/shuttle.go +++ b/shuttle.go @@ -136,8 +136,7 @@ func (cm *ContentManager) processShuttleMessage(handle string, msg *drpc.Message if ups == nil { return ErrNilParams } - cm.UpdatePinStatus(handle, ups.DBID, ups.Status) - return nil + return cm.UpdatePinStatus(handle, ups.DBID, ups.Status) case drpc.OP_PinComplete: param := msg.Params.PinComplete if param == nil { @@ -287,17 +286,13 @@ func (cm *ContentManager) handleRpcCommPComplete(ctx context.Context, handle str defer span.End() opcr := PieceCommRecord{ - Data: util.DbCID{resp.Data}, - Piece: util.DbCID{resp.CommP}, + Data: util.DbCID{CID: resp.Data}, + Piece: util.DbCID{CID: resp.CommP}, Size: resp.Size, CarSize: resp.CarSize, } - if err := cm.DB.Clauses(clause.OnConflict{DoNothing: true}).Create(&opcr).Error; err != nil { - return err - } - - return nil + return cm.DB.Clauses(clause.OnConflict{DoNothing: true}).Create(&opcr).Error } func (cm *ContentManager) handleRpcTransferStarted(ctx context.Context, handle string, param *drpc.TransferStarted) error { diff --git a/util/http.go b/util/http.go index adfb99e6..26879ee8 100644 --- a/util/http.go +++ b/util/http.go @@ -37,19 +37,25 @@ const ( ERR_PEERING_PEERS_REMOVE_ERROR = "ERR_PEERING_PEERS_REMOVE_ERROR" ERR_PEERING_PEERS_START_ERROR = "ERR_PEERING_PEERS_START_ERROR" ERR_PEERING_PEERS_STOP_ERROR = "ERR_PEERING_PEERS_STOP_ERROR" + ERR_CONTENT_NOT_FOUND = "ERR_CONTENT_NOT_FOUND" + ERR_INVALID_PINNING_STATUS = "ERR_INVALID_PINNING_STATUS" ) type HttpError struct { - Code int - Message string - Details string + Code int `json:"code,omitempty"` + Reason string `json:"reason"` + Details string `json:"details"` } func (he HttpError) Error() string { if he.Details == "" { - return he.Message + return he.Reason } - return he.Message + ": " + he.Details + return he.Reason + ": " + he.Details +} + +type HttpErrorResponse struct { + Error HttpError `json:"error"` } const ( @@ -67,6 +73,7 @@ func isValidAuth(authStr string) bool { if !matchEst && !matchSecret { return false } + // only get the uuid from the string uuidStr := strings.ReplaceAll(authStr, "SECRET", "") uuidStr = strings.ReplaceAll(uuidStr, "EST", "") @@ -77,7 +84,6 @@ func isValidAuth(authStr string) bool { if err != nil { return false } - return true } @@ -86,91 +92,79 @@ func ExtractAuth(c echo.Context) (string, error) { // undefined will be the auth value if ESTUARY_TOKEN cookie is removed. if auth == "" || auth == "undefined" { return "", &HttpError{ - Code: http.StatusForbidden, - Message: ERR_AUTH_MISSING, + Code: http.StatusUnauthorized, + Reason: ERR_AUTH_MISSING, + Details: "no api key was specified", } } parts := strings.Split(auth, " ") if len(parts) != 2 { return "", &HttpError{ - Code: http.StatusForbidden, - Message: ERR_INVALID_AUTH, + Code: http.StatusUnauthorized, + Reason: ERR_INVALID_AUTH, + Details: "invalid api key was specified", } } if parts[0] != "Bearer" { return "", &HttpError{ - Code: http.StatusForbidden, - Message: ERR_AUTH_MISSING_BEARER, + Code: http.StatusUnauthorized, + Reason: ERR_AUTH_MISSING_BEARER, + Details: "invalid api key was specified", } } - - /* - // if auth is not missing, check format first before extracting - if !isValidAuth(parts[1]) { - return "", &HttpError{ - Code: http.StatusForbidden, - Message: ERR_WRONG_AUTH_FORMAT, - } - } - */ - return parts[1], nil } type UserSettings struct { - Replication int `json:"replication"` - Verified bool `json:"verified"` - DealDuration int `json:"dealDuration"` - - MaxStagingWait time.Duration `json:"maxStagingWait"` - FileStagingThreshold int64 `json:"fileStagingThreshold"` - - ContentAddingDisabled bool `json:"contentAddingDisabled"` - DealMakingDisabled bool `json:"dealMakingDisabled"` - - UploadEndpoints []string `json:"uploadEndpoints"` - Flags int `json:"flags"` + Replication int `json:"replication"` + Verified bool `json:"verified"` + DealDuration int `json:"dealDuration"` + MaxStagingWait time.Duration `json:"maxStagingWait"` + FileStagingThreshold int64 `json:"fileStagingThreshold"` + ContentAddingDisabled bool `json:"contentAddingDisabled"` + DealMakingDisabled bool `json:"dealMakingDisabled"` + UploadEndpoints []string `json:"uploadEndpoints"` + Flags int `json:"flags"` } type ViewerResponse struct { - Username string `json:"username"` - Perms int `json:"perms"` - ID uint `json:"id"` - Address string `json:"address,omitempty"` - Miners []string `json:"miners,omitempty"` - - AuthExpiry time.Time - - Settings UserSettings `json:"settings"` + Username string `json:"username"` + Perms int `json:"perms"` + ID uint `json:"id"` + Address string `json:"address,omitempty"` + Miners []string `json:"miners,omitempty"` + AuthExpiry time.Time `json:"auth_expiry,omitempty"` + Settings UserSettings `json:"settings"` } func ErrorHandler(err error, ctx echo.Context) { - var herr *HttpError - if xerrors.As(err, &herr) { - res := map[string]string{ - "error": herr.Message, - } - if herr.Details != "" { - res["details"] = herr.Details - } - + var httpRespErr *HttpError + if xerrors.As(err, &httpRespErr) { log.Errorf("handler error: %s", err) - ctx.JSON(herr.Code, res) + ctx.JSON(httpRespErr.Code, HttpErrorResponse{Error: *httpRespErr}) return } var echoErr *echo.HTTPError if xerrors.As(err, &echoErr) { - ctx.JSON(echoErr.Code, map[string]interface{}{ - "error": echoErr.Message, + ctx.JSON(echoErr.Code, HttpErrorResponse{ + Error: HttpError{ + Code: echoErr.Code, + Reason: http.StatusText(echoErr.Code), + Details: echoErr.Message.(string), + }, }) return } log.Errorf("handler error: %s", err) - _ = ctx.JSON(500, map[string]interface{}{ - "error": err.Error(), + ctx.JSON(http.StatusInternalServerError, HttpErrorResponse{ + Error: HttpError{ + Code: http.StatusInternalServerError, + Reason: http.StatusText(http.StatusInternalServerError), + Details: err.Error(), + }, }) } diff --git a/util/peering.go b/util/peering.go index 0c4b6136..afba7df7 100644 --- a/util/peering.go +++ b/util/peering.go @@ -6,11 +6,16 @@ import ( ) type PeeringPeerAddMessage struct { - Message string `json: Message` - PeersAdd []peering.PeeringPeer `json: Peers` + Message string `json:"message"` + PeersAdd []peering.PeeringPeer `json:"peers"` } type PeeringPeerRemoveMessage struct { - Message string `json: Message` - PeersRemove []peer.ID `json: Peers` + Message string `json:"message"` + PeersRemove []peer.ID `json:"peers"` +} + +// generic response models +type GenericResponse struct { + Message string `json:"message"` }