diff --git a/internal/enginenetx/network.go b/internal/enginenetx/network.go index d37e30f96d..dfce34e9d5 100644 --- a/internal/enginenetx/network.go +++ b/internal/enginenetx/network.go @@ -9,6 +9,7 @@ import ( "net/http" "net/http/cookiejar" "net/url" + "time" "github.com/ooni/probe-cli/v3/internal/bytecounter" "github.com/ooni/probe-cli/v3/internal/model" @@ -54,7 +55,7 @@ func (n *Network) Close() error { // same as above but for the resolver's connections n.reso.CloseIdleConnections() - // make sure we sync stats to disk + // make sure we sync stats to disk and shutdown the background trimmer return n.stats.Close() } @@ -92,7 +93,8 @@ func NewNetwork( dialer := netxlite.NewDialerWithResolver(logger, resolver) // Create manager for keeping track of statistics - stats := newStatsManager(kvStore, logger) + const trimInterval = 30 * time.Second + stats := newStatsManager(kvStore, logger, trimInterval) // Create a TLS dialer ONLY used for dialing TLS connections. This dialer will use // happy-eyeballs and possibly custom policies for dialing TLS connections. diff --git a/internal/enginenetx/network_internal_test.go b/internal/enginenetx/network_internal_test.go index d796804354..cc05931079 100644 --- a/internal/enginenetx/network_internal_test.go +++ b/internal/enginenetx/network_internal_test.go @@ -42,10 +42,14 @@ func TestNetworkUnit(t *testing.T) { }, }, stats: &statsManager{ + cancel: func() { /* nothing */ }, + closeOnce: sync.Once{}, container: &statsContainer{}, kvStore: &kvstore.Memory{}, logger: model.DiscardLogger, mu: sync.Mutex{}, + pruned: make(chan any), + wg: &sync.WaitGroup{}, }, txp: expected, } @@ -67,10 +71,14 @@ func TestNetworkUnit(t *testing.T) { netx := &Network{ reso: expected, stats: &statsManager{ + cancel: func() { /* nothing */ }, + closeOnce: sync.Once{}, container: &statsContainer{}, kvStore: &kvstore.Memory{}, logger: model.DiscardLogger, mu: sync.Mutex{}, + pruned: make(chan any), + wg: &sync.WaitGroup{}, }, txp: &mocks.HTTPTransport{ MockCloseIdleConnections: func() { @@ -82,7 +90,41 @@ func TestNetworkUnit(t *testing.T) { t.Fatal(err) } if !called { - t.Fatal("did not call the transport's CloseIdleConnections") + t.Fatal("did not call the resolver's CloseIdleConnections") + } + }) + + t.Run("Close calls the .cancel field of the statsManager as a side effect", func(t *testing.T) { + var called bool + netx := &Network{ + reso: &mocks.Resolver{ + MockCloseIdleConnections: func() { + // nothing + }, + }, + stats: &statsManager{ + cancel: func() { + called = true + }, + closeOnce: sync.Once{}, + container: &statsContainer{}, + kvStore: &kvstore.Memory{}, + logger: model.DiscardLogger, + mu: sync.Mutex{}, + pruned: make(chan any), + wg: &sync.WaitGroup{}, + }, + txp: &mocks.HTTPTransport{ + MockCloseIdleConnections: func() { + // nothing + }, + }, + } + if err := netx.Close(); err != nil { + t.Fatal(err) + } + if !called { + t.Fatal("did not call the .cancel field of the statsManager") } }) diff --git a/internal/enginenetx/statsmanager.go b/internal/enginenetx/statsmanager.go index 5846b50832..a95c9aa9e0 100644 --- a/internal/enginenetx/statsmanager.go +++ b/internal/enginenetx/statsmanager.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "net" + "sort" "sync" "time" @@ -87,6 +88,86 @@ type statsTactic struct { Tactic *httpsDialerTactic } +// statsNilSafeSuccessRate is a convenience function for computing the success rate +// which returns zero as the success rate if CountStarted is zero. +// +// For robustness, be paranoid about nils here because the stats are +// written on the disk and a user could potentially edit them. +func statsNilSafeSuccessRate(t *statsTactic) (rate float64) { + if t != nil && t.CountStarted > 0 { + rate = float64(t.CountSuccess) / float64(t.CountStarted) + } + return +} + +// statsNilSafeLastUpdated is a convenience function for getting the .LastUpdated +// field that takes into account the case where t is nil. +func statsNilSafeLastUpdated(t *statsTactic) (output time.Time) { + if t != nil { + output = t.LastUpdated + } + return +} + +// statsNilSafeCountSuccess is a convenience function for getting the .CountSuccess +// counter that takes into account the case where t is nil. +func statsNilSafeCountSuccess(t *statsTactic) (output int64) { + if t != nil { + output = t.CountSuccess + } + return +} + +// statsDefensivelySortTacticsByDescendingSuccessRateWithAcceptPredicate sorts the input list +// by success rate taking into account that several entries could be malformed, and then +// filters the sorted list using the given boolean predicate to accept elements. +// +// The sorting criteria takes into account: +// +// 1. the success rate; or +// +// 2. the last updated time; or +// +// 3. the number of successes. +// +// The predicate allows to further restrict the returned list. +// +// This function operates on a deep copy of the input list, so it does not create data races. +func statsDefensivelySortTacticsByDescendingSuccessRateWithAcceptPredicate( + input []*statsTactic, acceptfunc func(*statsTactic) bool) []*statsTactic { + // first let's create a working list such that we don't modify + // the input in place thus avoiding any data race + work := []*statsTactic{} + for _, t := range input { + if t != nil && t.Tactic != nil { + work = append(work, t.Clone()) // DEEP COPY! + } + } + + // now let's sort work in place + sort.SliceStable(work, func(i, j int) bool { + if statsNilSafeSuccessRate(work[i]) > statsNilSafeSuccessRate(work[j]) { + return true + } + if statsNilSafeCountSuccess(work[i]) > statsNilSafeCountSuccess(work[j]) { + return true + } + if statsNilSafeLastUpdated(work[i]).Sub(statsNilSafeLastUpdated(work[j])) > 0 { + return true + } + return false + }) + + // finally let's apply the predicate to produce output + output := []*statsTactic{} + for _, t := range work { + if acceptfunc(t) { + output = append(output, t) + } + } + return output +} + func statsMaybeCloneMapStringInt64(input map[string]int64) (output map[string]int64) { // distinguish and preserve nil versus empty if input == nil { @@ -135,35 +216,60 @@ type statsDomainEndpoint struct { Tactics map[string]*statsTactic } -// statsDomainEndpointRemoveOldEntries returns a copy of a [*statsDomainEndpoint] with old entries removed. -func statsDomainEndpointRemoveOldEntries(input *statsDomainEndpoint) (output *statsDomainEndpoint) { - output = &statsDomainEndpoint{ - Tactics: map[string]*statsTactic{}, - } - oneWeek := 7 * 24 * time.Hour +// statsDomainEndpointPruneEntries returns a DEEP COPY of a [*statsDomainEndpoint] with old +// and excess entries removed, such that the overall size is not unbounded. +func statsDomainEndpointPruneEntries(input *statsDomainEndpoint) *statsDomainEndpoint { + tactics := []*statsTactic{} now := time.Now() // if .Tactics is empty here we're just going to do nothing for summary, tactic := range input.Tactics { - // we serialize stats to disk, so we cannot rule out the case where the user // explicitly edits the stats to include a malformed entry - if tactic == nil || tactic.Tactic == nil { + if summary == "" || tactic == nil || tactic.Tactic == nil { continue } + tactics = append(tactics, tactic) + } - // When .LastUpdated is the zero time.Time value, the check is going to fail - // exactly like the time was 1 or 5 or 10 years ago instead. - // - // See https://go.dev/play/p/HGQT17ueIkq where we show that the zero time - // is handled exactly like any time in the past (it was kinda obvious, but - // sometimes it also make sense to double check assumptions!) - if delta := now.Sub(tactic.LastUpdated); delta > oneWeek { - continue - } - output.Tactics[summary] = tactic.Clone() + // oneWeek is a constant representing one week of data. + const oneWeek = 7 * 24 * time.Hour + + // maxEntriesPerDomainEndpoint is the maximum number of entries per + // domain endpoint that we would like to keep overall. + const maxEntriesPerDomainEndpoint = 10 + + // Sort by descending success rate and cut all the entries that are older than + // a given threshold. Note that we need to be defensive here because we are dealing + // with data stored on disk that might have been modified to crash us. + // + // Note that statsDefensivelySortTacticsByDescendingSuccessRateWithAcceptPredicate + // operates on and returns a DEEP COPY of the original list. + tactics = statsDefensivelySortTacticsByDescendingSuccessRateWithAcceptPredicate( + tactics, func(st *statsTactic) bool { + // When .LastUpdated is the zero time.Time value, the check is going to fail + // exactly like the time was 1 or 5 or 10 years ago instead. + // + // See https://go.dev/play/p/HGQT17ueIkq where we show that the zero time + // is handled exactly like any time in the past (it was kinda obvious, but + // sometimes it also make sense to double check assumptions!) + delta := now.Sub(statsNilSafeLastUpdated(st)) + return delta < oneWeek + }) + + // Cut excess entries, if needed + if len(tactics) > maxEntriesPerDomainEndpoint { + tactics = tactics[:maxEntriesPerDomainEndpoint] } - return + + // return a new statsDomainEndpoint to the caller + output := &statsDomainEndpoint{ + Tactics: map[string]*statsTactic{}, + } + for _, t := range tactics { + output.Tactics[t.Tactic.tacticSummaryKey()] = t + } + return output } // statsContainerVersion is the current version of [statsContainer]. @@ -180,8 +286,8 @@ type statsContainer struct { Version int } -// statsDomainRemoveOldEntries returns a copy of a [*statsContainer] with old entries removed. -func statsContainerRemoveOldEntries(input *statsContainer) (output *statsContainer) { +// statsContainerPruneEntries returns a DEEP COPY of a [*statsContainer] with old entries removed. +func statsContainerPruneEntries(input *statsContainer) (output *statsContainer) { output = newStatsContainer() // if .DomainEndpoints is nil here we're just going to do nothing @@ -189,11 +295,11 @@ func statsContainerRemoveOldEntries(input *statsContainer) (output *statsContain // We serialize this data to disk, so we need to account for the case // where a user has manually edited the JSON to add a nil value - if inputStats == nil { + if domainEpnt == "" || inputStats == nil || len(inputStats.Tactics) <= 0 { continue } - prunedStats := statsDomainEndpointRemoveOldEntries(inputStats) + prunedStats := statsDomainEndpointPruneEntries(inputStats) // We don't want to include an entry when it's empty because all the // stats inside it have just been pruned @@ -208,7 +314,7 @@ func statsContainerRemoveOldEntries(input *statsContainer) (output *statsContain // GetStatsTacticLocked returns the tactic record for the given [*statsTactic] instance. // -// At the name implies, this function MUST be called while holding the [*statsManager] mutex. +// As the name implies, this function MUST be called while holding the [*statsManager] mutex. func (c *statsContainer) GetStatsTacticLocked(tactic *httpsDialerTactic) (*statsTactic, bool) { domainEpntRecord, found := c.DomainEndpoints[tactic.domainEndpointKey()] if !found || domainEpntRecord == nil { @@ -220,7 +326,7 @@ func (c *statsContainer) GetStatsTacticLocked(tactic *httpsDialerTactic) (*stats // SetStatsTacticLocked sets the tactic record for the given the given [*statsTactic] instance. // -// At the name implies, this function MUST be called while holding the [*statsManager] mutex. +// As the name implies, this function MUST be called while holding the [*statsManager] mutex. func (c *statsContainer) SetStatsTacticLocked(tactic *httpsDialerTactic, record *statsTactic) { domainEpntRecord, found := c.DomainEndpoints[tactic.domainEndpointKey()] if !found { @@ -254,6 +360,12 @@ func newStatsContainer() *statsContainer { // The zero value of this structure is not ready to use; please, use the // [newStatsManager] factory to create a new instance. type statsManager struct { + // cancel allows canceling the background stats pruner. + cancel context.CancelFunc + + // closeOnce gives .Close a "once" semantics + closeOnce sync.Once + // container is the container container for stats container *statsContainer @@ -265,6 +377,13 @@ type statsManager struct { // mu provides mutual exclusion when accessing the stats. mu sync.Mutex + + // pruned is a channel pruned on a best effort basis + // by the background goroutine that prunes. + pruned chan any + + // wg tells us when the background goroutine joined. + wg *sync.WaitGroup } // statsKey is the key used in the key-value store to access the state. @@ -299,24 +418,42 @@ func loadStatsContainer(kvStore model.KeyValueStore) (*statsContainer, error) { return nil, err } - // make sure we remove old entries - pruned := statsContainerRemoveOldEntries(&container) + // make sure we prune the data structure + pruned := statsContainerPruneEntries(&container) return pruned, nil } // newStatsManager constructs a new instance of [*statsManager]. -func newStatsManager(kvStore model.KeyValueStore, logger model.Logger) *statsManager { +func newStatsManager(kvStore model.KeyValueStore, logger model.Logger, trimInterval time.Duration) *statsManager { + runtimex.Assert(trimInterval > 0, "passed non-positive trimInterval") + root, err := loadStatsContainer(kvStore) if err != nil { root = newStatsContainer() } - return &statsManager{ + ctx, cancel := context.WithCancel(context.Background()) + + mt := &statsManager{ + cancel: cancel, + closeOnce: sync.Once{}, container: root, kvStore: kvStore, logger: logger, mu: sync.Mutex{}, + pruned: make(chan any), + wg: &sync.WaitGroup{}, } + + // run a background goroutine that trims the stats by removing excessive + // entries until the programmer calls (*statsManager).Close + mt.wg.Add(1) + go func() { + defer mt.wg.Done() + mt.trim(ctx, trimInterval) + }() + + return mt } var _ httpsDialerEventsHandler = &statsManager{} @@ -449,16 +586,57 @@ func (mt *statsManager) OnSuccess(tactic *httpsDialerTactic) { } // Close implements io.Closer -func (mt *statsManager) Close() error { - // TODO(bassosimone): do we need to apply a "once" semantics to this method? Perhaps no - // given that there is no resource that we can close only once... +func (mt *statsManager) Close() (err error) { + mt.closeOnce.Do(func() { + // interrupt the background goroutine + mt.cancel() + + func() { + // get exclusive access + defer mt.mu.Unlock() + mt.mu.Lock() + + // make sure we remove the unneeded entries one last time before saving them + container := statsContainerPruneEntries(mt.container) + + // write updated stats into the underlying key-value store + err = mt.kvStore.Set(statsKey, runtimex.Try1(json.Marshal(container))) + }() + + // wait for background goroutine to join + mt.wg.Wait() + }) + return +} - // get exclusive access - defer mt.mu.Unlock() - mt.mu.Lock() +// trim runs in the background and trims the mt.container struct +func (mt *statsManager) trim(ctx context.Context, interval time.Duration) { - // write updated stats into the underlying key-value store - return mt.kvStore.Set(statsKey, runtimex.Try1(json.Marshal(mt.container))) + // Note: we already manage mt.wg when we start this goroutine so there's NO NEED to do it here! + + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + + case <-t.C: + + // get exclusive access and edit the container + mt.mu.Lock() + mt.container = statsContainerPruneEntries(mt.container) + mt.mu.Unlock() + + // notify whoever's concerned that we pruned + // and do that best effort because it may be that nobody is concerned + select { + case mt.pruned <- true: + default: + } + + } + } } // LookupTacticsStats returns stats about tactics for a given domain and port. The returned diff --git a/internal/enginenetx/statsmanager_test.go b/internal/enginenetx/statsmanager_test.go index e5f4886bdc..1b8b2c4d40 100644 --- a/internal/enginenetx/statsmanager_test.go +++ b/internal/enginenetx/statsmanager_test.go @@ -4,7 +4,10 @@ import ( "context" "encoding/json" "errors" + "fmt" + "math/rand" "sort" + "strings" "sync" "testing" "time" @@ -558,7 +561,13 @@ func TestStatsManagerCallbacks(t *testing.T) { "162.55.247.208:443 sni=www.example.com verify=api.ooni.io": { CountStarted: 1, LastUpdated: fourtyFiveMinutesAgo, - Tactic: &httpsDialerTactic{}, // only required for cloning + Tactic: &httpsDialerTactic{ + Address: "162.55.247.208", + InitialDelay: 0, + Port: "443", + SNI: "www.example.com", + VerifyHostname: "api.ooni.io", + }, }, }, }, @@ -588,7 +597,13 @@ func TestStatsManagerCallbacks(t *testing.T) { "162.55.247.208:443 sni=www.example.com verify=api.ooni.io": { CountStarted: 1, CountTCPConnectInterrupt: 1, - Tactic: &httpsDialerTactic{}, + Tactic: &httpsDialerTactic{ + Address: "162.55.247.208", + InitialDelay: 0, + Port: "443", + SNI: "www.example.com", + VerifyHostname: "api.ooni.io", + }, }, }, }, @@ -635,7 +650,13 @@ func TestStatsManagerCallbacks(t *testing.T) { "162.55.247.208:443 sni=www.example.com verify=api.ooni.io": { CountStarted: 1, LastUpdated: fourtyFiveMinutesAgo, - Tactic: &httpsDialerTactic{}, // only for cloning + Tactic: &httpsDialerTactic{ + Address: "162.55.247.208", + InitialDelay: 0, + Port: "443", + SNI: "www.example.com", + VerifyHostname: "api.ooni.io", + }, }, }, }, @@ -665,7 +686,13 @@ func TestStatsManagerCallbacks(t *testing.T) { "162.55.247.208:443 sni=www.example.com verify=api.ooni.io": { CountStarted: 1, CountTLSHandshakeInterrupt: 1, - Tactic: &httpsDialerTactic{}, + Tactic: &httpsDialerTactic{ + Address: "162.55.247.208", + InitialDelay: 0, + Port: "443", + SNI: "www.example.com", + VerifyHostname: "api.ooni.io", + }, }, }, }, @@ -771,7 +798,9 @@ func TestStatsManagerCallbacks(t *testing.T) { } // create the stats manager - stats := newStatsManager(kvStore, logger) + const trimInterval = 30 * time.Second + stats := newStatsManager(kvStore, logger, trimInterval) + defer stats.Close() // invoke the proper stats callback tc.do(stats) @@ -892,7 +921,9 @@ func TestStatsManagerLookupTactics(t *testing.T) { } // create the stats manager - stats := newStatsManager(kvStore, log.Log) + const trimInterval = 30 * time.Second + stats := newStatsManager(kvStore, log.Log, trimInterval) + defer stats.Close() t.Run("when we're searching for a domain endpoint we know about", func(t *testing.T) { // obtain tactics @@ -1043,3 +1074,562 @@ func TestStatsContainer(t *testing.T) { }) }) } + +func TestStatsNilSafeSuccessRate(t *testing.T) { + t.Run("with nil entry", func(t *testing.T) { + var st *statsTactic + if statsNilSafeSuccessRate(st) != 0 { + t.Fatal("unexpected result") + } + }) + + t.Run("with non-nil entry", func(t *testing.T) { + st := &statsTactic{ + CountStarted: 10, + CountSuccess: 5, + } + if statsNilSafeSuccessRate(st) != 0.5 { + t.Fatal("unexpected result") + } + }) +} + +func TestStatsNilSafeLastUpdated(t *testing.T) { + t.Run("with nil entry", func(t *testing.T) { + var st *statsTactic + if !statsNilSafeLastUpdated(st).IsZero() { + t.Fatal("unexpected result") + } + }) + + t.Run("with non-nil entry", func(t *testing.T) { + expect := time.Now() + st := &statsTactic{ + LastUpdated: expect, + } + if statsNilSafeLastUpdated(st) != expect { + t.Fatal("unexpected result") + } + }) +} + +func TestStatsNilSafeCountSuccess(t *testing.T) { + t.Run("with nil entry", func(t *testing.T) { + var st *statsTactic + if statsNilSafeCountSuccess(st) != 0 { + t.Fatal("unexpected result") + } + }) + + t.Run("with non-nil entry", func(t *testing.T) { + st := &statsTactic{ + CountSuccess: 11, + } + if statsNilSafeCountSuccess(st) != 11 { + t.Fatal("unexpected result") + } + }) +} + +func TestStatsDefensivelySortTacticsByDescendingSuccessRateWithAcceptPredicate(t *testing.T) { + now := time.Now() + + // expect shows what we expect to see in output + expect := []*statsTactic{ + + // this one should be first because it has 100% success rate + // and the highest number of successes + { + CountStarted: 5, + CountSuccess: 5, + LastUpdated: now.Add(-5 * time.Second), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "www.repubblica.it", + VerifyHostname: "shelob.polito.it", + }, + }, + + // this one should be second because it has less successes + // than the first one albeit the same last updated + { + CountStarted: 4, + CountSuccess: 4, + LastUpdated: now.Add(-5 * time.Second), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "www.ilfattoquotidiano.it", + VerifyHostname: "shelob.polito.it", + }, + }, + + // this one should be third because it is a bit older + // albeit it has the same number of successes + { + CountStarted: 4, + CountSuccess: 4, + LastUpdated: now.Add(-7 * time.Second), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "www.ilpost.it", + VerifyHostname: "shelob.polito.it", + }, + }, + + // this one should come fourth because it has a lower success rate + { + CountStarted: 100, + CountSuccess: 95, + LastUpdated: now.Add(-2 * time.Second), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "www.polito.it", + VerifyHostname: "shelob.polito.it", + }, + }, + } + + // input contains the input we provide, which should contain + // a mixture of the above entries together with a bunch of + // entries with very bad values + input := []*statsTactic{ + + // this is the one that should sort last in output + expect[3], + + // a nil entry is obviously a good test case + nil, + + // an entry with a nil Tactic is also quite annoying + { + CountStarted: 55, + CountSuccess: 55, + LastUpdated: now.Add(-3 * time.Second), + Tactic: nil, + }, + + expect[1], + expect[2], + + // another nil entry because why not + nil, + + // another entry with nil Tactic because why not + { + CountStarted: 101, + CountSuccess: 44, + LastUpdated: now.Add(-33 * time.Second), + Tactic: nil, + }, + + // a legitimate entry that is going to be filtered out + // by a custom filtering function + // + // otherwise, this one should be the first entry + { + CountStarted: 128, + CountSuccess: 128, + LastUpdated: now.Add(-130 * time.Millisecond), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "kernel.org", + VerifyHostname: "shelob.polito.it", + }, + }, + + expect[0], + } + + got := statsDefensivelySortTacticsByDescendingSuccessRateWithAcceptPredicate( + input, func(st *statsTactic) bool { + return st != nil && st.Tactic != nil && strings.HasSuffix(st.Tactic.SNI, ".it") + }, + ) + + if diff := cmp.Diff(expect, got); diff != "" { + t.Fatal(diff) + } +} + +func TestStatsDomainEndpointPruneEntries(t *testing.T) { + t.Run("rejects tactics with empty summary, nil tactics and with nil .Tactics", func(t *testing.T) { + input := &statsDomainEndpoint{ + Tactics: map[string]*statsTactic{ + // empty summary + "": { + Tactic: &httpsDialerTactic{}, + }, + + // nil tactic + "antani": nil, + + // nil .Tactic + "foo": { + Tactic: nil, + }, + }, + } + + expect := &statsDomainEndpoint{ + Tactics: map[string]*statsTactic{}, + } + + got := statsDomainEndpointPruneEntries(input) + + if diff := cmp.Diff(expect, got); diff != "" { + t.Fatal(diff) + } + }) + + t.Run("prunes entries older than one week", func(t *testing.T) { + now := time.Now() + + input := &statsDomainEndpoint{ + Tactics: map[string]*statsTactic{ + "130.192.91.211:443 sni=polito.it verify=shelob.polito.it": { + CountStarted: 10, + CountSuccess: 10, + LastUpdated: now.Add(-24 * time.Hour * 8), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "polito.it", + VerifyHostname: "shelob.polito.it", + }, + }, + "130.192.91.211:443 sni=garr.it verify=shelob.polito.it": { + CountStarted: 10, + CountSuccess: 7, + LastUpdated: now.Add(-24 * time.Hour * 6), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "garr.it", + VerifyHostname: "shelob.polito.it", + }, + }, + }, + } + + expect := &statsDomainEndpoint{ + Tactics: map[string]*statsTactic{ + "130.192.91.211:443 sni=garr.it verify=shelob.polito.it": { + CountStarted: 10, + CountSuccess: 7, + LastUpdated: now.Add(-24 * time.Hour * 6), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "garr.it", + VerifyHostname: "shelob.polito.it", + }, + }, + }, + } + + got := statsDomainEndpointPruneEntries(input) + + if diff := cmp.Diff(expect, got); diff != "" { + t.Fatal(diff) + } + }) + + t.Run("reduces the number of entries", func(t *testing.T) { + var ( + inputs []*statsTactic + ) + + expect := &statsDomainEndpoint{ + Tactics: map[string]*statsTactic{}, + } + now := time.Now() + + // create successful entries + for idx := int64(0); idx < 7; idx++ { + tactic := &statsTactic{ + CountStarted: 10, + CountTCPConnectError: idx, + CountSuccess: 10 - idx, + HistoTCPConnectError: map[string]int64{ + "generic_timeout_error": idx, + }, + LastUpdated: now.Add(-time.Duration(idx) * time.Second), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: fmt.Sprintf("host%d.garr.it", idx), + VerifyHostname: "shelob.polito.it", + }, + } + inputs = append(inputs, tactic) + + // note how we're making entries such that each entry is less + // good than the subsequent one in terms of the success rate + expect.Tactics[tactic.Tactic.tacticSummaryKey()] = tactic + } + + // create failed entries + for idx := int64(7); idx < 255; idx++ { + tactic := &statsTactic{ + CountStarted: idx, + CountTCPConnectError: idx, + HistoTCPConnectError: map[string]int64{ + "generic_timeout_error": idx, + }, + LastUpdated: now.Add(-time.Duration(idx) * time.Second), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: fmt.Sprintf("host%d.garr.it", idx), + VerifyHostname: "shelob.polito.it", + }, + } + inputs = append(inputs, tactic) + + // we need three extra failures in the expected results + // and they must sort after successful entries + if idx < 10 { + expect.Tactics[tactic.Tactic.tacticSummaryKey()] = tactic + } + } + + // shuffle the input order + r := rand.New(rand.NewSource(time.Now().UnixNano())) + r.Shuffle(len(inputs), func(i, j int) { + inputs[i], inputs[j] = inputs[j], inputs[i] + }) + + // fill the input struct + input := &statsDomainEndpoint{ + Tactics: map[string]*statsTactic{}, + } + for _, entry := range inputs { + input.Tactics[entry.Tactic.tacticSummaryKey()] = entry + } + + got := statsDomainEndpointPruneEntries(input) + + // log the results because it may be useful in case something is wrong + t.Log(string(runtimex.Try1(json.MarshalIndent(got, "", " ")))) + + if diff := cmp.Diff(expect, got); diff != "" { + t.Fatal(diff) + } + }) +} + +func TestStatsContainerPruneEntries(t *testing.T) { + t.Run("with a nil .DomainEndpoints field", func(t *testing.T) { + input := &statsContainer{ + DomainEndpoints: nil, // explicitly + Version: statsContainerVersion, + } + + output := statsContainerPruneEntries(input) + + expect := &statsContainer{ + DomainEndpoints: map[string]*statsDomainEndpoint{}, + Version: statsContainerVersion, + } + + if diff := cmp.Diff(expect, output); diff != "" { + t.Fatal(diff) + } + }) + + t.Run("we filter out empty summary, nil and nil/empty .Tactics", func(t *testing.T) { + input := &statsContainer{ + DomainEndpoints: map[string]*statsDomainEndpoint{ + + // empty summary + "": {}, + + // nil entry + "antani": nil, + + // nil .Tactics + "foo": { + Tactics: nil, + }, + + // empty .Tactics + "bar": { + Tactics: map[string]*statsTactic{}, + }, + }, + Version: statsContainerVersion, + } + + output := statsContainerPruneEntries(input) + + expect := &statsContainer{ + DomainEndpoints: map[string]*statsDomainEndpoint{}, + Version: statsContainerVersion, + } + + if diff := cmp.Diff(expect, output); diff != "" { + t.Fatal(diff) + } + }) + + t.Run("we avoid including into the results expired entries", func(t *testing.T) { + input := &statsContainer{ + DomainEndpoints: map[string]*statsDomainEndpoint{ + "shelob.polito.it:443": { + Tactics: map[string]*statsTactic{ + "130.192.91.211:443 sni=garr.it verify=shelob.polito.it": { + CountStarted: 10, + CountSuccess: 10, + LastUpdated: time.Time{}, // a long time ago! + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "garr.it", + VerifyHostname: "shelob.polito.it", + }, + }, + }, + }, + }, + Version: statsContainerVersion, + } + + output := statsContainerPruneEntries(input) + + expect := &statsContainer{ + DomainEndpoints: map[string]*statsDomainEndpoint{}, + Version: statsContainerVersion, + } + + if diff := cmp.Diff(expect, output); diff != "" { + t.Fatal(diff) + } + }) + + t.Run("on a successful case", func(t *testing.T) { + expectTactic := &statsTactic{ + CountStarted: 10, + CountSuccess: 10, + LastUpdated: time.Now().Add(-60 * time.Second), // recently + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "polito.it", + VerifyHostname: "shelob.polito.it", + }, + } + expectTacticSummary := expectTactic.Tactic.tacticSummaryKey() + + input := &statsContainer{ + DomainEndpoints: map[string]*statsDomainEndpoint{ + "shelob.polito.it:443": { + Tactics: map[string]*statsTactic{ + "130.192.91.211:443 sni=garr.it verify=shelob.polito.it": { + CountStarted: 10, + CountSuccess: 10, + LastUpdated: time.Time{}, // a long time ago! + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "garr.it", + VerifyHostname: "shelob.polito.it", + }, + }, + expectTacticSummary: expectTactic, + }, + }, + }, + Version: statsContainerVersion, + } + + output := statsContainerPruneEntries(input) + + expect := &statsContainer{ + DomainEndpoints: map[string]*statsDomainEndpoint{ + "shelob.polito.it:443": { + Tactics: map[string]*statsTactic{ + expectTacticSummary: expectTactic, + }, + }, + }, + Version: statsContainerVersion, + } + + if diff := cmp.Diff(expect, output); diff != "" { + t.Fatal(diff) + } + }) +} + +func TestStatsManagerTrimEntriesConcurrently(t *testing.T) { + // start stats manager that trims very frequently + store := &kvstore.Memory{} + sm := newStatsManager(store, model.DiscardLogger, 1*time.Second) + + // obtain exclusive access + sm.mu.Lock() + + // insert some data that needs pruning + sm.container = &statsContainer{ + DomainEndpoints: map[string]*statsDomainEndpoint{ + "shelob.polito.it:443": { + Tactics: map[string]*statsTactic{ + "130.192.91.211:443 sni=garr.it verify=shelob.polito.it": { + CountStarted: 10, + CountSuccess: 10, + LastUpdated: time.Time{}, // a long time ago! + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "garr.it", + VerifyHostname: "shelob.polito.it", + }, + }, + }, + }, + }, + Version: statsContainerVersion, + } + + // let the worker continue to run + sm.mu.Unlock() + + // wait for pruning to happen + <-sm.pruned + + // order the background goroutine to shutdown + // and wait for the shutdown to complete + if err := sm.Close(); err != nil { + t.Fatal(err) + } + + // now check what actually ended up being written; note that we expect + // to see empty domain endpoints because we added a too old entry + expectedData := []byte(`{"DomainEndpoints":{},"Version":5}`) + data, err := store.Get(statsKey) + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(expectedData, data); diff != "" { + t.Fatal(diff) + } +} diff --git a/internal/enginenetx/statspolicy.go b/internal/enginenetx/statspolicy.go index b70d0facc2..8ff144f6a8 100644 --- a/internal/enginenetx/statspolicy.go +++ b/internal/enginenetx/statspolicy.go @@ -8,7 +8,8 @@ package enginenetx import ( "context" - "sort" + + "github.com/ooni/probe-cli/v3/internal/runtimex" ) // statsPolicy is a policy that schedules tactics already known @@ -79,36 +80,17 @@ func statsPolicyPostProcessTactics(tactics []*statsTactic, good bool) (out []*ht return } - // nilSafeSuccessRate is a convenience function for computing the success rate - // which returns zero as the success rate if CountStarted is zero - // - // for robustness, be paranoid about nils here because the stats are - // written on the disk and a user could potentially edit them - nilSafeSuccessRate := func(t *statsTactic) (rate float64) { - if t != nil && t.CountStarted > 0 { - rate = float64(t.CountSuccess) / float64(t.CountStarted) - } - return - } - - // Implementation note: the function should implement the "less" semantics for - // ascending sorting, but we want descending sorting, so we use `>` instead - sort.SliceStable(tactics, func(i, j int) bool { - // TODO(bassosimone): should we also consider the number of samples - // we have and how recent a sample is? - return nilSafeSuccessRate(tactics[i]) > nilSafeSuccessRate(tactics[j]) - }) - - for _, t := range tactics { - // make sure we only include samples with 1+ successes; we don't want this policy - // to return what we already know it's not working and it will be the purpose of the - // fallback policy to generate new tactics to test - // - // additionally, as a precautionary and defensive measure, make sure t and t.Tactic - // are not nil before adding a malformed tactic to the return list - if t != nil && t.CountSuccess > 0 && t.Tactic != nil { - out = append(out, t.Tactic) - } + // only keep well-formed successful entries + onlySuccesses := statsDefensivelySortTacticsByDescendingSuccessRateWithAcceptPredicate( + tactics, func(st *statsTactic) bool { + return st != nil && st.Tactic != nil && st.CountSuccess > 0 + }, + ) + + // convert the statsTactic list into a list of tactics + for _, t := range onlySuccesses { + runtimex.Assert(t != nil && t.Tactic != nil && t.CountSuccess > 0, "expected well-formed *statsTactic") + out = append(out, t.Tactic) } return } diff --git a/internal/enginenetx/statspolicy_test.go b/internal/enginenetx/statspolicy_test.go index 32be8e8a3a..617979d789 100644 --- a/internal/enginenetx/statspolicy_test.go +++ b/internal/enginenetx/statspolicy_test.go @@ -13,7 +13,6 @@ import ( "github.com/ooni/probe-cli/v3/internal/netemx" "github.com/ooni/probe-cli/v3/internal/netxlite" "github.com/ooni/probe-cli/v3/internal/runtimex" - "github.com/ooni/probe-cli/v3/internal/testingx" ) func TestStatsPolicyWorkingAsIntended(t *testing.T) { @@ -135,12 +134,14 @@ func TestStatsPolicyWorkingAsIntended(t *testing.T) { t.Fatal(err) } - return newStatsManager(kvStore, log.Log) + const trimInterval = 30 * time.Second + return newStatsManager(kvStore, log.Log, trimInterval) } t.Run("when we have unique statistics", func(t *testing.T) { // create stats manager stats := createStatsManager("api.ooni.io:443", expectTacticsStats...) + defer stats.Close() // create the composed policy policy := &statsPolicy{ @@ -204,6 +205,7 @@ func TestStatsPolicyWorkingAsIntended(t *testing.T) { // create stats manager stats := createStatsManager("api.ooni.io:443", statsWithDupes...) + defer stats.Close() // create the composed policy policy := &statsPolicy{ @@ -261,6 +263,7 @@ func TestStatsPolicyWorkingAsIntended(t *testing.T) { t.Run("we avoid manipulating nil tactics", func(t *testing.T) { // create stats manager stats := createStatsManager("api.ooni.io:443", expectTacticsStats...) + defer stats.Close() // create the composed policy policy := &statsPolicy{ @@ -324,25 +327,68 @@ func TestStatsPolicyPostProcessTactics(t *testing.T) { } }) - t.Run("we filter out cases in which t or t.Tactic are nil", func(t *testing.T) { - expected := &statsTactic{} - ff := &testingx.FakeFiller{} - ff.Fill(&expected) - - input := []*statsTactic{nil, { - CountStarted: 0, - CountTCPConnectError: 0, - CountTCPConnectInterrupt: 0, - CountTLSHandshakeError: 0, - CountTLSHandshakeInterrupt: 0, - CountTLSVerificationError: 0, - CountSuccess: 0, - HistoTCPConnectError: map[string]int64{}, - HistoTLSHandshakeError: map[string]int64{}, - HistoTLSVerificationError: map[string]int64{}, - LastUpdated: time.Time{}, - Tactic: nil, - }, nil, expected} + t.Run("we filter out cases in which t or t.Tactic are nil or entry has no successes", func(t *testing.T) { + expected := &statsTactic{ + CountStarted: 7, + CountTCPConnectError: 3, + CountSuccess: 4, + HistoTCPConnectError: map[string]int64{ + "generic_timeout_error": 3, + }, + LastUpdated: time.Now().Add(-11 * time.Second), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "garr.it", + VerifyHostname: "shelob.polito.it", + }, + } + + input := []*statsTactic{ + // nil entry + nil, + + // entry with nil tactic + { + CountStarted: 0, + CountTCPConnectError: 0, + CountTCPConnectInterrupt: 0, + CountTLSHandshakeError: 0, + CountTLSHandshakeInterrupt: 0, + CountTLSVerificationError: 0, + CountSuccess: 0, + HistoTCPConnectError: map[string]int64{}, + HistoTLSHandshakeError: map[string]int64{}, + HistoTLSVerificationError: map[string]int64{}, + LastUpdated: time.Time{}, + Tactic: nil, + }, + + // another nil entry + nil, + + // an entry that should be OK + expected, + + // entry that is OK except that it does not contain any + // success so we don't expect to see it + { + CountStarted: 10, + CountTLSHandshakeError: 10, + HistoTLSHandshakeError: map[string]int64{ + "generic_timeout_error": 10, + }, + LastUpdated: time.Now().Add(-4 * time.Second), + Tactic: &httpsDialerTactic{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "polito.it", + VerifyHostname: "shelob.polito.it", + }, + }, + } got := statsPolicyPostProcessTactics(input, true)