Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get repos from the BGS in Search #335

Merged
merged 15 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions atproto/identity/base_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ import (
"net/http"

"github.com/bluesky-social/indigo/atproto/syntax"
"golang.org/x/time/rate"
)

// The zero value ('BaseDirectory{}') is a usable Directory.
type BaseDirectory struct {
// if non-empty, this string should have URL method, hostname, and optional port; it should not have a path or trailing slash
PLCURL string
// If not nil, this limiter will be used to rate-limit requests to the PLCURL
PLCLimiter *rate.Limiter
// If not nil, this function will be called inline with DID Web lookups, and can be used to limit the number of requests to a given hostname
DIDWebLimitFunc func(ctx context.Context, hostname string) error
// HTTP client used for did:web, did:plc, and HTTP (well-known) handle resolution
HTTPClient http.Client
// DNS resolver used for DNS handle resolution. Calling code can use a custom Dialer to query against a specific DNS server, or re-implement the interface for even more control over the resolution process
Expand Down
30 changes: 30 additions & 0 deletions atproto/identity/cache_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/hashicorp/golang-lru/v2/expirable"
)
Expand All @@ -29,6 +31,26 @@ type IdentityEntry struct {
Err error
}

// Prometheus counters tracking hit/miss rates for the two caches owned by
// CacheDirectory (handle -> DID, and DID -> identity). Registered once at
// package init via promauto.
var (
	handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_handle_cache_hits",
		Help: "Number of cache hits for ATProto handle lookups",
	})
	handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_handle_cache_misses",
		Help: "Number of cache misses for ATProto handle lookups",
	})
	identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_identity_cache_hits",
		Help: "Number of cache hits for ATProto identity lookups",
	})
	identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_identity_cache_misses",
		Help: "Number of cache misses for ATProto identity lookups",
	})
)

var _ Directory = (*CacheDirectory)(nil)

// Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration.
Expand Down Expand Up @@ -89,6 +111,7 @@ func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy
maybeEntry, ok := d.handleCache.Get(h)

if !ok {
handleCacheMisses.Inc()
entry, err = d.updateHandle(ctx, h)
if err != nil {
return "", err
Expand All @@ -97,10 +120,13 @@ func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy
entry = &maybeEntry
}
if d.IsHandleStale(entry) {
handleCacheMisses.Inc()
entry, err = d.updateHandle(ctx, h)
if err != nil {
return "", err
}
} else {
handleCacheHits.Inc()
}
return entry.DID, entry.Err
}
Expand Down Expand Up @@ -136,6 +162,7 @@ func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identi
maybeEntry, ok := d.identityCache.Get(did)

