Skip to content

Commit

Permalink
automod: test capture framework (#470)
Browse files Browse the repository at this point in the history
This PR is currently rebased on top of
#466, to demonstrate
testing that rule. **UPDATE:** that PR merged, so now against `main`

Adds a `hepa` command to "capture" the current state of a real-world
account: currently some account metadata (identity, profile, etc), plus
some recent post records. This gets serialized to JSON for easy dumping
to file, like:

```shell
go run ./cmd/hepa/ capture-recent atproto.com > automod/testdata/capture_atprotocom.json
```

Then, a test helper function which loads this file, and processes all
the post records using an engine fixture.

Combined, these fixtures make it easy to do test-driven-development of
new rules. You find an account which recently sent spam or violated some
policy, take a capture snapshot, set up a test case, and then write a
rule which triggers and satisfies the test.

Some notes:

- tried moving the "test helpers" in to a sub-package
(`indigo/automod/automodtest`) but hit a circular import, so left where
it is
- this won't work with all rule types, and some captures/rules may need
additional mocking (eg, additional identities in the mock directory),
but that should be fine
- it usually isn't appropriate to capture real-world content in to
public code. we can be careful about what we add in this repo (indigo);
the "hackerdarkweb" example included in this PR seems fine to snapshot
to me. the code does strip "Private" account metadata by default.
- probably could use docs/comments. i'm not sure where best to put
effort, feedback welcome!
  • Loading branch information
bnewbold authored Dec 15, 2023
2 parents fb51430 + 0a4a842 commit 705a15d
Show file tree
Hide file tree
Showing 15 changed files with 1,743 additions and 222 deletions.
2 changes: 1 addition & 1 deletion automod/action_dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func alwaysReportAccountRule(evt *RecordEvent) error {
func TestAccountReportDedupe(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()
engine := engineFixture()
engine := EngineTestFixture()
engine.Rules = RuleSet{
RecordRules: []RecordRuleFunc{
alwaysReportAccountRule,
Expand Down
21 changes: 21 additions & 0 deletions automod/capture_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package automod

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNoOpCaptureReplyRule(t *testing.T) {
assert := assert.New(t)

engine := EngineTestFixture()
capture := MustLoadCapture("testdata/capture_atprotocom.json")
assert.NoError(ProcessCaptureRules(&engine, capture))
c, err := engine.GetCount("automod-quota", "report", PeriodDay)
assert.NoError(err)
assert.Equal(0, c)
c, err = engine.GetCount("automod-quota", "takedown", PeriodDay)
assert.NoError(err)
assert.Equal(0, c)
}
4 changes: 2 additions & 2 deletions automod/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func alwaysReportRecordRule(evt *RecordEvent) error {
func TestTakedownCircuitBreaker(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()
engine := engineFixture()
engine := EngineTestFixture()
dir := identity.NewMockDirectory()
engine.Directory = &dir
// note that this is a record-level action, not account-level
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestTakedownCircuitBreaker(t *testing.T) {
func TestReportCircuitBreaker(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()
engine := engineFixture()
engine := EngineTestFixture()
dir := identity.NewMockDirectory()
engine.Directory = &dir
engine.Rules = RuleSet{
Expand Down
60 changes: 0 additions & 60 deletions automod/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log/slog"
"strings"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/automod/countstore"
Expand Down Expand Up @@ -167,65 +166,6 @@ func (e *Engine) ProcessRecordDelete(ctx context.Context, did syntax.DID, path s
return nil
}

func (e *Engine) FetchAndProcessRecord(ctx context.Context, aturi syntax.ATURI) error {
// resolve URI, identity, and record
if aturi.RecordKey() == "" {
return fmt.Errorf("need a full, not partial, AT-URI: %s", aturi)
}
ident, err := e.Directory.Lookup(ctx, aturi.Authority())
if err != nil {
return fmt.Errorf("resolving AT-URI authority: %v", err)
}
pdsURL := ident.PDSEndpoint()
if pdsURL == "" {
return fmt.Errorf("could not resolve PDS endpoint for AT-URI account: %s", ident.DID.String())
}
pdsClient := xrpc.Client{Host: ident.PDSEndpoint()}

e.Logger.Info("fetching record", "did", ident.DID.String(), "collection", aturi.Collection().String(), "rkey", aturi.RecordKey().String())
out, err := comatproto.RepoGetRecord(ctx, &pdsClient, "", aturi.Collection().String(), ident.DID.String(), aturi.RecordKey().String())
if err != nil {
return fmt.Errorf("fetching record from Relay (%s): %v", aturi, err)
}
if out.Cid == nil {
return fmt.Errorf("expected a CID in getRecord response")
}
return e.ProcessRecord(ctx, ident.DID, aturi.Path(), *out.Cid, out.Value.Val)
}

func (e *Engine) FetchAndProcessRecent(ctx context.Context, atid syntax.AtIdentifier, limit int) error {

ident, err := e.Directory.Lookup(ctx, atid)
if err != nil {
return fmt.Errorf("failed to resolve AT identifier: %v", err)
}
pdsURL := ident.PDSEndpoint()
if pdsURL == "" {
return fmt.Errorf("could not resolve PDS endpoint for account: %s", ident.DID.String())
}
pdsClient := xrpc.Client{Host: ident.PDSEndpoint()}

resp, err := comatproto.RepoListRecords(ctx, &pdsClient, "app.bsky.feed.post", "", int64(limit), ident.DID.String(), false, "", "")
if err != nil {
return fmt.Errorf("failed to fetch record list: %v", err)
}

e.Logger.Info("got recent posts", "did", ident.DID.String(), "pds", pdsURL, "count", len(resp.Records))
// records are most-recent first; we want recent but oldest-first, so iterate backwards
for i := range resp.Records {
rec := resp.Records[len(resp.Records)-i-1]
aturi, err := syntax.ParseATURI(rec.Uri)
if err != nil {
return fmt.Errorf("parsing PDS record response: %v", err)
}
err = e.ProcessRecord(ctx, ident.DID, aturi.Path(), rec.Cid, rec.Value.Val)
if err != nil {
return err
}
}
return nil
}

func (e *Engine) NewRecordEvent(am AccountMeta, path, recCID string, rec any) RecordEvent {
parts := strings.SplitN(path, "/", 2)
return RecordEvent{
Expand Down
55 changes: 1 addition & 54 deletions automod/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,20 @@ package automod

import (
"context"
"log/slog"
"testing"
"time"

appbsky "github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/automod/countstore"

"github.com/stretchr/testify/assert"
)

func simpleRule(evt *RecordEvent, post *appbsky.FeedPost) error {
for _, tag := range post.Tags {
if evt.InSet("bad-hashtags", tag) {
evt.AddRecordLabel("bad-hashtag")
break
}
}
for _, facet := range post.Facets {
for _, feat := range facet.Features {
if feat.RichtextFacet_Tag != nil {
tag := feat.RichtextFacet_Tag.Tag
if evt.InSet("bad-hashtags", tag) {
evt.AddRecordLabel("bad-hashtag")
break
}
}
}
}
return nil
}

func engineFixture() Engine {
rules := RuleSet{
PostRules: []PostRuleFunc{
simpleRule,
},
}
cache := NewMemCacheStore(10, time.Hour)
flags := NewMemFlagStore()
sets := NewMemSetStore()
sets.Sets["bad-hashtags"] = make(map[string]bool)
sets.Sets["bad-hashtags"]["slur"] = true
dir := identity.NewMockDirectory()
id1 := identity.Identity{
DID: syntax.DID("did:plc:abc111"),
Handle: syntax.Handle("handle.example.com"),
}
dir.Insert(id1)
engine := Engine{
Logger: slog.Default(),
Directory: &dir,
Counters: countstore.NewMemCountStore(),
Sets: sets,
Flags: flags,
Cache: cache,
Rules: rules,
}
return engine
}

func TestEngineBasics(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()

engine := engineFixture()
engine := EngineTestFixture()
id1 := identity.Identity{
DID: syntax.DID("did:plc:abc111"),
Handle: syntax.Handle("handle.example.com"),
Expand Down
20 changes: 13 additions & 7 deletions automod/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ func (e *RepoEvent) PersistAccountActions(ctx context.Context) error {
}
}

// flags don't require admin auth
if len(newFlags) > 0 {
e.Engine.Flags.Add(ctx, e.Account.Identity.DID.String(), newFlags)
}

// if we can't actually talk to service, bail out early
if e.Engine.AdminClient == nil {
return nil
Expand Down Expand Up @@ -337,10 +342,6 @@ func (e *RepoEvent) PersistAccountActions(ctx context.Context) error {
}
}

if len(newFlags) > 0 {
e.Engine.Flags.Add(ctx, e.Account.Identity.DID.String(), newFlags)
}

// reports are additionally de-duped when persisting the action, so track with a flag
createdReports := false
for _, mr := range newReports {
Expand Down Expand Up @@ -493,9 +494,16 @@ func (e *RecordEvent) PersistRecordActions(ctx context.Context) error {
}
}
}

// flags don't require admin auth
if len(newFlags) > 0 {
e.Engine.Flags.Add(ctx, atURI, newFlags)
}

if e.Engine.AdminClient == nil {
return nil
}

strongRef := comatproto.RepoStrongRef{
Cid: e.CID,
Uri: atURI,
Expand All @@ -521,9 +529,7 @@ func (e *RecordEvent) PersistRecordActions(ctx context.Context) error {
return err
}
}
if len(newFlags) > 0 {
e.Engine.Flags.Add(ctx, atURI, newFlags)
}

for _, mr := range newReports {
e.Logger.Info("reporting record", "reasonType", mr.ReasonType, "comment", mr.Comment)
_, err := comatproto.ModerationCreateReport(ctx, xrpcc, &comatproto.ModerationCreateReport_Input{
Expand Down
113 changes: 113 additions & 0 deletions automod/fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package automod

import (
"context"
"fmt"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/xrpc"
)

func (e *Engine) FetchAndProcessRecord(ctx context.Context, aturi syntax.ATURI) error {
// resolve URI, identity, and record
if aturi.RecordKey() == "" {
return fmt.Errorf("need a full, not partial, AT-URI: %s", aturi)
}
ident, err := e.Directory.Lookup(ctx, aturi.Authority())
if err != nil {
return fmt.Errorf("resolving AT-URI authority: %v", err)
}
pdsURL := ident.PDSEndpoint()
if pdsURL == "" {
return fmt.Errorf("could not resolve PDS endpoint for AT-URI account: %s", ident.DID.String())
}
pdsClient := xrpc.Client{Host: ident.PDSEndpoint()}

e.Logger.Info("fetching record", "did", ident.DID.String(), "collection", aturi.Collection().String(), "rkey", aturi.RecordKey().String())
out, err := comatproto.RepoGetRecord(ctx, &pdsClient, "", aturi.Collection().String(), ident.DID.String(), aturi.RecordKey().String())
if err != nil {
return fmt.Errorf("fetching record from Relay (%s): %v", aturi, err)
}
if out.Cid == nil {
return fmt.Errorf("expected a CID in getRecord response")
}
return e.ProcessRecord(ctx, ident.DID, aturi.Path(), *out.Cid, out.Value.Val)
}

func (e *Engine) FetchRecent(ctx context.Context, atid syntax.AtIdentifier, limit int) (*identity.Identity, []*comatproto.RepoListRecords_Record, error) {
ident, err := e.Directory.Lookup(ctx, atid)
if err != nil {
return nil, nil, fmt.Errorf("failed to resolve AT identifier: %v", err)
}
pdsURL := ident.PDSEndpoint()
if pdsURL == "" {
return nil, nil, fmt.Errorf("could not resolve PDS endpoint for account: %s", ident.DID.String())
}
pdsClient := xrpc.Client{Host: ident.PDSEndpoint()}

resp, err := comatproto.RepoListRecords(ctx, &pdsClient, "app.bsky.feed.post", "", int64(limit), ident.DID.String(), false, "", "")
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch record list: %v", err)
}
e.Logger.Info("got recent posts", "did", ident.DID.String(), "pds", pdsURL, "count", len(resp.Records))
return ident, resp.Records, nil
}

func (e *Engine) FetchAndProcessRecent(ctx context.Context, atid syntax.AtIdentifier, limit int) error {

ident, records, err := e.FetchRecent(ctx, atid, limit)
if err != nil {
return err
}
// records are most-recent first; we want recent but oldest-first, so iterate backwards
for i := range records {
rec := records[len(records)-i-1]
aturi, err := syntax.ParseATURI(rec.Uri)
if err != nil {
return fmt.Errorf("parsing PDS record response: %v", err)
}
err = e.ProcessRecord(ctx, ident.DID, aturi.Path(), rec.Cid, rec.Value.Val)
if err != nil {
return err
}
}
return nil
}

type AccountCapture struct {
CapturedAt syntax.Datetime `json:"capturedAt"`
AccountMeta AccountMeta `json:"accountMeta"`
PostRecords []comatproto.RepoListRecords_Record `json:"postRecords"`
}

func (e *Engine) CaptureRecent(ctx context.Context, atid syntax.AtIdentifier, limit int) (*AccountCapture, error) {
ident, records, err := e.FetchRecent(ctx, atid, limit)
if err != nil {
return nil, err
}
pr := []comatproto.RepoListRecords_Record{}
for _, r := range records {
if r != nil {
pr = append(pr, *r)
}
}

// clear any pre-parsed key, which would fail to marshal as JSON
ident.ParsedPublicKey = nil
am, err := e.GetAccountMeta(ctx, ident)
if err != nil {
return nil, err
}

// auto-clear sensitive PII (eg, account email)
am.Private = nil

ac := AccountCapture{
CapturedAt: syntax.DatetimeNow(),
AccountMeta: *am,
PostRecords: pr,
}
return &ac, nil
}
Loading

0 comments on commit 705a15d

Please sign in to comment.