Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Improve pinning implementation to meet spec compliance #290

Merged
merged 12 commits into from
Jun 24, 2022
24 changes: 12 additions & 12 deletions cmd/barge/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,15 +310,15 @@ func (c *EstClient) CollectionsListDir(ctx context.Context, coluuid, path string
return out, nil
}

func (c *EstClient) PinAdd(ctx context.Context, root cid.Cid, name string, origins []string, meta map[string]interface{}) (*types.IpfsPinStatus, error) {
func (c *EstClient) PinAdd(ctx context.Context, root cid.Cid, name string, origins []string, meta map[string]interface{}) (*types.IpfsPinStatusResponse, error) {
p := &types.IpfsPin{
Cid: root.String(),
CID: root.String(),
Name: name,
Origins: origins,
Meta: meta,
}

var resp types.IpfsPinStatus
var resp types.IpfsPinStatusResponse
_, err := c.doRequest(ctx, "POST", "/pinning/pins", p, &resp)
if err != nil {
return nil, err
Expand All @@ -327,8 +327,8 @@ func (c *EstClient) PinAdd(ctx context.Context, root cid.Cid, name string, origi
return &resp, nil
}

func (c *EstClient) PinStatus(ctx context.Context, reqid string) (*types.IpfsPinStatus, error) {
var resp types.IpfsPinStatus
func (c *EstClient) PinStatus(ctx context.Context, reqid string) (*types.IpfsPinStatusResponse, error) {
var resp types.IpfsPinStatusResponse
_, err := c.doRequest(ctx, "GET", "/pinning/pins/"+reqid, nil, &resp)
if err != nil {
return nil, err
Expand All @@ -339,7 +339,7 @@ func (c *EstClient) PinStatus(ctx context.Context, reqid string) (*types.IpfsPin

type listPinsResp struct {
Count int
Results []*types.IpfsPinStatus
Results []*types.IpfsPinStatusResponse
}

func shouldRetry(err error) bool {
Expand Down Expand Up @@ -381,31 +381,31 @@ func (c *EstClient) doRequestRetries(ctx context.Context, method, path string, b
}
}

func (c *EstClient) PinStatuses(ctx context.Context, reqids []string) (map[string]*types.IpfsPinStatus, error) {
func (c *EstClient) PinStatuses(ctx context.Context, reqids []string) (map[string]*types.IpfsPinStatusResponse, error) {
var resp listPinsResp
_, err := c.doRequestRetries(ctx, "GET", "/pinning/pins?requestid="+strings.Join(reqids, ","), nil, &resp, 5)
if err != nil {
return nil, err
}

out := make(map[string]*types.IpfsPinStatus)
out := make(map[string]*types.IpfsPinStatusResponse)
for _, res := range resp.Results {
out[res.Requestid] = res
out[res.RequestID] = res
}

return out, nil
}

func (c *EstClient) PinStatusByCid(ctx context.Context, cids []string) (map[string]*types.IpfsPinStatus, error) {
func (c *EstClient) PinStatusByCid(ctx context.Context, cids []string) (map[string]*types.IpfsPinStatusResponse, error) {
var resp listPinsResp
_, err := c.doRequest(ctx, "GET", "/pinning/pins?cid="+strings.Join(cids, ","), nil, &resp)
if err != nil {
return nil, err
}

out := make(map[string]*types.IpfsPinStatus)
out := make(map[string]*types.IpfsPinStatusResponse)
for _, res := range resp.Results {
out[res.Pin.Cid] = res
out[res.Pin.CID] = res
}

return out, nil
Expand Down
43 changes: 22 additions & 21 deletions cmd/barge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
cli "github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/application-research/estuary/pinner/types"
dagsplit "github.com/application-research/estuary/util/dagsplit"
)

Expand Down Expand Up @@ -378,7 +379,7 @@ func doAddPin(ctx context.Context, bstore blockstore.Blockstore, client *EstClie
fmt.Println("failed to connect to pin delegates: ", err)
}

pins := []string{st.Requestid}
pins := []string{st.RequestID}
for range time.Tick(time.Second * 2) {
var pinning, queued, pinned, failed int
for _, p := range pins {
Expand All @@ -389,13 +390,13 @@ func doAddPin(ctx context.Context, bstore blockstore.Blockstore, client *EstClie
}

switch status.Status {
case "pinned":
case types.PinningStatusPinned:
pinned++
case "failed":
case types.PinningStatusFailed:
failed++
case "pinning":
case types.PinningStatusPinning:
pinning++
case "queued":
case types.PinningStatusQueued:
queued++
}

Expand Down Expand Up @@ -750,7 +751,7 @@ var plumbSplitAddFileCmd = &cli.Command{
fmt.Println("failed to connect to pin delegates: ", err)
}

pins = append(pins, st.Requestid)
pins = append(pins, st.RequestID)
}

for range time.Tick(time.Second * 2) {
Expand All @@ -763,13 +764,13 @@ var plumbSplitAddFileCmd = &cli.Command{
}

switch status.Status {
case "pinned":
case types.PinningStatusPinned:
pinned++
case "failed":
case types.PinningStatusFailed:
failed++
case "pinning":
case types.PinningStatusPinning:
pinning++
case "queued":
case types.PinningStatusQueued:
queued++
}

Expand Down Expand Up @@ -1206,7 +1207,7 @@ var bargeStatusCmd = &cli.Command{
if len(pins) > 0 {
pin := pins[0]

if pin.Status == "pinned" {
if pin.Status == types.PinningStatusPinned {
// unchanged and pinned, no need to print anything
continue
}
Expand Down Expand Up @@ -1237,7 +1238,7 @@ type fileWithPin struct {

Cid string
Path string
Status string
Status types.PinningStatus
RequestID string
}

Expand Down Expand Up @@ -1302,7 +1303,7 @@ var bargeSyncCmd = &cli.Command{
continue
}

if f.Status == "pinned" {
if f.Status == types.PinningStatusPinned {
// TODO: add flag to allow a forced rechecking
continue
}
Expand Down Expand Up @@ -1336,12 +1337,12 @@ var bargeSyncCmd = &cli.Command{
}

switch st.Status {
case "pinned":
case types.PinningStatusPinned:
pinComplete = append(pinComplete, fp)
if err := r.DB.Model(Pin{}).Where("id = ?", fp.PinID).UpdateColumn("status", st.Status).Error; err != nil {
return err
}
case "failed":
case types.PinningStatusFailed:
needsNewPin = append(needsNewPin, fp)
if err := r.DB.Delete(Pin{ID: fp.PinID}).Error; err != nil {
return err
Expand Down Expand Up @@ -1375,15 +1376,15 @@ var bargeSyncCmd = &cli.Command{
continue
}

if pin.Status == "failed" {
if pin.Status == types.PinningStatusFailed {
// dont bother recording
continue
}

if err := r.DB.Create(&Pin{
File: nnp.FileID,
Cid: nnp.Cid,
RequestID: pin.Requestid,
RequestID: pin.RequestID,
Status: pin.Status,
}).Error; err != nil {
return err
Expand Down Expand Up @@ -1445,7 +1446,7 @@ var bargeSyncCmd = &cli.Command{
p := &Pin{
File: f.FileID,
Cid: fcid.String(),
RequestID: resp.Requestid,
RequestID: resp.RequestID,
Status: resp.Status,
}

Expand Down Expand Up @@ -1525,13 +1526,13 @@ var bargeSyncCmd = &cli.Command{
}

switch status.Status {
case "pinned":
case types.PinningStatusPinned:
newdone++
complete[req] = true
if err := r.DB.Model(Pin{}).Where("request_id = ?", req).UpdateColumn("status", "pinned").Error; err != nil {
if err := r.DB.Model(Pin{}).Where("request_id = ?", req).UpdateColumn("status", types.PinningStatusPinned).Error; err != nil {
return err
}
case "failed":
case types.PinningStatusFailed:
newdone++
failed[req] = true
if err := r.DB.Model(Pin{}).Where("request_id = ?", req).Delete(Pin{}).Error; err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/barge/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"time"

"github.com/application-research/estuary/pinner/types"
flatfs "github.com/ipfs/go-ds-flatfs"
leveldb "github.com/ipfs/go-ds-leveldb"
"github.com/ipfs/go-filestore"
Expand Down Expand Up @@ -50,7 +51,7 @@ type Pin struct {
File uint `gorm:"index"`
Cid string
RequestID string `gorm:"index"`
Status string
Status types.PinningStatus
}

func findRepo(cctx *cli.Context) (string, error) {
Expand Down
Loading