Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into bolson/slog
Browse files: browse the repository at this point in the history
  • Loading branch information
brianolson committed Dec 4, 2024
2 parents b7ffd2f + ff209b5 commit 7613d5e
Show file tree
Hide file tree
Showing 31 changed files with 216 additions and 96 deletions.
10 changes: 2 additions & 8 deletions atproto/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,11 @@ func extractBlobsAtom(atom any) []Blob {
out = append(out, v)
case []any:
for _, el := range v {
down := extractBlobsAtom(el)
for _, d := range down {
out = append(out, d)
}
out = append(out, extractBlobsAtom(el)...)
}
case map[string]any:
for _, val := range v {
down := extractBlobsAtom(val)
for _, d := range down {
out = append(out, d)
}
out = append(out, extractBlobsAtom(val)...)
}
default:
}
Expand Down
10 changes: 6 additions & 4 deletions automod/engine/fetch_account_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,16 @@ func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) (
ap.AccountTags = dedupeStrings(rd.Moderation.SubjectStatus.Tags)
if rd.Moderation.SubjectStatus.ReviewState != nil {
switch *rd.Moderation.SubjectStatus.ReviewState {
case "#reviewOpen":
case "tools.ozone.moderation.defs#reviewOpen":
ap.ReviewState = ReviewStateOpen
case "#reviewEscalated":
case "tools.ozone.moderation.defs#reviewEscalated":
ap.ReviewState = ReviewStateEscalated
case "#reviewClosed":
case "tools.ozone.moderation.defs#reviewClosed":
ap.ReviewState = ReviewStateClosed
case "#reviewNonde":
case "tools.ozone.moderation.defs#reviewNone":
ap.ReviewState = ReviewStateNone
default:
logger.Warn("unexpected ozone moderation review state", "state", rd.Moderation.SubjectStatus.ReviewState, "did", ident.DID)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion automod/engine/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func slackBody(header string, acct AccountMeta, newLabels, newFlags []string, ne
msg += fmt.Sprintf("Report `%s`: %s\n", rep.ReasonType, rep.Comment)
}
if newTakedown {
msg += fmt.Sprintf("Takedown!\n")
msg += "Takedown!\n"
}
return msg
}
4 changes: 1 addition & 3 deletions automod/flagstore/flagstore_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ func (s MemFlagStore) Add(ctx context.Context, key string, flags []string) error
if !ok {
v = []string{}
}
for _, f := range flags {
v = append(v, f)
}
v = append(v, flags...)
v = dedupeStrings(v)
s.Data[key] = v
return nil
Expand Down
4 changes: 1 addition & 3 deletions automod/helpers/bsky.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (

func ExtractHashtagsPost(post *appbsky.FeedPost) []string {
var tags []string
for _, tag := range post.Tags {
tags = append(tags, tag)
}
tags = append(tags, post.Tags...)
for _, facet := range post.Facets {
for _, feat := range facet.Features {
if feat.RichtextFacet_Tag != nil {
Expand Down
2 changes: 1 addition & 1 deletion automod/rules/harassment.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func HarassmentTrivialPostRule(c *automod.RecordContext, post *appbsky.FeedPost)

if count > 5 {
//c.AddRecordFlag("trivial-harassing-post")
c.ReportAccount(automod.ReportReasonOther, fmt.Sprintf("possible targetted harassment (also labeled; remove label if this isn't harassment!)"))
c.ReportAccount(automod.ReportReasonOther, "possible targetted harassment (also labeled; remove label if this isn't harassment!)")
c.AddAccountLabel("!hide")
c.Notify("slack")
}
Expand Down
3 changes: 1 addition & 2 deletions automod/rules/nostr.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rules

import (
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -37,7 +36,7 @@ func NostrSpamPostRule(c *automod.RecordContext, post *appbsky.FeedPost) error {
return nil
}

c.ReportAccount(automod.ReportReasonOther, fmt.Sprintf("likely nostr spam account (also labeled; remove label if this isn't spam!)"))
c.ReportAccount(automod.ReportReasonOther, "likely nostr spam account (also labeled; remove label if this isn't spam!)")
c.AddAccountLabel("!hide")
c.Notify("slack")
return nil
Expand Down
3 changes: 1 addition & 2 deletions automod/rules/promo.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rules

import (
"fmt"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -54,7 +53,7 @@ func AggressivePromotionRule(c *automod.RecordContext, post *appbsky.FeedPost) e
uniqueReplies := c.GetCountDistinct("reply-to", did, countstore.PeriodDay)
if uniqueReplies >= 10 {
c.AddAccountFlag("promo-multi-reply")
c.ReportAccount(automod.ReportReasonSpam, fmt.Sprintf("possible aggressive self-promotion"))
c.ReportAccount(automod.ReportReasonSpam, "possible aggressive self-promotion")
c.Notify("slack")
}

Expand Down
4 changes: 2 additions & 2 deletions automod/rules/quick.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func BotLinkProfileRule(c *automod.RecordContext, profile *appbsky.ActorProfile)
}
if strings.Contains(*profile.Description, "🏈🍕🌀") {
c.AddAccountFlag("profile-bot-string")
c.ReportAccount(automod.ReportReasonSpam, fmt.Sprintf("possible bot based on string in profile"))
c.ReportAccount(automod.ReportReasonSpam, "possible bot based on string in profile")
c.Notify("slack")
return nil
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func TrivialSpamPostRule(c *automod.RecordContext, post *appbsky.FeedPost) error
return nil
}

c.ReportAccount(automod.ReportReasonOther, fmt.Sprintf("trivial spam account (also labeled; remove label if this isn't spam!)"))
c.ReportAccount(automod.ReportReasonOther, "trivial spam account (also labeled; remove label if this isn't spam!)")
c.AddAccountLabel("!hide")
c.Notify("slack")
return nil
Expand Down
93 changes: 60 additions & 33 deletions backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
"time"

"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/repomgr"

"github.com/ipfs/go-cid"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -71,15 +74,17 @@ type Backfiller struct {
ParallelRecordCreates int
// Prefix match for records to backfill, e.g. app.bsky.feed.app/
// If empty, all records will be backfilled
NSIDFilter string
CheckoutPath string
NSIDFilter string
RelayHost string

syncLimiter *rate.Limiter

magicHeaderKey string
magicHeaderVal string

stop chan chan struct{}

Directory identity.Directory
}

var (
Expand Down Expand Up @@ -110,7 +115,7 @@ type BackfillOptions struct {
ParallelRecordCreates int
NSIDFilter string
SyncRequestsPerSecond int
CheckoutPath string
RelayHost string
}

func DefaultBackfillOptions() *BackfillOptions {
Expand All @@ -119,7 +124,7 @@ func DefaultBackfillOptions() *BackfillOptions {
ParallelRecordCreates: 100,
NSIDFilter: "",
SyncRequestsPerSecond: 2,
CheckoutPath: "https://bsky.network/xrpc/com.atproto.sync.getRepo",
RelayHost: "https://bsky.network",
}
}

Expand All @@ -145,8 +150,9 @@ func NewBackfiller(
ParallelRecordCreates: opts.ParallelRecordCreates,
NSIDFilter: opts.NSIDFilter,
syncLimiter: rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1),
CheckoutPath: opts.CheckoutPath,
RelayHost: opts.RelayHost,
stop: make(chan chan struct{}, 1),
Directory: identity.DefaultDirectory(),
}
}

Expand Down Expand Up @@ -292,25 +298,12 @@ type recordResult struct {
err error
}

// BackfillRepo backfills a repo
func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) {
ctx, span := tracer.Start(ctx, "BackfillRepo")
defer span.End()

start := time.Now()

repoDid := job.Repo()
// Fetches a repo CAR file over HTTP from the indicated host. If successful, parses the CAR and returns repo.Repo
func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (*repo.Repo, error) {
url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did)

log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid)
if job.RetryCount() > 0 {
log = log.With("retry_count", job.RetryCount())
}
log.Info(fmt.Sprintf("processing backfill for %s", repoDid))

url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid)

if job.Rev() != "" {
url = url + fmt.Sprintf("&since=%s", job.Rev())
if since != "" {
url = url + fmt.Sprintf("&since=%s", since)
}

// GET and CAR decode the body
Expand All @@ -320,8 +313,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
}
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
state := fmt.Sprintf("failed (create request: %s)", err.Error())
return state, fmt.Errorf("failed to create request: %w", err)
return nil, fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Accept", "application/vnd.ipld.car")
Expand All @@ -334,8 +326,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)

resp, err := client.Do(req)
if err != nil {
state := fmt.Sprintf("failed (do request: %s)", err.Error())
return state, fmt.Errorf("failed to send request: %w", err)
return nil, fmt.Errorf("failed to send request: %w", err)
}

if resp.StatusCode != http.StatusOK {
Expand All @@ -345,8 +336,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
} else {
reason = resp.Status
}
state := fmt.Sprintf("failed (%s)", reason)
return state, fmt.Errorf("failed to get repo: %s", reason)
return nil, fmt.Errorf("failed to get repo: %s", reason)
}

instrumentedReader := instrumentedReader{
Expand All @@ -356,10 +346,47 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)

defer instrumentedReader.Close()

r, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
repo, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
if err != nil {
state := "failed (couldn't read repo CAR from response body)"
return state, fmt.Errorf("failed to read repo from car: %w", err)
return nil, fmt.Errorf("failed to parse repo from CAR file: %w", err)
}
return repo, nil
}

// BackfillRepo backfills a repo
func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) {
ctx, span := tracer.Start(ctx, "BackfillRepo")
defer span.End()

start := time.Now()

repoDID := job.Repo()

log := slog.With("source", "backfiller_backfill_repo", "repo", repoDID)
if job.RetryCount() > 0 {
log = log.With("retry_count", job.RetryCount())
}
log.Info(fmt.Sprintf("processing backfill for %s", repoDID))

// first try with Relay endpoint
r, err := b.fetchRepo(ctx, repoDID, job.Rev(), b.RelayHost)
if err != nil {
slog.Warn("repo CAR fetch from relay failed", "did", repoDID, "since", job.Rev(), "relayHost", b.RelayHost, "err", err)
// fallback to direct PDS fetch
ident, err := b.Directory.LookupDID(ctx, syntax.DID(repoDID))
if err != nil {
return "failed resolving DID to PDS repo", fmt.Errorf("resolving DID for PDS repo fetch: %w", err)
}
pdsHost := ident.PDSEndpoint()
if pdsHost == "" {
return "DID document missing PDS endpoint", fmt.Errorf("no PDS endpoint for DID: %s", repoDID)
}
r, err = b.fetchRepo(ctx, repoDID, job.Rev(), pdsHost)
if err != nil {
slog.Warn("repo CAR fetch from PDS failed", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost, "err", err)
return "repo CAR fetch from PDS failed", err
}
slog.Info("repo CAR fetch from PDS successful", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost, "err", err)
}

numRecords := 0
Expand Down Expand Up @@ -396,7 +423,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)

raw := blk.RawData()

err = b.HandleCreateRecord(ctx, repoDid, rev, item.recordPath, &raw, &item.nodeCid)
err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid)
if err != nil {
recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
continue
Expand Down
Loading

0 comments on commit 7613d5e

Please sign in to comment.