Skip to content

Commit

Permalink
Add support for returning measurement_uid as part of oonimkall API (#…
Browse files Browse the repository at this point in the history
hellais authored Dec 11, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 6a8a893 commit a0e3456
Showing 22 changed files with 85 additions and 66 deletions.
2 changes: 1 addition & 1 deletion cmd/ooniprobe/internal/nettests/nettests.go
Original file line number Diff line number Diff line change
@@ -215,7 +215,7 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []model.Experim
// Implementation note: SubmitMeasurement will fail here if we did fail
// to open the report but we still want to continue. There will be a
// bit of a spew in the logs, perhaps, but stopping seems less efficient.
if err := exp.SubmitAndUpdateMeasurementContext(context.Background(), measurement); err != nil {
if _, err := exp.SubmitAndUpdateMeasurementContext(context.Background(), measurement); err != nil {
log.Debug(color.RedString("failure.measurement_submission"))
if err := db.UploadFailed(c.msmts[idx64], err.Error()); err != nil {
return errors.Wrap(err, "failed to mark upload as failed")
2 changes: 1 addition & 1 deletion internal/cmd/oonireport/oonireport.go
Original file line number Diff line number Diff line change
@@ -106,7 +106,7 @@ func submitAll(ctx context.Context, lines []string, subm model.Submitter) (int,
for _, line := range lines {
mm := toMeasurement(line)
// submit the measurement
err := subm.Submit(ctx, mm)
_, err := subm.Submit(ctx, mm)
if err != nil {
return submitted, err
}
4 changes: 2 additions & 2 deletions internal/engine/experiment.go
Original file line number Diff line number Diff line change
@@ -104,10 +104,10 @@ func (e *experiment) ReportID() string {

// SubmitAndUpdateMeasurementContext implements [model.Experiment].
func (e *experiment) SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *model.Measurement) error {
ctx context.Context, measurement *model.Measurement) (string, error) {
report := e.mrep.Get()
if report == nil {
return errors.New("report is not open")
return "", errors.New("report is not open")
}
return report.SubmitMeasurement(ctx, measurement)
}
6 changes: 3 additions & 3 deletions internal/engine/experiment_integration_test.go
Original file line number Diff line number Diff line change
@@ -280,7 +280,7 @@ func runexperimentflow(t *testing.T, experiment model.Experiment, input string)
}
filename := tempfile.Name()
tempfile.Close()
err = experiment.SubmitAndUpdateMeasurementContext(ctx, measurement)
_, err = experiment.SubmitAndUpdateMeasurementContext(ctx, measurement)
if err != nil {
t.Fatal(err)
}
@@ -323,7 +323,7 @@ func TestOpenReportIdempotent(t *testing.T) {
t.Fatal("unexpected initial report ID")
}
ctx := context.Background()
if err := exp.SubmitAndUpdateMeasurementContext(ctx, &model.Measurement{}); err == nil {
if _, err := exp.SubmitAndUpdateMeasurementContext(ctx, &model.Measurement{}); err == nil {
t.Fatal("we should not be able to submit before OpenReport")
}
err = exp.OpenReportContext(ctx)
@@ -403,7 +403,7 @@ func TestSubmitAndUpdateMeasurementWithClosedReport(t *testing.T) {
}
exp := builder.NewExperiment()
m := new(model.Measurement)
err = exp.SubmitAndUpdateMeasurementContext(context.Background(), m)
_, err = exp.SubmitAndUpdateMeasurementContext(context.Background(), m)
if err == nil {
t.Fatal("expected an error here")
}
4 changes: 2 additions & 2 deletions internal/mocks/experiment.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ type Experiment struct {
MockSaveMeasurement func(measurement *model.Measurement, filePath string) error

MockSubmitAndUpdateMeasurementContext func(
ctx context.Context, measurement *model.Measurement) error
ctx context.Context, measurement *model.Measurement) (string, error)

MockOpenReportContext func(ctx context.Context) error
}
@@ -53,7 +53,7 @@ func (e *Experiment) SaveMeasurement(measurement *model.Measurement, filePath st
}

func (e *Experiment) SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *model.Measurement) error {
ctx context.Context, measurement *model.Measurement) (string, error) {
return e.MockSubmitAndUpdateMeasurementContext(ctx, measurement)
}

6 changes: 3 additions & 3 deletions internal/mocks/experiment_test.go
Original file line number Diff line number Diff line change
@@ -93,11 +93,11 @@ func TestExperiment(t *testing.T) {
t.Run("SubmitAndUpdateMeasurementContext", func(t *testing.T) {
expected := errors.New("mocked err")
e := &Experiment{
MockSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) error {
return expected
MockSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) (string, error) {
return "", expected
},
}
err := e.SubmitAndUpdateMeasurementContext(context.Background(), &model.Measurement{})
_, err := e.SubmitAndUpdateMeasurementContext(context.Background(), &model.Measurement{})
if !errors.Is(err, expected) {
t.Fatal("unexpected err", err)
}
4 changes: 2 additions & 2 deletions internal/mocks/submitter.go
Original file line number Diff line number Diff line change
@@ -8,10 +8,10 @@ import (

// Submitter mocks model.Submitter.
type Submitter struct {
MockSubmit func(ctx context.Context, m *model.Measurement) error
MockSubmit func(ctx context.Context, m *model.Measurement) (string, error)
}

// Submit calls MockSubmit
func (s *Submitter) Submit(ctx context.Context, m *model.Measurement) error {
func (s *Submitter) Submit(ctx context.Context, m *model.Measurement) (string, error) {
return s.MockSubmit(ctx, m)
}
6 changes: 3 additions & 3 deletions internal/mocks/submitter_test.go
Original file line number Diff line number Diff line change
@@ -12,11 +12,11 @@ func TestSubmitter(t *testing.T) {
t.Run("Submit", func(t *testing.T) {
expect := errors.New("mocked error")
s := &Submitter{
MockSubmit: func(ctx context.Context, m *model.Measurement) error {
return expect
MockSubmit: func(ctx context.Context, m *model.Measurement) (string, error) {
return "", expect
},
}
err := s.Submit(context.Background(), &model.Measurement{})
_, err := s.Submit(context.Background(), &model.Measurement{})
if !errors.Is(err, expect) {
t.Fatal("unexpected err", err)
}
4 changes: 2 additions & 2 deletions internal/model/experiment.go
Original file line number Diff line number Diff line change
@@ -186,7 +186,7 @@ type Experiment interface {
// SubmitAndUpdateMeasurementContext submits a measurement and updates the
// fields whose value has changed as part of the submission.
SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *Measurement) error
ctx context.Context, measurement *Measurement) (string, error)

// OpenReportContext will open a report using the given context
// to possibly limit the lifetime of this operation.
@@ -322,7 +322,7 @@ type ExperimentTargetLoader interface {
type Submitter interface {
// Submit submits the measurement and updates its
// report ID field in case of success.
Submit(ctx context.Context, m *Measurement) error
Submit(ctx context.Context, m *Measurement) (string, error)
}

// Saver saves a measurement on some persistent storage.
7 changes: 4 additions & 3 deletions internal/oonirun/experiment.go
Original file line number Diff line number Diff line change
@@ -263,10 +263,11 @@ type experimentSubmitterWrapper struct {
logger model.Logger
}

func (sw *experimentSubmitterWrapper) Submit(ctx context.Context, idx int, m *model.Measurement) error {
if err := sw.child.Submit(ctx, idx, m); err != nil {
func (sw *experimentSubmitterWrapper) Submit(ctx context.Context, idx int, m *model.Measurement) (string, error) {
mstUID, err := sw.child.Submit(ctx, idx, m)
if err != nil {
sw.logger.Warnf("submitting measurement failed: %s", err.Error())
}
// policy: we do not stop the loop if measurement submission fails
return nil
return mstUID, nil
}
4 changes: 2 additions & 2 deletions internal/oonirun/experiment_test.go
Original file line number Diff line number Diff line change
@@ -93,9 +93,9 @@ func TestExperimentRunWithFailureToSubmitAndShuffle(t *testing.T) {
newTargetLoaderFn: nil,
newSubmitterFn: func(ctx context.Context) (model.Submitter, error) {
subm := &mocks.Submitter{
MockSubmit: func(ctx context.Context, m *model.Measurement) error {
MockSubmit: func(ctx context.Context, m *model.Measurement) (string, error) {
failedToSubmit++
return errors.New("mocked error")
return "", errors.New("mocked error")
},
}
return subm, nil
6 changes: 3 additions & 3 deletions internal/oonirun/inputprocessor.go
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ func (ipsw inputProcessorSaverWrapper) SaveMeasurement(
// InputProcessorSubmitterWrapper is InputProcessor's
// wrapper for a Submitter implementation.
type InputProcessorSubmitterWrapper interface {
Submit(ctx context.Context, idx int, m *model.Measurement) error
Submit(ctx context.Context, idx int, m *model.Measurement) (string, error)
}

type inputProcessorSubmitterWrapper struct {
@@ -101,7 +101,7 @@ func NewInputProcessorSubmitterWrapper(submitter Submitter) InputProcessorSubmit
}

func (ipsw inputProcessorSubmitterWrapper) Submit(
ctx context.Context, idx int, m *model.Measurement) error {
ctx context.Context, idx int, m *model.Measurement) (string, error) {
return ipsw.submitter.Submit(ctx, m)
}

@@ -141,7 +141,7 @@ func (ip *InputProcessor) run(ctx context.Context) (int, error) {
return 0, err
}
meas.AddAnnotations(ip.Annotations)
err = ip.Submitter.Submit(ctx, idx, meas)
_, err = ip.Submitter.Submit(ctx, idx, meas)
if err != nil {
// TODO(bassosimone): when re-reading this code, I find it confusing that
// we return on error because I am always like "wait, this is not the right
4 changes: 2 additions & 2 deletions internal/oonirun/inputprocessor_test.go
Original file line number Diff line number Diff line change
@@ -55,9 +55,9 @@ type FakeInputProcessorSubmitter struct {
}

func (fips *FakeInputProcessorSubmitter) Submit(
ctx context.Context, m *model.Measurement) error {
ctx context.Context, m *model.Measurement) (string, error) {
fips.M = append(fips.M, m)
return fips.Err
return "", fips.Err
}

func TestInputProcessorSubmissionFailed(t *testing.T) {
6 changes: 3 additions & 3 deletions internal/oonirun/submitter.go
Original file line number Diff line number Diff line change
@@ -46,8 +46,8 @@ func NewSubmitter(ctx context.Context, config SubmitterConfig) (Submitter, error

type stubSubmitter struct{}

func (stubSubmitter) Submit(ctx context.Context, m *model.Measurement) error {
return nil
func (stubSubmitter) Submit(ctx context.Context, m *model.Measurement) (string, error) {
return "", nil
}

var _ Submitter = stubSubmitter{}
@@ -57,7 +57,7 @@ type realSubmitter struct {
logger model.Logger
}

func (rs realSubmitter) Submit(ctx context.Context, m *model.Measurement) error {
func (rs realSubmitter) Submit(ctx context.Context, m *model.Measurement) (string, error) {
rs.logger.Info("submitting measurement to OONI collector; please be patient...")
return rs.subm.Submit(ctx, m)
}
9 changes: 5 additions & 4 deletions internal/oonirun/submitter_test.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,8 @@ func TestSubmitterNotEnabled(t *testing.T) {
t.Fatal("we did not get a stubSubmitter instance")
}
m := new(model.Measurement)
if err := submitter.Submit(ctx, m); err != nil {
_, err = submitter.Submit(ctx, m)
if err != nil {
t.Fatal(err)
}
}
@@ -32,11 +33,11 @@ type FakeSubmitter struct {
Error error
}

func (fs *FakeSubmitter) Submit(ctx context.Context, m *model.Measurement) error {
func (fs *FakeSubmitter) Submit(ctx context.Context, m *model.Measurement) (string, error) {
if fs.Calls != nil {
fs.Calls.Add(1)
}
return fs.Error
return "", fs.Error
}

var _ Submitter = &FakeSubmitter{}
@@ -83,7 +84,7 @@ func TestNewSubmitterWithFailedSubmission(t *testing.T) {
t.Fatal(err)
}
m := new(model.Measurement)
err = submitter.Submit(context.Background(), m)
_, err = submitter.Submit(context.Background(), m)
if !errors.Is(err, expected) {
t.Fatalf("not the error we expected: %+v", err)
}
2 changes: 1 addition & 1 deletion internal/oonirun/v2_test.go
Original file line number Diff line number Diff line change
@@ -486,7 +486,7 @@ func TestV2MeasureDescriptor(t *testing.T) {
// represents a fundamental failure in setting up the experiment
sess.MockNewSubmitter = func(ctx context.Context) (model.Submitter, error) {
subm := &mocks.Submitter{
MockSubmit: func(ctx context.Context, m *model.Measurement) error {
MockSubmit: func(ctx context.Context, m *model.Measurement) (string, error) {
panic("should not be called")
},
}
14 changes: 7 additions & 7 deletions internal/probeservices/collector.go
Original file line number Diff line number Diff line change
@@ -101,15 +101,15 @@ func (r reportChan) CanSubmit(m *model.Measurement) bool {
// such that it contains the report ID for which it has been
// submitted. Otherwise, we'll set the report ID to the empty
// string, so that you know which measurements weren't submitted.
func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement) error {
func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement) (string, error) {
// TODO(bassosimone): do we need to prevent measurement submission
// if the measurement isn't consistent with the orig template?

m.ReportID = r.ID

URL, err := urlx.ResolveReference(r.client.BaseURL, fmt.Sprintf("/report/%s", r.ID), "")
if err != nil {
return err
return "", err
}

apiReq := model.OOAPICollectorUpdateRequest{
@@ -131,13 +131,13 @@ func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement)

if err != nil {
m.ReportID = ""
return err
return "", err
}

// TODO(bassosimone): we should use the session logger here but for now this stopgap
// solution will allow observing the measurement URL for CLI users.
log.Printf("Measurement URL: https://explorer.ooni.org/m/%s", updateResponse.MeasurementUID)
return nil
return updateResponse.MeasurementUID, nil
}

// ReportID returns the report ID.
@@ -150,7 +150,7 @@ func (r reportChan) ReportID() string {
type ReportChannel interface {
CanSubmit(m *model.Measurement) bool
ReportID() string
SubmitMeasurement(ctx context.Context, m *model.Measurement) error
SubmitMeasurement(ctx context.Context, m *model.Measurement) (string, error)
}

var _ ReportChannel = &reportChan{}
@@ -182,14 +182,14 @@ func NewSubmitter(opener ReportOpener, logger model.Logger) *Submitter {

// Submit submits the current measurement to the OONI backend created using
// the ReportOpener passed to the constructor.
func (sub *Submitter) Submit(ctx context.Context, m *model.Measurement) error {
func (sub *Submitter) Submit(ctx context.Context, m *model.Measurement) (string, error) {
var err error
sub.mu.Lock()
defer sub.mu.Unlock()
if sub.channel == nil || !sub.channel.CanSubmit(m) {
sub.channel, err = sub.opener.OpenReport(ctx, NewReportTemplate(m))
if err != nil {
return err
return "", err
}
sub.logger.Infof("New reportID: %s", sub.channel.ReportID())
}
42 changes: 26 additions & 16 deletions internal/probeservices/collector_test.go
Original file line number Diff line number Diff line change
@@ -108,7 +108,8 @@ func TestReportLifecycle(t *testing.T) {

// attempt to submit the measurement to the backend, which should succeed
// since we've just opened a report for it
if err = report.SubmitMeasurement(context.Background(), &measurement); err != nil {
_, err = report.SubmitMeasurement(context.Background(), &measurement)
if err != nil {
t.Fatal(err)
}

@@ -168,7 +169,8 @@ func TestReportLifecycle(t *testing.T) {

// attempt to submit the measurement to the backend, which should succeed
// since we've just opened a report for it
if err = report.SubmitMeasurement(context.Background(), &measurement); err != nil {
_, err = report.SubmitMeasurement(context.Background(), &measurement)
if err != nil {
t.Fatal(err)
}

@@ -231,7 +233,8 @@ func TestReportLifecycle(t *testing.T) {

// attempt to submit the measurement to the backend, which should succeed
// since we've just opened a report for it
if err = report.SubmitMeasurement(context.Background(), &measurement); err != nil {
_, err = report.SubmitMeasurement(context.Background(), &measurement)
if err != nil {
t.Fatal(err)
}

@@ -376,7 +379,7 @@ func TestReportLifecycle(t *testing.T) {
}

// update the report
err := rc.SubmitMeasurement(context.Background(), &model.Measurement{})
_, err := rc.SubmitMeasurement(context.Background(), &model.Measurement{})

// we do expect an error
if !errors.Is(err, netxlite.ECONNRESET) {
@@ -418,7 +421,7 @@ func TestReportLifecycle(t *testing.T) {
}

// update the report
err := rc.SubmitMeasurement(context.Background(), &model.Measurement{})
_, err := rc.SubmitMeasurement(context.Background(), &model.Measurement{})

// we do expect an error
if err == nil || err.Error() != "unexpected end of JSON input" {
@@ -444,7 +447,7 @@ func TestReportLifecycle(t *testing.T) {
}

// update the report
err := rc.SubmitMeasurement(context.Background(), &model.Measurement{})
_, err := rc.SubmitMeasurement(context.Background(), &model.Measurement{})

// we do expect an error
if err == nil || err.Error() != `parse "\t\t\t": net/url: invalid control character in URL` {
@@ -665,7 +668,8 @@ func TestReportLifecycle(t *testing.T) {

// attempt to submit the measurement to the backend, which should succeed
// since we've just opened a report for it
if err = report.SubmitMeasurement(context.Background(), &measurement); err != nil {
_, err = report.SubmitMeasurement(context.Background(), &measurement)
if err != nil {
t.Fatal(err)
}

@@ -687,14 +691,14 @@ func (rrc *RecordingReportChannel) CanSubmit(m *model.Measurement) bool {
return reflect.DeepEqual(NewReportTemplate(m), rrc.tmpl)
}

func (rrc *RecordingReportChannel) SubmitMeasurement(ctx context.Context, m *model.Measurement) error {
func (rrc *RecordingReportChannel) SubmitMeasurement(ctx context.Context, m *model.Measurement) (string, error) {
if ctx.Err() != nil {
return ctx.Err()
return "", ctx.Err()
}
rrc.mu.Lock()
defer rrc.mu.Unlock()
rrc.m = append(rrc.m, m)
return nil
return "", nil
}

func (rrc *RecordingReportChannel) Close(ctx context.Context) error {
@@ -755,15 +759,18 @@ func TestSubmitterLifecyle(t *testing.T) {
submitter := NewSubmitter(rro, log.Log)
ctx := context.Background()
m1 := makeMeasurementWithoutTemplate("example")
if err := submitter.Submit(ctx, m1); err != nil {
_, err := submitter.Submit(ctx, m1)
if err != nil {
t.Fatal(err)
}
m2 := makeMeasurementWithoutTemplate("example")
if err := submitter.Submit(ctx, m2); err != nil {
_, err = submitter.Submit(ctx, m2)
if err != nil {
t.Fatal(err)
}
m3 := makeMeasurementWithoutTemplate("example_extended")
if err := submitter.Submit(ctx, m3); err != nil {
_, err = submitter.Submit(ctx, m3)
if err != nil {
t.Fatal(err)
}
if len(rro.channels) != 2 {
@@ -783,15 +790,18 @@ func TestSubmitterCannotOpenNewChannel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // fail immediately
m1 := makeMeasurementWithoutTemplate("example")
if err := submitter.Submit(ctx, m1); !errors.Is(err, context.Canceled) {
_, err := submitter.Submit(ctx, m1)
if !errors.Is(err, context.Canceled) {
t.Fatal("not the error we expected")
}
m2 := makeMeasurementWithoutTemplate("example")
if err := submitter.Submit(ctx, m2); !errors.Is(err, context.Canceled) {
_, err = submitter.Submit(ctx, m2)
if !errors.Is(err, context.Canceled) {
t.Fatal(err)
}
m3 := makeMeasurementWithoutTemplate("example_extended")
if err := submitter.Submit(ctx, m3); !errors.Is(err, context.Canceled) {
_, err = submitter.Submit(ctx, m3)
if !errors.Is(err, context.Canceled) {
t.Fatal(err)
}
if len(rro.channels) != 0 {
7 changes: 6 additions & 1 deletion pkg/oonimkall/session.go
Original file line number Diff line number Diff line change
@@ -302,6 +302,9 @@ type SubmitMeasurementResults struct {

// UpdatedReportID is the report ID used for the measurement.
UpdatedReportID string

// MeasurementUID is the measurement unique identifier returned from the backend
MeasurementUID string
}

// Submit submits the given measurement and returns the results.
@@ -322,14 +325,16 @@ func (sess *Session) Submit(ctx *Context, measurement string) (*SubmitMeasuremen
if err := json.Unmarshal([]byte(measurement), &mm); err != nil {
return nil, err
}
if err := sess.submitter.Submit(ctx.ctx, &mm); err != nil {
muid, err := sess.submitter.Submit(ctx.ctx, &mm)
if err != nil {
return nil, err
}
data, err := json.Marshal(mm)
runtimex.PanicOnError(err, "json.Marshal should not fail here")
return &SubmitMeasurementResults{
UpdatedMeasurement: string(data),
UpdatedReportID: mm.ReportID,
MeasurementUID: muid,
}, nil
}

1 change: 1 addition & 0 deletions pkg/oonimkall/taskmodel.go
Original file line number Diff line number Diff line change
@@ -112,6 +112,7 @@ type eventMeasurementGeneric struct {
Idx int64 `json:"idx"`
Input string `json:"input"`
JSONStr string `json:"json_str,omitempty"`
MeasurementUID string `json:"measurement_uid,omitempty"`
}

type eventStatusEnd struct {
3 changes: 2 additions & 1 deletion pkg/oonimkall/taskrunner.go
Original file line number Diff line number Diff line change
@@ -352,13 +352,14 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
// if possible, submit the measurement to the OONI backend
if !r.settings.Options.NoCollector {
logger.Info("Submitting measurement... please, be patient")
err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m)
muid, err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m)
warnOnFailure(logger, "cannot submit measurement", err)
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
Idx: int64(idx),
Input: target.Input(),
JSONStr: string(data),
Failure: measurementSubmissionFailure(err),
MeasurementUID: muid,
})
}

8 changes: 4 additions & 4 deletions pkg/oonimkall/taskrunner_test.go
Original file line number Diff line number Diff line change
@@ -228,8 +228,8 @@ func TestTaskRunnerRun(t *testing.T) {
MockMeasureWithContext: func(ctx context.Context, target model.ExperimentTarget) (*model.Measurement, error) {
return &model.Measurement{}, nil
},
MockSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) error {
return nil
MockSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) (string, error) {
return "", nil
},
},

@@ -667,8 +667,8 @@ func TestTaskRunnerRun(t *testing.T) {
},
}
}
fake.Experiment.MockSubmitAndUpdateMeasurementContext = func(ctx context.Context, measurement *model.Measurement) error {
return errors.New("cannot submit")
fake.Experiment.MockSubmitAndUpdateMeasurementContext = func(ctx context.Context, measurement *model.Measurement) (string, error) {
return "", errors.New("cannot submit")
}
runner.newSession = fake.NewSession
events := runAndCollect(runner, emitter)

0 comments on commit a0e3456

Please sign in to comment.