Skip to content

Commit

Permalink
in postgres use optimized where in query (#681)
Browse files Browse the repository at this point in the history
We need to find a good way to run these tests on postgres
  • Loading branch information
ericvolp12 authored Aug 16, 2024
2 parents f07f35d + a354d28 commit 092ea1c
Showing 1 changed file with 31 additions and 32 deletions.
63 changes: 31 additions & 32 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -32,6 +33,7 @@ import (
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -1097,7 +1099,7 @@ func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error {
return nil
}

chunkSize := 10000
chunkSize := 2000
for i := 0; i < len(shs); i += chunkSize {
sl := shs[i:]
if len(sl) > chunkSize {
Expand Down Expand Up @@ -1196,31 +1198,6 @@ func (cb *compBucket) isEmpty() bool {
return len(cb.shards) == 0
}

func (cs *CarStore) copyShardBlocksFiltered(ctx context.Context, sh *CarShard, w io.Writer, keep map[cid.Cid]bool) error {
fi, err := os.Open(sh.Path)
if err != nil {
return err
}
defer fi.Close()

rr, err := car.NewCarReader(fi)
if err != nil {
return err
}

for {
blk, err := rr.Next()
if err != nil {
return err
}

if keep[blk.Cid()] {
_, err := LdWrite(w, blk.Cid().Bytes(), blk.RawData())
return err
}
}
}

func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
// TODO: some overwrite protections
fi, err := os.CreateTemp(cs.rootDir, fnameForShard(user, seq))
Expand Down Expand Up @@ -1254,27 +1231,49 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint)

span.SetAttributes(attribute.Int("shards", len(shardIds)))

chunkSize := 10000
chunkSize := 2000
out := make([]blockRef, 0, len(shardIds))
for i := 0; i < len(shardIds); i += chunkSize {
sl := shardIds[i:]
if len(sl) > chunkSize {
sl = sl[:chunkSize]
}

var brefs []blockRef
if err := cs.meta.Raw(`select * from block_refs where shard in (?)`, sl).Scan(&brefs).Error; err != nil {
return nil, err
if err := blockRefsForShards(ctx, cs.meta, sl, &out); err != nil {
return nil, fmt.Errorf("getting block refs: %w", err)
}

out = append(out, brefs...)
}

span.SetAttributes(attribute.Int("refs", len(out)))

return out, nil
}

func valuesStatementForShards(shards []uint) string {
sb := new(strings.Builder)
for i, v := range shards {
sb.WriteByte('(')
sb.WriteString(strconv.Itoa(int(v)))
sb.WriteByte(')')
if i != len(shards)-1 {
sb.WriteByte(',')
}
}
return sb.String()
}

func blockRefsForShards(ctx context.Context, db *gorm.DB, shards []uint, obuf *[]blockRef) error {
// Check the database driver
switch db.Dialector.(type) {
case *postgres.Dialector:
sval := valuesStatementForShards(shards)
q := fmt.Sprintf(`SELECT block_refs.* FROM block_refs INNER JOIN (VALUES %s) AS vals(v) ON block_refs.shard = v`, sval)
return db.Raw(q).Scan(obuf).Error
default:
return db.Raw(`SELECT * FROM block_refs WHERE shard IN (?)`, shards).Scan(obuf).Error
}
}

func shardSize(sh *CarShard) (int64, error) {
st, err := os.Stat(sh.Path)
if err != nil {
Expand Down

0 comments on commit 092ea1c

Please sign in to comment.