diff --git a/internal/engine/experiment.go b/internal/engine/experiment.go index 4e0ee20caf..41cc47a175 100644 --- a/internal/engine/experiment.go +++ b/internal/engine/experiment.go @@ -96,104 +96,6 @@ func (e *experiment) ReportID() string { return report.ReportID() } -// experimentAsyncWrapper makes a sync experiment behave like it was async -type experimentAsyncWrapper struct { - *experiment -} - -var _ model.ExperimentMeasurerAsync = &experimentAsyncWrapper{} - -// RunAsync implements ExperimentMeasurerAsync.RunAsync. -func (eaw *experimentAsyncWrapper) RunAsync( - ctx context.Context, sess model.ExperimentSession, input string, - callbacks model.ExperimentCallbacks) (<-chan *model.ExperimentAsyncTestKeys, error) { - out := make(chan *model.ExperimentAsyncTestKeys) - measurement := eaw.experiment.newMeasurement(input) - start := time.Now() - args := &model.ExperimentArgs{ - Callbacks: eaw.callbacks, - Measurement: measurement, - Session: eaw.session, - } - err := eaw.experiment.measurer.Run(ctx, args) - stop := time.Now() - if err != nil { - return nil, err - } - go func() { - defer close(out) // signal the reader we're done! - out <- &model.ExperimentAsyncTestKeys{ - Extensions: measurement.Extensions, - Input: measurement.Input, - MeasurementRuntime: stop.Sub(start).Seconds(), - TestKeys: measurement.TestKeys, - TestHelpers: measurement.TestHelpers, - } - }() - return out, nil -} - -// MeasureAsync implements [model.Experiment]. -func (e *experiment) MeasureAsync( - ctx context.Context, input string) (<-chan *model.Measurement, error) { - err := e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes - if err != nil { - return nil, err - } - ctx = bytecounter.WithSessionByteCounter(ctx, e.session.byteCounter) - ctx = bytecounter.WithExperimentByteCounter(ctx, e.byteCounter) - var async model.ExperimentMeasurerAsync - if v, okay := e.measurer.(model.ExperimentMeasurerAsync); okay { - async = v - } else { - async = &experimentAsyncWrapper{e} - } - in, err := async.RunAsync(ctx, e.session, input, e.callbacks) - if err != nil { - return nil, err - } - out := make(chan *model.Measurement) - go func() { - defer close(out) // we need to signal the consumer we're done - for tk := range in { - measurement := e.newMeasurement(input) - measurement.Extensions = tk.Extensions - measurement.Input = tk.Input - measurement.MeasurementRuntime = tk.MeasurementRuntime - measurement.TestHelpers = tk.TestHelpers - measurement.TestKeys = tk.TestKeys - if err := model.ScrubMeasurement(measurement, e.session.ProbeIP()); err != nil { - // If we fail to scrub the measurement then we are not going to - // submit it. Most likely causes of error here are unlikely, - // e.g., the TestKeys being not serializable. - e.session.Logger().Warnf("can't scrub measurement: %s", err.Error()) - continue - } - out <- measurement - } - }() - return out, nil -} - -// MeasureWithContext implements [model.Experiment]. -func (e *experiment) MeasureWithContext( - ctx context.Context, input string, -) (measurement *model.Measurement, err error) { - out, err := e.MeasureAsync(ctx, input) - if err != nil { - return nil, err - } - for m := range out { - if measurement == nil { - measurement = m // as documented just return the first one - } - } - if measurement == nil { - err = errors.New("experiment returned no measurements") - } - return -} - // SubmitAndUpdateMeasurementContext implements [model.Experiment]. func (e *experiment) SubmitAndUpdateMeasurementContext( ctx context.Context, measurement *model.Measurement) error { @@ -277,6 +179,76 @@ func (e *experiment) OpenReportContext(ctx context.Context) error { return nil } +// MeasureWithContext implements [model.Experiment]. +func (e *experiment) MeasureWithContext(ctx context.Context, input string) (*model.Measurement, error) { + // Here we ensure that we have already looked up the probe location + // information such that we correctly populate the measurement and also + // VERY IMPORTANTLY to scrub the IP address from the measurement. + // + // TODO(bassosimone,DecFox): historically we did this only for measuring + // and not for opening a report, which probably is not correct. Because the + // function call is idempotent, call it also when opening a report? + if err := e.session.MaybeLookupLocationContext(ctx); err != nil { + return nil, err + } + + // Tweak the context such that the bytes sent and received are accounted + // to both the session's byte counter and to the experiment's byte counter. + ctx = bytecounter.WithSessionByteCounter(ctx, e.session.byteCounter) + ctx = bytecounter.WithExperimentByteCounter(ctx, e.byteCounter) + + // Create a new measurement that the experiment measurer will finish filling + // by adding the test keys etc. Please, note that, as of 2024-06-05, we're using + // the measurement Input to provide input to an experiment. We'll probably + // change this, when we'll have finished implementing richer input. + measurement := e.newMeasurement(input) + + // Record when we started the experiment, to compute the runtime. + start := time.Now() + + // Prepare the arguments for the experiment measurer + args := &model.ExperimentArgs{ + Callbacks: e.callbacks, + Measurement: measurement, + Session: e.session, + } + + // Invoke the measurer. Conventionally, an error being returned here + // indicates that something went wrong during the measurement. For example, + // it could be that the user provided us with a malformed input. In case + // there's censorship, by all means the experiment should return a nil error + // and fill the measurement accordingly. + err := e.measurer.Run(ctx, args) + + // Record when the experiment finished running. + stop := time.Now() + + // Handle the case where there was a fundamental error. + if err != nil { + return nil, err + } + + // Make sure we record the measurement runtime. + measurement.MeasurementRuntime = stop.Sub(start).Seconds() + + // Scub the measurement removing the probe IP addr from it. We are 100% sure we know + // our own IP addr, since we called MaybeLookupLocation above. Obviously, we aren't + // going to submit the measurement in case we can't scrub it, so we just return an error + // if this specific corner case happens. + // + // TODO(bassosimone,DecFox): a dual stack client MAY be such that we discover its IPv4 + // address but the IPv6 address is present inside the measurement. We should ensure that + // we improve our discovering capabilities to also cover this specific case. + if err := model.ScrubMeasurement(measurement, e.session.ProbeIP()); err != nil { + e.session.Logger().Warnf("can't scrub measurement: %s", err.Error()) + return nil, err + } + + // We're all good! Let us return the measurement to the caller, which will + // addtionally take care that we're submitting it, if needed. + return measurement, nil +} + func (e *experiment) newReportTemplate() model.OOAPIReportTemplate { return model.OOAPIReportTemplate{ DataFormatVersion: model.OOAPIReportDefaultDataFormatVersion, diff --git a/internal/mocks/experiment.go b/internal/mocks/experiment.go index daa28eb0bd..3c833b8cb8 100644 --- a/internal/mocks/experiment.go +++ b/internal/mocks/experiment.go @@ -16,8 +16,6 @@ type Experiment struct { MockReportID func() string - MockMeasureAsync func(ctx context.Context, input string) (<-chan *model.Measurement, error) - MockMeasureWithContext func( ctx context.Context, input string) (measurement *model.Measurement, err error) @@ -45,11 +43,6 @@ func (e *Experiment) ReportID() string { return e.MockReportID() } -func (e *Experiment) MeasureAsync( - ctx context.Context, input string) (<-chan *model.Measurement, error) { - return e.MockMeasureAsync(ctx, input) -} - func (e *Experiment) MeasureWithContext( ctx context.Context, input string) (measurement *model.Measurement, err error) { return e.MockMeasureWithContext(ctx, input) diff --git a/internal/mocks/experiment_test.go b/internal/mocks/experiment_test.go index dc31b680bc..017752d178 100644 --- a/internal/mocks/experiment_test.go +++ b/internal/mocks/experiment_test.go @@ -57,22 +57,6 @@ func TestExperiment(t *testing.T) { } }) - t.Run("MeasureAsync", func(t *testing.T) { - expected := errors.New("mocked err") - e := &Experiment{ - MockMeasureAsync: func(ctx context.Context, input string) (<-chan *model.Measurement, error) { - return nil, expected - }, - } - out, err := e.MeasureAsync(context.Background(), "xo") - if !errors.Is(err, expected) { - t.Fatal("unexpected err", err) - } - if out != nil { - t.Fatal("expected nil") - } - }) - t.Run("MeasureWithContext", func(t *testing.T) { expected := errors.New("mocked err") e := &Experiment{ diff --git a/internal/model/experiment.go b/internal/model/experiment.go index 6802bda9e8..f070cddf21 100644 --- a/internal/model/experiment.go +++ b/internal/model/experiment.go @@ -53,57 +53,13 @@ type ExperimentSession interface { UserAgent() string } -// ExperimentAsyncTestKeys is the type of test keys returned by an experiment -// when running in async fashion rather than in sync fashion. -type ExperimentAsyncTestKeys struct { - // Extensions contains the extensions used by this experiment. - Extensions map[string]int64 - - // Input is the input this measurement refers to. - Input MeasurementInput - - // MeasurementRuntime is the total measurement runtime. - MeasurementRuntime float64 - - // TestHelpers contains the test helpers used in the experiment - TestHelpers map[string]interface{} - - // TestKeys contains the actual test keys. - TestKeys interface{} -} - -// ExperimentMeasurerAsync is a measurer that can run in async fashion. -// -// Currently this functionality is optional, but we will likely -// migrate all experiments to use this functionality in 2022. -type ExperimentMeasurerAsync interface { - // RunAsync runs the experiment in async fashion. - // - // Arguments: - // - // - ctx is the context for deadline/timeout/cancellation - // - // - sess is the measurement session - // - // - input is the input URL to measure - // - // - callbacks contains the experiment callbacks - // - // Returns either a channel where TestKeys are posted or an error. - // - // An error indicates that specific preconditions for running the experiment - // are not met (e.g., the input URL is invalid). - // - // On success, the experiment will post on the channel each new - // measurement until it is done and closes the channel. - RunAsync(ctx context.Context, sess ExperimentSession, input string, - callbacks ExperimentCallbacks) (<-chan *ExperimentAsyncTestKeys, error) -} - -// ExperimentCallbacks contains experiment event-handling callbacks +// ExperimentCallbacks contains experiment event-handling callbacks. type ExperimentCallbacks interface { - // OnProgress provides information about an experiment progress. - OnProgress(percentage float64, message string) + // OnProgress provides information about the experiment's progress. + // + // The prog field is a number between 0.0 and 1.0 representing progress, where + // 0.0 corresponds to 0% and 1.0 corresponds to 100%. + OnProgress(prog float64, message string) } // PrinterCallbacks is the default event handler @@ -166,51 +122,21 @@ type Experiment interface { // ReportID returns the open report's ID, if we have opened a report // successfully before, or an empty string, otherwise. - // - // Deprecated: new code should use a Submitter. ReportID() string - // MeasureAsync runs an async measurement. This operation could post - // one or more measurements onto the returned channel. We'll close the - // channel when we've emitted all the measurements. - // - // Arguments: - // - // - ctx is the context for deadline/cancellation/timeout; - // - // - input is the input (typically a URL but it could also be - // just an endpoint or an empty string for input-less experiments - // such as, e.g., ndt7 and dash). - // - // Return value: - // - // - on success, channel where to post measurements (the channel - // will be closed when done) and nil error; - // - // - on failure, nil channel and non-nil error. - MeasureAsync(ctx context.Context, input string) (<-chan *Measurement, error) - // MeasureWithContext performs a synchronous measurement. // // Return value: strictly either a non-nil measurement and // a nil error or a nil measurement and a non-nil error. - // - // CAVEAT: while this API is perfectly fine for experiments that - // return a single measurement, it will only return the first measurement - // when used with an asynchronous experiment. MeasureWithContext(ctx context.Context, input string) (measurement *Measurement, err error) // SubmitAndUpdateMeasurementContext submits a measurement and updates the // fields whose value has changed as part of the submission. - // - // Deprecated: new code should use a Submitter. SubmitAndUpdateMeasurementContext( ctx context.Context, measurement *Measurement) error // OpenReportContext will open a report using the given context // to possibly limit the lifetime of this operation. - // - // Deprecated: new code should use a Submitter. OpenReportContext(ctx context.Context) error } diff --git a/internal/oonirun/experiment.go b/internal/oonirun/experiment.go index d5175f6410..611dd23fbe 100644 --- a/internal/oonirun/experiment.go +++ b/internal/oonirun/experiment.go @@ -242,12 +242,12 @@ type experimentWrapper struct { total int } -func (ew *experimentWrapper) MeasureAsync( - ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { +func (ew *experimentWrapper) MeasureWithContext( + ctx context.Context, input string, idx int) (*model.Measurement, error) { if input != "" { ew.logger.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input) } - return ew.child.MeasureAsync(ctx, input, idx) + return ew.child.MeasureWithContext(ctx, input, idx) } // experimentSubmitterWrapper implements a submission policy where we don't diff --git a/internal/oonirun/experiment_test.go b/internal/oonirun/experiment_test.go index 022ba49c64..cd84e89b23 100644 --- a/internal/oonirun/experiment_test.go +++ b/internal/oonirun/experiment_test.go @@ -49,16 +49,11 @@ func TestExperimentRunWithFailureToSubmitAndShuffle(t *testing.T) { }, MockNewExperiment: func() model.Experiment { exp := &mocks.Experiment{ - MockMeasureAsync: func(ctx context.Context, input string) (<-chan *model.Measurement, error) { - out := make(chan *model.Measurement) - go func() { - defer close(out) - ff := &testingx.FakeFiller{} - var meas model.Measurement - ff.Fill(&meas) - out <- &meas - }() - return out, nil + MockMeasureWithContext: func(ctx context.Context, input string) (*model.Measurement, error) { + ff := &testingx.FakeFiller{} + var meas model.Measurement + ff.Fill(&meas) + return &meas, nil }, MockKibiBytesReceived: func() float64 { calledKibiBytesReceived++ diff --git a/internal/oonirun/inputprocessor.go b/internal/oonirun/inputprocessor.go index c764d61df9..08e0574b18 100644 --- a/internal/oonirun/inputprocessor.go +++ b/internal/oonirun/inputprocessor.go @@ -10,15 +10,13 @@ import ( // InputProcessorExperiment is the Experiment // according to InputProcessor. type InputProcessorExperiment interface { - MeasureAsync( - ctx context.Context, input string) (<-chan *model.Measurement, error) + MeasureWithContext(ctx context.Context, input string) (*model.Measurement, error) } // InputProcessorExperimentWrapper is a wrapper for an // Experiment that also allow to pass around the input index. type InputProcessorExperimentWrapper interface { - MeasureAsync( - ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) + MeasureWithContext(ctx context.Context, input string, idx int) (*model.Measurement, error) } // NewInputProcessorExperimentWrapper creates a new @@ -32,9 +30,9 @@ type inputProcessorExperimentWrapper struct { exp InputProcessorExperiment } -func (ipew inputProcessorExperimentWrapper) MeasureAsync( - ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { - return ipew.exp.MeasureAsync(ctx, input) +func (ipew inputProcessorExperimentWrapper) MeasureWithContext( + ctx context.Context, input string, idx int) (*model.Measurement, error) { + return ipew.exp.MeasureWithContext(ctx, input) } var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{} @@ -142,29 +140,21 @@ func (ip *InputProcessor) run(ctx context.Context) (int, error) { return stopMaxRuntime, nil } input := url.URL - var measurements []*model.Measurement - source, err := ip.Experiment.MeasureAsync(ctx, input, idx) + meas, err := ip.Experiment.MeasureWithContext(ctx, input, idx) if err != nil { return 0, err } - // NOTE: we don't want to intermix measuring with submitting - // therefore we collect all measurements first - for meas := range source { - measurements = append(measurements, meas) + meas.AddAnnotations(ip.Annotations) + meas.Options = ip.Options + err = ip.Submitter.Submit(ctx, idx, meas) + if err != nil { + return 0, err } - for _, meas := range measurements { - meas.AddAnnotations(ip.Annotations) - meas.Options = ip.Options - err = ip.Submitter.Submit(ctx, idx, meas) - if err != nil { - return 0, err - } - // Note: must be after submission because submission modifies - // the measurement to include the report ID. - err = ip.Saver.SaveMeasurement(idx, meas) - if err != nil { - return 0, err - } + // Note: must be after submission because submission modifies + // the measurement to include the report ID. + err = ip.Saver.SaveMeasurement(idx, meas) + if err != nil { + return 0, err } } return stopNormal, nil diff --git a/internal/oonirun/inputprocessor_test.go b/internal/oonirun/inputprocessor_test.go index 3808bc2247..83d32e9f09 100644 --- a/internal/oonirun/inputprocessor_test.go +++ b/internal/oonirun/inputprocessor_test.go @@ -15,8 +15,8 @@ type FakeInputProcessorExperiment struct { M []*model.Measurement } -func (fipe *FakeInputProcessorExperiment) MeasureAsync( - ctx context.Context, input string) (<-chan *model.Measurement, error) { +func (fipe *FakeInputProcessorExperiment) MeasureWithContext( + ctx context.Context, input string) (*model.Measurement, error) { if fipe.Err != nil { return nil, fipe.Err } @@ -30,12 +30,7 @@ func (fipe *FakeInputProcessorExperiment) MeasureAsync( m.AddAnnotation("foo", "baz") // would be bar below m.Input = model.MeasurementInput(input) fipe.M = append(fipe.M, m) - out := make(chan *model.Measurement) - go func() { - defer close(out) - out <- m - }() - return out, nil + return m, nil } func TestInputProcessorMeasurementFailed(t *testing.T) { diff --git a/internal/oonirun/v1_test.go b/internal/oonirun/v1_test.go index 76121a5f8f..6e6e473e25 100644 --- a/internal/oonirun/v1_test.go +++ b/internal/oonirun/v1_test.go @@ -28,16 +28,11 @@ func newMinimalFakeSession() *mocks.Session { }, MockNewExperiment: func() model.Experiment { exp := &mocks.Experiment{ - MockMeasureAsync: func(ctx context.Context, input string) (<-chan *model.Measurement, error) { - out := make(chan *model.Measurement) - go func() { - defer close(out) - ff := &testingx.FakeFiller{} - var meas model.Measurement - ff.Fill(&meas) - out <- &meas - }() - return out, nil + MockMeasureWithContext: func(ctx context.Context, input string) (*model.Measurement, error) { + ff := &testingx.FakeFiller{} + var meas model.Measurement + ff.Fill(&meas) + return &meas, nil }, MockKibiBytesReceived: func() float64 { return 1.1 diff --git a/internal/oonirun/v2_test.go b/internal/oonirun/v2_test.go index 8f2260f680..f2eab58f5b 100644 --- a/internal/oonirun/v2_test.go +++ b/internal/oonirun/v2_test.go @@ -379,7 +379,7 @@ func TestV2MeasureDescriptor(t *testing.T) { }, MockNewExperiment: func() model.Experiment { exp := &mocks.Experiment{ - MockMeasureAsync: func(ctx context.Context, input string) (<-chan *model.Measurement, error) { + MockMeasureWithContext: func(ctx context.Context, input string) (*model.Measurement, error) { return nil, expected }, MockKibiBytesReceived: func() float64 {