Skip to content

Commit

Permalink
renterd,sqlite: use metadata store for AllKeysChan
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Mar 5, 2024
1 parent c13d968 commit c551056
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
25 changes: 25 additions & 0 deletions persist/sqlite/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,28 @@ INNER JOIN blocks AS b ON (b.id = fs.child_id)`
})
return
}

// Pinned returns the next n pinned CIDs. If there are no remaining CIDs, the
// returned value should be (nil, nil)
func (s *Store) Pinned(offset, limit int) (roots []cid.Cid, err error) {
err = s.transaction(func(tx *txn) error {
rows, err := tx.Query(`SELECT b.cid FROM pinned_blocks pb
INNER JOIN blocks b ON (b.id=pb.block_id)
ORDER BY b.id ASC
LIMIT $1 OFFSET $2`, limit, offset)
if err != nil {
return fmt.Errorf("failed to query root cids: %w", err)
}
defer rows.Close()

for rows.Next() {
var root cid.Cid
if err := rows.Scan(dbDecode(&root)); err != nil {
return fmt.Errorf("failed to scan root cid: %w", err)
}
roots = append(roots, root)
}
return rows.Err()
})
return
}
1 change: 1 addition & 0 deletions renterd/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type (
Pin(PinnedBlock) error
Unpin(c cid.Cid) error

Pinned(offset, limit int) (roots []cid.Cid, err error)
BlockLocation(c cid.Cid) (bucket, key string, err error)
}
)
26 changes: 7 additions & 19 deletions renterd/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,38 +173,26 @@ func (bs *BlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
log := bs.log.Named("AllKeysChan")
ch := make(chan cid.Cid)
go func() {
var marker string
for {
resp, err := bs.busClient.ListObjects(ctx, bs.bucket, api.ListObjectOptions{
Marker: marker,
Limit: 100,
})

for i := 0; ; i += 1000 {
cids, err := bs.metadata.Pinned(i, 1000)
if err != nil {
bs.log.Error("failed to get root CIDs", zap.Error(err))
close(ch)
return
} else if len(resp.Objects) == 0 {
} else if len(cids) == 0 {
close(ch)
return
}

for _, obj := range resp.Objects {
name := obj.Name
if p := strings.Split(obj.Name, "/"); len(p) > 1 {
name = p[len(p)-1]
}
c, err := cid.Parse(name)
if err != nil {
log.Debug("skipping invalid key", zap.String("key", obj.Name), zap.String("name", name))
continue
}
log.Debug("found key", zap.Stringer("cid", c))
log.Debug("got root CIDs", zap.Int("count", len(cids)))
for _, c := range cids {
select {
case ch <- c:
case <-ctx.Done():
close(ch)
return
}
marker = obj.Name
}
}
}()
Expand Down

0 comments on commit c551056

Please sign in to comment.