Skip to content

Commit

Permalink
Get repos from the BGS in Search (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 authored Sep 25, 2023
2 parents a2219fc + 8b10711 commit 9ac5e8a
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 22 deletions.
5 changes: 5 additions & 0 deletions atproto/identity/base_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ import (
"net/http"

"github.com/bluesky-social/indigo/atproto/syntax"
"golang.org/x/time/rate"
)

// The zero value ('BaseDirectory{}') is a usable Directory.
type BaseDirectory struct {
// if non-empty, this string should have URL method, hostname, and optional port; it should not have a path or trailing slash
PLCURL string
// If not nil, this limiter will be used to rate-limit requests to the PLCURL
PLCLimiter *rate.Limiter
// If not nil, this function will be called inline with DID Web lookups, and can be used to limit the number of requests to a given hostname
DIDWebLimitFunc func(ctx context.Context, hostname string) error
// HTTP client used for did:web, did:plc, and HTTP (well-known) handle resolution
HTTPClient http.Client
// DNS resolver used for DNS handle resolution. Calling code can use a custom Dialer to query against a specific DNS server, or re-implement the interface for even more control over the resolution process
Expand Down
30 changes: 30 additions & 0 deletions atproto/identity/cache_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/hashicorp/golang-lru/v2/expirable"
)
Expand All @@ -29,6 +31,26 @@ type IdentityEntry struct {
Err error
}

// Prometheus counters tracking cache effectiveness for ATProto handle and
// identity lookups performed through a CacheDirectory.
var (
	handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_handle_cache_hits",
		Help: "Number of cache hits for ATProto handle lookups",
	})
	handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_handle_cache_misses",
		Help: "Number of cache misses for ATProto handle lookups",
	})
	identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_identity_cache_hits",
		Help: "Number of cache hits for ATProto identity lookups",
	})
	identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_identity_cache_misses",
		Help: "Number of cache misses for ATProto identity lookups",
	})
)

var _ Directory = (*CacheDirectory)(nil)

// Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration.
Expand Down Expand Up @@ -89,6 +111,7 @@ func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy
maybeEntry, ok := d.handleCache.Get(h)

if !ok {
handleCacheMisses.Inc()
entry, err = d.updateHandle(ctx, h)
if err != nil {
return "", err
Expand All @@ -97,10 +120,13 @@ func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy
entry = &maybeEntry
}
if d.IsHandleStale(entry) {
handleCacheMisses.Inc()
entry, err = d.updateHandle(ctx, h)
if err != nil {
return "", err
}
} else {
handleCacheHits.Inc()
}
return entry.DID, entry.Err
}
Expand Down Expand Up @@ -136,6 +162,7 @@ func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identi
maybeEntry, ok := d.identityCache.Get(did)

