Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

automod: identical reply rule #466

Merged
merged 8 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions automod/countstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package automod

import (
"context"
"fmt"
"log/slog"
"time"
)

const (
PeriodTotal = "total"
PeriodDay = "day"
PeriodHour = "hour"
)

type CountStore interface {
GetCount(ctx context.Context, name, val, period string) (int, error)
Increment(ctx context.Context, name, val string) error
IncrementPeriod(ctx context.Context, name, val, period string) error
// TODO: batch increment method
GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error)
IncrementDistinct(ctx context.Context, name, bucket, val string) error
}

// TODO: this implementation isn't race-safe (yet)!
type MemCountStore struct {
Counts map[string]int
DistinctCounts map[string]map[string]bool
}

func NewMemCountStore() MemCountStore {
return MemCountStore{
Counts: make(map[string]int),
DistinctCounts: make(map[string]map[string]bool),
}
}

func PeriodBucket(name, val, period string) string {
switch period {
case PeriodTotal:
return fmt.Sprintf("%s/%s", name, val)
case PeriodDay:
t := time.Now().UTC().Format(time.DateOnly)
return fmt.Sprintf("%s/%s/%s", name, val, t)
case PeriodHour:
t := time.Now().UTC().Format(time.RFC3339)[0:13]
return fmt.Sprintf("%s/%s/%s", name, val, t)
default:
slog.Warn("unhandled counter period", "period", period)
return fmt.Sprintf("%s/%s", name, val)
}
}

func (s MemCountStore) GetCount(ctx context.Context, name, val, period string) (int, error) {
v, ok := s.Counts[PeriodBucket(name, val, period)]
if !ok {
return 0, nil
}
return v, nil
}

func (s MemCountStore) Increment(ctx context.Context, name, val string) error {
for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} {
s.IncrementPeriod(ctx, name, val, p)
}
return nil
}

func (s MemCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error {
k := PeriodBucket(name, val, period)
v, ok := s.Counts[k]
if !ok {
v = 0
}
v = v + 1
s.Counts[k] = v
return nil
}

func (s MemCountStore) GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) {
v, ok := s.DistinctCounts[PeriodBucket(name, bucket, period)]
if !ok {
return 0, nil
}
return len(v), nil
}