if !ok {
identityCacheMisses.Inc()
entry, err = d.updateDID(ctx, did)
if err != nil {
return nil, err
Expand All @@ -144,10 +171,13 @@ func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identi
entry = &maybeEntry
}
if d.IsIdentityStale(entry) {
identityCacheMisses.Inc()
entry, err = d.updateDID(ctx, did)
if err != nil {
return nil, err
}
} else {
identityCacheHits.Inc()
}
return entry.Identity, entry.Err
}
Expand Down
14 changes: 14 additions & 0 deletions atproto/identity/did.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func (d *BaseDirectory) ResolveDIDWeb(ctx context.Context, did syntax.DID) (*DID

// TODO: use a more robust client
// TODO: allow ctx to specify unsafe http:// resolution, for testing?

if d.DIDWebLimitFunc != nil {
if err := d.DIDWebLimitFunc(ctx, hostname); err != nil {
return nil, fmt.Errorf("did:web limit func returned an error for (%s): %w", hostname, err)
}
}

resp, err := http.Get("https://" + hostname + "/.well-known/did.json")
// look for NXDOMAIN
var dnsErr *net.DNSError
Expand Down Expand Up @@ -101,6 +108,13 @@ func (d *BaseDirectory) ResolveDIDPLC(ctx context.Context, did syntax.DID) (*DID
if plcURL == "" {
plcURL = DefaultPLCURL
}

if d.PLCLimiter != nil {
if err := d.PLCLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("failed to wait for PLC limiter: %w", err)
}
}

resp, err := http.Get(plcURL + "/" + did.String())
if err != nil {
return nil, fmt.Errorf("failed did:plc directory resolution: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func DefaultBackfillOptions() *BackfillOptions {
ParallelRecordCreates: 100,
NSIDFilter: "",
SyncRequestsPerSecond: 2,
CheckoutPath: "https://bsky.social/xrpc/com.atproto.sync.getCheckout",
CheckoutPath: "https://bsky.social/xrpc/com.atproto.sync.getRepo",
}
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
// GET and CAR decode the body
client := &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
Timeout: 120 * time.Second,
Timeout: 600 * time.Second,
}
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
Expand Down
46 changes: 27 additions & 19 deletions backfill/gormstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,37 @@ func NewGormstore(db *gorm.DB) *Gormstore {
}

func (s *Gormstore) LoadJobs(ctx context.Context) error {
// Load all jobs from the database
var dbjobs []*GormDBJob
if err := s.db.Find(&dbjobs).Error; err != nil {
return err
}

limit := 20_000
offset := 0
s.lk.Lock()
defer s.lk.Unlock()

// Convert them to in-memory jobs
for i := range dbjobs {
dbj := dbjobs[i]
j := &Gormjob{
repo: dbj.Repo,
state: dbj.State,
bufferedOps: map[string][]*bufferedOp{},
createdAt: dbj.CreatedAt,
updatedAt: dbj.UpdatedAt,

dbj: dbj,
db: s.db,
for {
var dbjobs []*GormDBJob
// Load all jobs from the database
if err := s.db.Limit(limit).Offset(offset).Find(&dbjobs).Error; err != nil {
return err
}
if len(dbjobs) == 0 {
break
}
offset += len(dbjobs)

// Convert them to in-memory jobs
for i := range dbjobs {
dbj := dbjobs[i]
j := &Gormjob{
repo: dbj.Repo,
state: dbj.State,
bufferedOps: map[string][]*bufferedOp{},
createdAt: dbj.CreatedAt,
updatedAt: dbj.UpdatedAt,

dbj: dbj,
db: s.db,
}
s.jobs[dbj.Repo] = j
}
s.jobs[dbj.Repo] = j
}

return nil
Expand Down
10 changes: 9 additions & 1 deletion cmd/palomar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

_ "github.com/joho/godotenv/autoload"
"golang.org/x/time/rate"

"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/search"
Expand Down Expand Up @@ -132,6 +133,12 @@ var runCmd = &cli.Command{
Value: 20,
EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"},
},
&cli.IntFlag{
Name: "plc-rate-limit",
Usage: "max number of requests per second to PLC registry",
Value: 100,
EnvVars: []string{"PALOMAR_PLC_RATE_LIMIT"},
},
},
Action: func(cctx *cli.Context) error {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Expand All @@ -155,10 +162,11 @@ var runCmd = &cli.Command{
HTTPClient: http.Client{
Timeout: time.Second * 15,
},
PLCLimiter: rate.NewLimiter(rate.Limit(cctx.Int("plc-rate-limit")), 1),
TryAuthoritativeDNS: true,
SkipDNSDomainSuffixes: []string{".bsky.social"},
}
dir := identity.NewCacheDirectory(&base, 200000, time.Hour*24, time.Minute*2)
dir := identity.NewCacheDirectory(&base, 1_500_000, time.Hour*24, time.Minute*2)

srv, err := search.NewServer(
db,
Expand Down
65 changes: 65 additions & 0 deletions search/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
bsky "github.com/bluesky-social/indigo/api/bsky"
Expand Down Expand Up @@ -51,6 +52,7 @@ func (s *Server) RunIndexer(ctx context.Context) error {
return fmt.Errorf("loading backfill jobs: %w", err)
}
go s.bf.Start()
go s.discoverRepos()

d := websocket.DefaultDialer
u, err := url.Parse(s.bgshost)
Expand Down Expand Up @@ -92,6 +94,15 @@ func (s *Server) RunIndexer(ctx context.Context) error {
return nil
}

// Check if we've backfilled this repo, if not, we should enqueue it
job, err := s.bfs.GetJob(ctx, evt.Repo)
if job == nil && err == nil {
logEvt.Info("enqueueing backfill job for new repo")
if err := s.bfs.EnqueueJob(evt.Repo); err != nil {
logEvt.Warn("failed to enqueue backfill job", "err", err)
}
}

r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
if err != nil {
// TODO: handle this case (instead of return nil)
Expand Down Expand Up @@ -159,6 +170,60 @@ func (s *Server) RunIndexer(ctx context.Context) error {
)
}

// discoverRepos pages through the BGS's full repo list (com.atproto.sync.listRepos)
// and enqueues a backfill job for every repo that does not already have one.
// It is launched as a background goroutine from RunIndexer; failures to fetch a
// page are retried indefinitely after a short pause.
func (s *Server) discoverRepos() {
	ctx := context.Background()
	log := s.logger.With("func", "discoverRepos")
	log.Info("starting repo discovery")

	const pageSize = int64(500)
	pageCursor := ""

	// Running totals across all pages, reported once at the end.
	var totalEnqueued, totalSkipped, totalErrored int

	for {
		resp, err := comatproto.SyncListRepos(ctx, s.bgsxrpc, pageCursor, pageSize)
		if err != nil {
			// Transient BGS failure: wait and retry the same page.
			log.Error("failed to list repos", "err", err)
			time.Sleep(5 * time.Second)
			continue
		}
		log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor)

		var enqueued, skipped, errored int
		for _, r := range resp.Repos {
			job, err := s.bfs.GetJob(ctx, r.Did)
			switch {
			case err != nil:
				log.Warn("failed to get backfill job", "did", r.Did, "err", err)
				errored++
			case job != nil:
				// Already tracked by the backfill store; nothing to do.
				skipped++
			default:
				log.Info("enqueuing backfill job for new repo", "did", r.Did)
				if err := s.bfs.EnqueueJob(r.Did); err != nil {
					log.Warn("failed to enqueue backfill job", "err", err)
					errored++
				} else {
					enqueued++
				}
			}
		}
		log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored)

		totalEnqueued += enqueued
		totalSkipped += skipped
		totalErrored += errored

		// An absent or empty cursor marks the final page.
		if resp.Cursor == nil || *resp.Cursor == "" {
			break
		}
		pageCursor = *resp.Cursor
	}

	log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored)
}

func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error {
// Since this gets called in a backfill job, we need to check if the path is a post or profile
if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
Expand Down
53 changes: 53 additions & 0 deletions search/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,59 @@ func (s *Server) handleSearchActorsSkeleton(e echo.Context) error {
return e.JSON(200, out)
}

// IndexError pairs a repo DID with the error message produced while trying to
// enqueue it for indexing; a list of these is returned in the
// handleIndexRepos JSON response body.
type IndexError struct {
	DID string `json:"did"`
	Err string `json:"err"`
}

// handleIndexRepos enqueues backfill jobs for the DIDs passed via one or more
// `did` query parameters. DIDs that already have a job are counted as skipped;
// per-DID failures are reported in the response. Returns 400 if no DID is
// given or any DID fails to parse, otherwise 200 with summary counts.
func (s *Server) handleIndexRepos(e echo.Context) error {
	ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleIndexRepos")
	defer span.End()

	dids, ok := e.QueryParams()["did"]
	if !ok {
		return e.JSON(400, map[string]any{
			"error": "must pass at least one did to index",
		})
	}

	// Validate every DID up front so a single bad value rejects the whole
	// request before any jobs are enqueued.
	for _, did := range dids {
		_, err := syntax.ParseDID(did)
		if err != nil {
			return e.JSON(400, map[string]any{
				"error": fmt.Sprintf("invalid DID (%s): %s", did, err),
			})
		}
	}

	errs := []IndexError{}
	successes := 0
	skipped := 0
	for _, did := range dids {
		job, err := s.bfs.GetJob(ctx, did)
		if err != nil {
			// Bugfix: a failed job lookup was previously miscounted as
			// "skipped"; surface it as an error (matching discoverRepos).
			errs = append(errs, IndexError{
				DID: did,
				Err: err.Error(),
			})
			continue
		}
		if job != nil {
			// Already enqueued or indexed.
			skipped++
			continue
		}
		if err := s.bfs.EnqueueJob(did); err != nil {
			errs = append(errs, IndexError{
				DID: did,
				Err: err.Error(),
			})
			continue
		}
		successes++
	}

	return e.JSON(200, map[string]any{
		"numEnqueued": successes,
		"numSkipped":  skipped,
		"numErrored":  len(errs),
		"errors":      errs,
	})
}

func (s *Server) SearchPosts(ctx context.Context, q string, offset, size int) (*SearchPostsSkeletonResp, error) {
resp, err := DoSearchPosts(ctx, s.dir, s.escli, s.postIndex, q, offset, size)
if err != nil {
Expand Down
Loading