From cf2d2ccddaa81f50dd7f21532c2f0bc2e0645a05 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 5 Jun 2024 19:04:44 +0200 Subject: [PATCH] refactor(experiment): make report goroutine safe (#1612) This diff refactors `*engine.experiment` to make the report field goroutine safe. It also moves at the bottom of experiment.go code that was intermixed with `*engine.experiment` methods. Part of https://github.com/ooni/probe/issues/2607 --- internal/engine/experiment.go | 113 +++++++++++++++++++++++----------- 1 file changed, 78 insertions(+), 35 deletions(-) diff --git a/internal/engine/experiment.go b/internal/engine/experiment.go index 466f8e333..4e0ee20ca 100644 --- a/internal/engine/experiment.go +++ b/internal/engine/experiment.go @@ -9,6 +9,7 @@ import ( "errors" "net/http" "runtime" + "sync" "time" "github.com/ooni/probe-cli/v3/internal/bytecounter" @@ -18,24 +19,52 @@ import ( "github.com/ooni/probe-cli/v3/internal/version" ) -// experiment implements Experiment. +// experimentMutableReport is the mutable experiment.report field. +// +// We isolate this into a separate data structure to ease code management. By using this +// pattern, we don't need to be concerned with locking mutexes multiple times and it's just +// a matter of using public methods exported by this struct, which are goroutine safe. +type experimentMutableReport struct { + mu sync.Mutex + report probeservices.ReportChannel +} + +// Set atomically sets the report possibly overriding a previously set report. +// +// This method is goroutine safe. +func (emr *experimentMutableReport) Set(report probeservices.ReportChannel) { + emr.mu.Lock() + emr.report = report + emr.mu.Unlock() +} + +// Get atomically gets the report possibly returning nil. +func (emr *experimentMutableReport) Get() (report probeservices.ReportChannel) { + emr.mu.Lock() + report = emr.report + emr.mu.Unlock() + return +} + +// experiment implements [model.Experiment]. type experiment struct { byteCounter *bytecounter.Counter callbacks model.ExperimentCallbacks measurer model.ExperimentMeasurer - report probeservices.ReportChannel + mrep *experimentMutableReport session *Session testName string testStartTime string testVersion string } -// newExperiment creates a new experiment given a measurer. +// newExperiment creates a new [*experiment] given a [model.ExperimentMeasurer]. func newExperiment(sess *Session, measurer model.ExperimentMeasurer) *experiment { return &experiment{ byteCounter: bytecounter.New(), callbacks: model.NewPrinterCallbacks(sess.Logger()), measurer: measurer, + mrep: &experimentMutableReport{}, session: sess, testName: measurer.ExperimentName(), testStartTime: model.MeasurementFormatTimeNowUTC(), @@ -43,46 +72,28 @@ func newExperiment(sess *Session, measurer model.ExperimentMeasurer) *experiment } } -// KibiBytesReceived implements Experiment.KibiBytesReceived. +// KibiBytesReceived implements [model.Experiment]. func (e *experiment) KibiBytesReceived() float64 { return e.byteCounter.KibiBytesReceived() } -// KibiBytesSent implements Experiment.KibiBytesSent. +// KibiBytesSent implements [model.Experiment]. func (e *experiment) KibiBytesSent() float64 { return e.byteCounter.KibiBytesSent() } -// Name implements Experiment.Name. +// Name implements [model.Experiment]. func (e *experiment) Name() string { return e.testName } -// ExperimentMeasurementSummaryKeysNotImplemented is the [model.MeasurementSummary] we use when -// the experiment TestKeys do not provide an implementation of [model.MeasurementSummary]. -type ExperimentMeasurementSummaryKeysNotImplemented struct{} - -var _ model.MeasurementSummaryKeys = &ExperimentMeasurementSummaryKeysNotImplemented{} - -// IsAnomaly implements MeasurementSummary. -func (*ExperimentMeasurementSummaryKeysNotImplemented) Anomaly() bool { - return false -} - -// MeasurementSummaryKeys returns the [model.MeasurementSummaryKeys] associated with a given measurement. -func MeasurementSummaryKeys(m *model.Measurement) model.MeasurementSummaryKeys { - if tk, ok := m.TestKeys.(model.MeasurementSummaryKeysProvider); ok { - return tk.MeasurementSummaryKeys() - } - return &ExperimentMeasurementSummaryKeysNotImplemented{} -} - -// ReportID implements Experiment.ReportID. +// ReportID implements [model.Experiment]. func (e *experiment) ReportID() string { - if e.report == nil { + report := e.mrep.Get() + if report == nil { return "" } - return e.report.ReportID() + return report.ReportID() } // experimentAsyncWrapper makes a sync experiment behave like it was async @@ -122,7 +133,7 @@ func (eaw *experimentAsyncWrapper) RunAsync( return out, nil } -// MeasureAsync implements Experiment.MeasureAsync. +// MeasureAsync implements [model.Experiment]. func (e *experiment) MeasureAsync( ctx context.Context, input string) (<-chan *model.Measurement, error) { err := e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes @@ -164,7 +175,7 @@ func (e *experiment) MeasureAsync( return out, nil } -// MeasureWithContext implements Experiment.MeasureWithContext. +// MeasureWithContext implements [model.Experiment]. func (e *experiment) MeasureWithContext( ctx context.Context, input string, ) (measurement *model.Measurement, err error) { @@ -183,13 +194,14 @@ func (e *experiment) MeasureWithContext( return } -// SubmitAndUpdateMeasurementContext implements Experiment.SubmitAndUpdateMeasurementContext. +// SubmitAndUpdateMeasurementContext implements [model.Experiment]. func (e *experiment) SubmitAndUpdateMeasurementContext( ctx context.Context, measurement *model.Measurement) error { - if e.report == nil { + report := e.mrep.Get() + if report == nil { return errors.New("report is not open") } - return e.report.SubmitMeasurement(ctx, measurement) + return report.SubmitMeasurement(ctx, measurement) } // newMeasurement creates a new measurement for this experiment with the given input. @@ -228,9 +240,12 @@ func (e *experiment) newMeasurement(input string) *model.Measurement { // OpenReportContext implements Experiment.OpenReportContext. func (e *experiment) OpenReportContext(ctx context.Context) error { - if e.report != nil { + // handle the case where we already opened the report + report := e.mrep.Get() + if report != nil { return nil // already open } + // use custom client to have proper byte accounting httpClient := &http.Client{ Transport: bytecounter.WrapHTTPTransport( @@ -244,12 +259,21 @@ func (e *experiment) OpenReportContext(ctx context.Context) error { return err } client.HTTPClient = httpClient // patch HTTP client to use + + // create the report template to open the report template := e.newReportTemplate() - e.report, err = client.OpenReport(ctx, template) + + // attempt to open the report + report, err = client.OpenReport(ctx, template) + + // handle the error case if err != nil { e.session.logger.Debugf("experiment: probe services error: %s", err.Error()) return err } + + // on success, assign the new report + e.mrep.Set(report) return nil } @@ -266,3 +290,22 @@ func (e *experiment) newReportTemplate() model.OOAPIReportTemplate { TestVersion: e.testVersion, } } + +// ExperimentMeasurementSummaryKeysNotImplemented is the [model.MeasurementSummary] we use when +// the experiment TestKeys do not provide an implementation of [model.MeasurementSummary]. +type ExperimentMeasurementSummaryKeysNotImplemented struct{} + +var _ model.MeasurementSummaryKeys = &ExperimentMeasurementSummaryKeysNotImplemented{} + +// IsAnomaly implements MeasurementSummary. +func (*ExperimentMeasurementSummaryKeysNotImplemented) Anomaly() bool { + return false +} + +// MeasurementSummaryKeys returns the [model.MeasurementSummaryKeys] associated with a given measurement. +func MeasurementSummaryKeys(m *model.Measurement) model.MeasurementSummaryKeys { + if tk, ok := m.TestKeys.(model.MeasurementSummaryKeysProvider); ok { + return tk.MeasurementSummaryKeys() + } + return &ExperimentMeasurementSummaryKeysNotImplemented{} +}