func (s MemCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error {
for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} {
k := PeriodBucket(name, bucket, p)
m, ok := s.DistinctCounts[k]
if !ok {
m = make(map[string]bool)
}
m[val] = true
s.DistinctCounts[k] = m
}
return nil
}
9 changes: 5 additions & 4 deletions automod/countstore/countstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,24 @@ const (
// Incrementing -- both the "Increment" and "IncrementDistinct" variants -- increases
// a count in each supported period bucket size.
// In other words, one call to CountStore.Increment causes three increments internally:
// one to the count for the hour, one to the count for the day, and one to thte all-time count.
// one to the count for the hour, one to the count for the day, and one to the all-time count.
// The "IncrementPeriod" method allows only incrementing a single period bucket. Care must be taken to match the "GetCount" period with the incremented period when using this variant.
//
// The exact implementation and precision of the "*Distinct" methods may vary:
// in the MemCountStore implementation, it is precise (it's based on large maps);
// in the RedisCountStore implementation, it uses the Redis "pfcount" feature,
// which is based on a HyperLogLog datastructure which has probablistic properties
// which is based on a HyperLogLog datastructure which has probabilistic properties
// (see https://redis.io/commands/pfcount/ ).
//
// Memory growth and availablity of information over time also varies by implementation.
// Memory growth and availability of information over time also varies by implementation.
// The RedisCountStore implementation uses Redis's key expiration primitives;
// only the all-time counts go without expiration.
// The MemCountStore grows without bound (it's intended to be used in testing
// and other non-production operations).
//
type CountStore interface {
GetCount(ctx context.Context, name, val, period string) (int, error)
Increment(ctx context.Context, name, val string) error
IncrementPeriod(ctx context.Context, name, val, period string) error
// TODO: batch increment method
GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error)
IncrementDistinct(ctx context.Context, name, bucket, val string) error
Expand Down
17 changes: 12 additions & 5 deletions automod/countstore/countstore_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,21 @@ func (s MemCountStore) GetCount(ctx context.Context, name, val, period string) (

func (s MemCountStore) Increment(ctx context.Context, name, val string) error {
for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} {
k := periodBucket(name, val, p)
s.Counts.Compute(k, func(oldVal int, _ bool) (int, bool) {
return oldVal+1, false
})
if err := s.IncrementPeriod(ctx, name, val, p); err != nil {
return err
}
}
return nil
}

func (s MemCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error {
k := periodBucket(name, val, period)
s.Counts.Compute(k, func(oldVal int, _ bool) (int, bool) {
return oldVal + 1, false
})
return nil
}

func (s MemCountStore) GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) {
v, ok := s.DistinctCounts.Load(periodBucket(name, bucket, period))
if !ok {
Expand All @@ -52,7 +59,7 @@ func (s MemCountStore) GetCountDistinct(ctx context.Context, name, bucket, perio
func (s MemCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error {
for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} {
k := periodBucket(name, bucket, p)
s.DistinctCounts.Compute(k,func(nested *xsync.MapOf[string, bool], _ bool) (*xsync.MapOf[string, bool], bool) {
s.DistinctCounts.Compute(k, func(nested *xsync.MapOf[string, bool], _ bool) (*xsync.MapOf[string, bool], bool) {
if nested == nil {
nested = xsync.NewMapOf[string, bool]()
}
Expand Down
20 changes: 20 additions & 0 deletions automod/countstore/countstore_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,26 @@ func (s *RedisCountStore) Increment(ctx context.Context, name, val string) error
return err
}

// Variant of Increment() which only acts on a single specified time period. The intended us of this variant is to control the total number of counters persisted, by using a relatively short time period, for which the counters will expire.
func (s *RedisCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error {

// multiple ops in a single redis round-trip
multi := s.Client.Pipeline()

key := redisCountPrefix + periodBucket(name, val, period)
multi.Incr(ctx, key)

switch period {
case PeriodHour:
multi.Expire(ctx, key, 2*time.Hour)
case PeriodDay:
multi.Expire(ctx, key, 48*time.Hour)
}

_, err := multi.Exec(ctx)
return err
}

func (s *RedisCountStore) GetCountDistinct(ctx context.Context, name, val, period string) (int, error) {
key := redisDistinctPrefix + periodBucket(name, val, period)
c, err := s.Client.PFCount(ctx, key).Result()
Expand Down
102 changes: 79 additions & 23 deletions automod/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ type ModReport struct {
}

type CounterRef struct {
Name string
Val string
Name string
Val string
Period *string
}

type CounterDistinctRef struct {
Expand All @@ -26,22 +27,39 @@ type CounterDistinctRef struct {
Val string
}

// base type for events specific to an account, usually derived from a repo event stream message (one such message may result in multiple `RepoEvent`)
// Base type for events specific to an account, usually derived from a repo event stream message (one such message may result in multiple `RepoEvent`)
//
// Events are both containers for data about the event itself (similar to an HTTP request type); aggregate results and state (counters, mod actions) to be persisted after all rules are run; and act as an API for additional network reads and operations.
//
// events are both containers for data about the event itself (similar to an HTTP request type); aggregate results and state (counters, mod actions) to be persisted after all rules are run; and act as an API for additional network reads and operations.
// Handling of moderation actions (such as labels, flags, and reports) are deferred until the end of all rule execution, then de-duplicated against any pre-existing actions on the account.
type RepoEvent struct {
Engine *Engine
Err error
Logger *slog.Logger
Account AccountMeta
CounterIncrements []CounterRef
// Back-reference to Engine that is processing this event. Pointer, but must not be nil.
Engine *Engine
// Any error encountered while processing the event can be stashed in this field and handled at the end of all processing.
Err error
// slog logger handle, with event-specific structured fields pre-populated. Pointer, but expected to not be nil.
Logger *slog.Logger
// Metadata for the account (identity) associated with this event (aka, the repo owner)
Account AccountMeta
// List of counters which should be incremented as part of processing this event. These are collected during rule execution and persisted in bulk at the end.
CounterIncrements []CounterRef
// Similar to "CounterIncrements", but for "distinct" style counters
CounterDistinctIncrements []CounterDistinctRef // TODO: better variable names
AccountLabels []string
AccountFlags []string
AccountReports []ModReport
AccountTakedown bool
// Label values which should be applied to the overall account, as a result of rule execution.
AccountLabels []string
// Moderation flags (similar to labels, but private) which should be applied to the overall account, as a result of rule execution.
AccountFlags []string
// Reports which should be filed against this account, as a result of rule execution.
AccountReports []ModReport
// If "true", indicates that a rule indicates that the entire account should have a takedown.
AccountTakedown bool
}

// Immediate fetches a count from the event's engine's countstore. Returns 0 by default (if counter has never been incremented).
//
// "name" is the counter namespace.
// "val" is the specific counter with that namespace.
// "period" is the time period bucke (one of the fixed "Period*" values)
func (e *RepoEvent) GetCount(name, val, period string) int {
v, err := e.Engine.GetCount(name, val, period)
if err != nil {
Expand All @@ -51,10 +69,20 @@ func (e *RepoEvent) GetCount(name, val, period string) int {
return v
}

// Enqueues the named counter to be incremented at the end of all rule processing. Will automatically increment for all time periods.
//
// "name" is the counter namespace.
// "val" is the specific counter with that namespace.
func (e *RepoEvent) Increment(name, val string) {
e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val})
}

// Enqueues the named counter to be incremented at the end of all rule processing. Will only increment the indicated time period bucket.
func (e *RepoEvent) IncrementPeriod(name, val string, period string) {
e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val, Period: &period})
}

// Immediate fetches an estimated (statistical) count of distinct string values in the indicated bucket and time period.
func (e *RepoEvent) GetCountDistinct(name, bucket, period string) int {
v, err := e.Engine.GetCountDistinct(name, bucket, period)
if err != nil {
Expand All @@ -64,10 +92,12 @@ func (e *RepoEvent) GetCountDistinct(name, bucket, period string) int {
return v
}

// Enqueues the named "distinct value" counter based on the supplied string value ("val") to be incremented at the end of all rule processing. Will automatically increment for all time periods.
func (e *RepoEvent) IncrementDistinct(name, bucket, val string) {
e.CounterDistinctIncrements = append(e.CounterDistinctIncrements, CounterDistinctRef{Name: name, Bucket: bucket, Val: val})
}

// Checks the Engine's setstore for whether the indicated "val" is a member of the "name" set.
func (e *RepoEvent) InSet(name, val string) bool {
v, err := e.Engine.InSet(name, val)
if err != nil {
Expand All @@ -77,18 +107,22 @@ func (e *RepoEvent) InSet(name, val string) bool {
return v
}

// Enqueues the entire account to be taken down at the end of rule processing.
func (e *RepoEvent) TakedownAccount() {
e.AccountTakedown = true
}

// Enqueues the provided label (string value) to be added to the account at the end of rule processing.
func (e *RepoEvent) AddAccountLabel(val string) {
e.AccountLabels = append(e.AccountLabels, val)
}

// Enqueues the provided flag (string value) to be recorded (in the Engine's flagstore) at the end of rule processing.
func (e *RepoEvent) AddAccountFlag(val string) {
e.AccountFlags = append(e.AccountFlags, val)
}

// Enqueues a moderation report to be filed against the account at the end of rule processing.
func (e *RepoEvent) ReportAccount(reason, comment string) {
e.AccountReports = append(e.AccountReports, ModReport{ReasonType: reason, Comment: comment})
}
Expand Down Expand Up @@ -247,9 +281,16 @@ func (e *RepoEvent) PersistActions(ctx context.Context) error {
func (e *RepoEvent) PersistCounters(ctx context.Context) error {
// TODO: dedupe this array
for _, ref := range e.CounterIncrements {
err := e.Engine.Counters.Increment(ctx, ref.Name, ref.Val)
if err != nil {
return err
if ref.Period != nil {
err := e.Engine.Counters.IncrementPeriod(ctx, ref.Name, ref.Val, *ref.Period)
if err != nil {
return err
}
} else {
err := e.Engine.Counters.Increment(ctx, ref.Name, ref.Val)
if err != nil {
return err
}
}
}
for _, ref := range e.CounterDistinctIncrements {
Expand All @@ -270,36 +311,50 @@ func (e *RepoEvent) CanonicalLogLine() {
)
}

// Alias of RepoEvent
type IdentityEvent struct {
RepoEvent
}

// Extends RepoEvent. Represents the creation of a single record in the given repository.
type RecordEvent struct {
RepoEvent

Record any
Collection string
RecordKey string
CID string
RecordLabels []string
// The un-marshalled record, as a go struct, from the api/atproto or api/bsky type packages.
Record any
// The "collection" part of the repo path for this record. Must be an NSID, though this isn't indicated by the type of this field.
Collection string
// The "record key" (rkey) part of repo path.
RecordKey string
// CID of the canonical CBOR version of the record, as matches the repo value.
CID string
// Same as "AccountLabels", but at record-level
RecordLabels []string
// Same as "AccountTakedown", but at record-level
RecordTakedown bool
RecordReports []ModReport
RecordFlags []string
// Same as "AccountReports", but at record-level
RecordReports []ModReport
// Same as "AccountFlags", but at record-level
RecordFlags []string
// TODO: commit metadata
}

// Enqueues the record to be taken down at the end of rule processing.
func (e *RecordEvent) TakedownRecord() {
e.RecordTakedown = true
}

// Enqueues the provided label (string value) to be added to the record at the end of rule processing.
func (e *RecordEvent) AddRecordLabel(val string) {
e.RecordLabels = append(e.RecordLabels, val)
}

// Enqueues the provided flag (string value) to be recorded (in the Engine's flagstore) at the end of rule processing.
func (e *RecordEvent) AddRecordFlag(val string) {
e.RecordFlags = append(e.RecordFlags, val)
}

// Enqueues a moderation report to be filed against the record at the end of rule processing.
func (e *RecordEvent) ReportRecord(reason, comment string) {
e.RecordReports = append(e.RecordReports, ModReport{ReasonType: reason, Comment: comment})
}
Expand Down Expand Up @@ -414,6 +469,7 @@ func (e *RecordEvent) CanonicalLogLine() {
)
}

// Extends RepoEvent. Represents the deletion of a single record in the given repository.
type RecordDeleteEvent struct {
RepoEvent

Expand Down
Loading
Loading