if !ok {
identityCacheMisses.Inc()
entry, err = d.updateDID(ctx, did)
if err != nil {
return nil, err
Expand All @@ -144,10 +171,13 @@ func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identi
entry = &maybeEntry
}
if d.IsIdentityStale(entry) {
identityCacheMisses.Inc()
entry, err = d.updateDID(ctx, did)
if err != nil {
return nil, err
}
} else {
identityCacheHits.Inc()
}
return entry.Identity, entry.Err
}
Expand Down
14 changes: 14 additions & 0 deletions atproto/identity/did.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func (d *BaseDirectory) ResolveDIDWeb(ctx context.Context, did syntax.DID) (*DID

// TODO: use a more robust client
// TODO: allow ctx to specify unsafe http:// resolution, for testing?

if d.DIDWebLimitFunc != nil {
if err := d.DIDWebLimitFunc(ctx, hostname); err != nil {
return nil, fmt.Errorf("did:web limit func returned an error for (%s): %w", hostname, err)
}
}

resp, err := http.Get("https://" + hostname + "/.well-known/did.json")
// look for NXDOMAIN
var dnsErr *net.DNSError
Expand Down Expand Up @@ -101,6 +108,13 @@ func (d *BaseDirectory) ResolveDIDPLC(ctx context.Context, did syntax.DID) (*DID
if plcURL == "" {
plcURL = DefaultPLCURL
}

if d.PLCLimiter != nil {
if err := d.PLCLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("failed to wait for PLC limiter: %w", err)
}
}

resp, err := http.Get(plcURL + "/" + did.String())
if err != nil {
return nil, fmt.Errorf("failed did:plc directory resolution: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func DefaultBackfillOptions() *BackfillOptions {
ParallelRecordCreates: 100,
NSIDFilter: "",
SyncRequestsPerSecond: 2,
CheckoutPath: "https://bsky.social/xrpc/com.atproto.sync.getCheckout",
CheckoutPath: "https://bsky.social/xrpc/com.atproto.sync.getRepo",
}
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
// GET and CAR decode the body
client := &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
Timeout: 120 * time.Second,
Timeout: 600 * time.Second,
}
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
Expand Down
46 changes: 27 additions & 19 deletions backfill/gormstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,37 @@ func NewGormstore(db *gorm.DB) *Gormstore {
}

func (s *Gormstore) LoadJobs(ctx context.Context) error {
// Load all jobs from the database
var dbjobs []*GormDBJob
if err := s.db.Find(&dbjobs).Error; err != nil {
return err
}

limit := 20_000
offset := 0
s.lk.Lock()
defer s.lk.Unlock()

// Convert them to in-memory jobs
for i := range dbjobs {
dbj := dbjobs[i]
j := &Gormjob{
repo: dbj.Repo,
state: dbj.State,
bufferedOps: map[string][]*bufferedOp{},
createdAt: dbj.CreatedAt,
updatedAt: dbj.UpdatedAt,

dbj: dbj,
db: s.db,
for {
var dbjobs []*GormDBJob
// Load all jobs from the database
if err := s.db.Limit(limit).Offset(offset).Find(&dbjobs).Error; err != nil {
return err
}
if len(dbjobs) == 0 {
break
}
offset += len(dbjobs)

// Convert them to in-memory jobs
for i := range dbjobs {
dbj := dbjobs[i]
j := &Gormjob{
repo: dbj.Repo,
state: dbj.State,
bufferedOps: map[string][]*bufferedOp{},
createdAt: dbj.CreatedAt,
updatedAt: dbj.UpdatedAt,

dbj: dbj,
db: s.db,
}
s.jobs[dbj.Repo] = j
}
s.jobs[dbj.Repo] = j
}

return nil
Expand Down
10 changes: 9 additions & 1 deletion cmd/palomar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

_ "github.com/joho/godotenv/autoload"
"golang.org/x/time/rate"

"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/search"
Expand Down Expand Up @@ -132,6 +133,12 @@ var runCmd = &cli.Command{
Value: 20,
EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"},
},
&cli.IntFlag{
Name: "plc-rate-limit",
Usage: "max number of requests per second to PLC registry",
Value: 100,
EnvVars: []string{"PALOMAR_PLC_RATE_LIMIT"},
},
},
Action: func(cctx *cli.Context) error {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Expand All @@ -155,10 +162,11 @@ var runCmd = &cli.Command{
HTTPClient: http.Client{
Timeout: time.Second * 15,
},
PLCLimiter: rate.NewLimiter(rate.Limit(cctx.Int("plc-rate-limit")), 1),
TryAuthoritativeDNS: true,
SkipDNSDomainSuffixes: []string{".bsky.social"},
}
dir := identity.NewCacheDirectory(&base, 200000, time.Hour*24, time.Minute*2)
dir := identity.NewCacheDirectory(&base, 1_500_000, time.Hour*24, time.Minute*2)

srv, err := search.NewServer(
db,
Expand Down
65 changes: 65 additions & 0 deletions search/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
bsky "github.com/bluesky-social/indigo/api/bsky"
Expand Down Expand Up @@ -51,6 +52,7 @@ func (s *Server) RunIndexer(ctx context.Context) error {
return fmt.Errorf("loading backfill jobs: %w", err)
}
go s.bf.Start()
go s.discoverRepos()

d := websocket.DefaultDialer
u, err := url.Parse(s.bgshost)
Expand Down Expand Up @@ -92,6 +94,15 @@ func (s *Server) RunIndexer(ctx context.Context) error {
return nil
}

// Check if we've backfilled this repo, if not, we should enqueue it
job, err := s.bfs.GetJob(ctx, evt.Repo)
if job == nil && err == nil {
logEvt.Info("enqueueing backfill job for new repo")
if err := s.bfs.EnqueueJob(evt.Repo); err != nil {
logEvt.Warn("failed to enqueue backfill job", "err", err)
}
}

r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
if err != nil {
// TODO: handle this case (instead of return nil)
Expand Down Expand Up @@ -159,6 +170,60 @@ func (s *Server) RunIndexer(ctx context.Context) error {
)
}

// discoverRepos pages through the BGS's complete repo list
// (com.atproto.sync.listRepos) and enqueues a backfill job for every repo
// that does not already have one in the job store. Pages of 500 are fetched
// until the BGS stops returning a cursor; a failed page fetch is retried
// after a 5s pause with the same cursor.
// NOTE(review): runs on context.Background() with no cancellation path, so
// the goroutine only exits when pagination completes — confirm this is the
// intended lifetime for the caller in RunIndexer.
func (s *Server) discoverRepos() {
	ctx := context.Background()
	log := s.logger.With("func", "discoverRepos")
	log.Info("starting repo discovery")

	cursor := ""
	limit := int64(500)

	totalEnqueued := 0
	totalSkipped := 0
	totalErrored := 0

	for {
		resp, err := comatproto.SyncListRepos(ctx, s.bgsxrpc, cursor, limit)
		if err != nil {
			log.Error("failed to list repos", "err", err)
			time.Sleep(5 * time.Second)
			continue
		}
		log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor)
		enqueued := 0
		skipped := 0
		errored := 0
		// Loop variable renamed from "repo" to avoid shadowing the imported
		// repo package used elsewhere in this file.
		for _, r := range resp.Repos {
			job, err := s.bfs.GetJob(ctx, r.Did)
			switch {
			case job == nil && err == nil:
				// No job recorded for this repo yet: enqueue a backfill.
				log.Info("enqueuing backfill job for new repo", "did", r.Did)
				if err := s.bfs.EnqueueJob(r.Did); err != nil {
					log.Warn("failed to enqueue backfill job", "err", err)
					errored++
					continue
				}
				enqueued++
			case err != nil:
				log.Warn("failed to get backfill job", "did", r.Did, "err", err)
				errored++
			default:
				// Job already exists; nothing to do.
				skipped++
			}
		}
		log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored)
		totalEnqueued += enqueued
		totalSkipped += skipped
		totalErrored += errored
		if resp.Cursor != nil && *resp.Cursor != "" {
			cursor = *resp.Cursor
		} else {
			break
		}
	}

	log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored)
}

func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error {
// Since this gets called in a backfill job, we need to check if the path is a post or profile
if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
Expand Down
53 changes: 53 additions & 0 deletions search/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,59 @@ func (s *Server) handleSearchActorsSkeleton(e echo.Context) error {
return e.JSON(200, out)
}

// IndexError describes a single repo (by DID) that failed during the
// enqueue step of handleIndexRepos; it is returned in the JSON response's
// "errors" array.
type IndexError struct {
	DID string `json:"did"`
	Err string `json:"err"`
}

// handleIndexRepos enqueues backfill (indexing) jobs for one or more repos,
// passed as repeated "did" query parameters. All DIDs are syntax-validated
// up front so one bad entry rejects the whole request before any job is
// enqueued. The 200 response reports per-request counts: numEnqueued,
// numSkipped (job already present), and numErrored with per-DID details.
func (s *Server) handleIndexRepos(e echo.Context) error {
	ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleIndexRepos")
	defer span.End()

	dids, ok := e.QueryParams()["did"]
	if !ok {
		return e.JSON(400, map[string]any{
			"error": "must pass at least one did to index",
		})
	}

	// Validate every DID before enqueueing anything.
	for _, did := range dids {
		_, err := syntax.ParseDID(did)
		if err != nil {
			return e.JSON(400, map[string]any{
				"error": fmt.Sprintf("invalid DID (%s): %s", did, err),
			})
		}
	}

	errs := []IndexError{}
	successes := 0
	skipped := 0
	for _, did := range dids {
		job, err := s.bfs.GetJob(ctx, did)
		if err != nil {
			// Fix: a job-store lookup failure was previously counted as
			// "skipped"; report it as an error instead, matching how
			// discoverRepos classifies the same condition.
			errs = append(errs, IndexError{
				DID: did,
				Err: err.Error(),
			})
			continue
		}
		if job != nil {
			// A job already exists for this repo; nothing to do.
			skipped++
			continue
		}
		if err := s.bfs.EnqueueJob(did); err != nil {
			errs = append(errs, IndexError{
				DID: did,
				Err: err.Error(),
			})
			continue
		}
		successes++
	}

	return e.JSON(200, map[string]any{
		"numEnqueued": successes,
		"numSkipped":  skipped,
		"numErrored":  len(errs),
		"errors":      errs,
	})
}

func (s *Server) SearchPosts(ctx context.Context, q string, offset, size int) (*SearchPostsSkeletonResp, error) {
resp, err := DoSearchPosts(ctx, s.dir, s.escli, s.postIndex, q, offset, size)
if err != nil {
Expand Down
Loading

0 comments on commit 9ac5e8a

Please sign in to comment.