diff --git a/internal/dslx/dns.go b/internal/dslx/dns.go index 88d3bd65ec..b4e3e18bb6 100644 --- a/internal/dslx/dns.go +++ b/internal/dslx/dns.go @@ -71,8 +71,8 @@ type ResolvedAddresses struct { // DNSLookupGetaddrinfo returns a function that resolves a domain name to // IP addresses using libc's getaddrinfo function. -func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *Maybe[*ResolvedAddresses]] { - return FuncAdapter[*DomainToResolve, *Maybe[*ResolvedAddresses]](func(ctx context.Context, input *DomainToResolve) *Maybe[*ResolvedAddresses] { +func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses] { + return Operation[*DomainToResolve, *ResolvedAddresses](func(ctx context.Context, input *DomainToResolve) *Maybe[*ResolvedAddresses] { // create trace trace := rt.NewTrace(rt.IDGenerator().Add(1), rt.ZeroTime(), input.Tags...) @@ -115,8 +115,8 @@ func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *Maybe[*ResolvedAdd // DNSLookupUDP returns a function that resolves a domain name to // IP addresses using the given DNS-over-UDP resolver. -func DNSLookupUDP(rt Runtime, endpoint string) Func[*DomainToResolve, *Maybe[*ResolvedAddresses]] { - return FuncAdapter[*DomainToResolve, *Maybe[*ResolvedAddresses]](func(ctx context.Context, input *DomainToResolve) *Maybe[*ResolvedAddresses] { +func DNSLookupUDP(rt Runtime, endpoint string) Func[*DomainToResolve, *ResolvedAddresses] { + return Operation[*DomainToResolve, *ResolvedAddresses](func(ctx context.Context, input *DomainToResolve) *Maybe[*ResolvedAddresses] { // create trace trace := rt.NewTrace(rt.IDGenerator().Add(1), rt.ZeroTime(), input.Tags...) diff --git a/internal/dslx/dns_test.go b/internal/dslx/dns_test.go index 2ef3b82d3f..753605408b 100644 --- a/internal/dslx/dns_test.go +++ b/internal/dslx/dns_test.go @@ -66,7 +66,7 @@ func TestGetaddrinfo(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) cancel() // immediately cancel the lookup - res := f.Apply(ctx, domain) + res := f.Apply(ctx, NewMaybeWithValue(domain)) if res.Observations == nil || len(res.Observations) <= 0 { t.Fatal("unexpected empty observations") } @@ -88,7 +88,7 @@ func TestGetaddrinfo(t *testing.T) { }, })), ) - res := f.Apply(context.Background(), domain) + res := f.Apply(context.Background(), NewMaybeWithValue(domain)) if res.Observations == nil || len(res.Observations) <= 0 { t.Fatal("unexpected empty observations") } @@ -115,7 +115,7 @@ func TestGetaddrinfo(t *testing.T) { }, })), ) - res := f.Apply(context.Background(), domain) + res := f.Apply(context.Background(), NewMaybeWithValue(domain)) if res.Observations == nil || len(res.Observations) <= 0 { t.Fatal("unexpected empty observations") } @@ -154,7 +154,7 @@ func TestLookupUDP(t *testing.T) { f := DNSLookupUDP(NewMinimalRuntime(model.DiscardLogger, time.Now()), "1.1.1.1:53") ctx, cancel := context.WithCancel(context.Background()) cancel() - res := f.Apply(ctx, domain) + res := f.Apply(ctx, NewMaybeWithValue(domain)) if res.Observations == nil || len(res.Observations) <= 0 { t.Fatal("unexpected empty observations") } @@ -184,7 +184,7 @@ func TestLookupUDP(t *testing.T) { })), "1.1.1.1:53", ) - res := f.Apply(context.Background(), domain) + res := f.Apply(context.Background(), NewMaybeWithValue(domain)) if res.Observations == nil || len(res.Observations) <= 0 { t.Fatal("unexpected empty observations") } @@ -219,7 +219,7 @@ func TestLookupUDP(t *testing.T) { })), "1.1.1.1:53", ) - res := f.Apply(context.Background(), domain) + res := f.Apply(context.Background(), NewMaybeWithValue(domain)) if res.Observations == nil || len(res.Observations) <= 0 { t.Fatal("unexpected empty observations") } diff --git a/internal/dslx/fxasync.go b/internal/dslx/fxasync.go index 7ecdc3fd49..cfb428ec7e 100644 --- a/internal/dslx/fxasync.go +++ b/internal/dslx/fxasync.go @@ -33,7 +33,7 @@ type Parallelism int func Map[A, B any]( ctx context.Context, parallelism Parallelism, - fx Func[A, *Maybe[B]], + fx Func[A, B], inputs <-chan A, ) <-chan *Maybe[B] { // create channel for returning results @@ -49,7 +49,7 @@ func Map[A, B any]( go func() { defer wg.Done() for a := range inputs { - r <- fx.Apply(ctx, a) + r <- fx.Apply(ctx, NewMaybeWithValue(a)) } }() } @@ -81,7 +81,7 @@ func Parallel[A, B any]( ctx context.Context, parallelism Parallelism, input A, - fn ...Func[A, *Maybe[B]], + fn ...Func[A, B], ) []*Maybe[B] { c := ParallelAsync(ctx, parallelism, input, StreamList(fn...)) return Collect(c) @@ -94,7 +94,7 @@ func ParallelAsync[A, B any]( ctx context.Context, parallelism Parallelism, input A, - funcs <-chan Func[A, *Maybe[B]], + funcs <-chan Func[A, B], ) <-chan *Maybe[B] { // create channel for returning results r := make(chan *Maybe[B]) @@ -109,7 +109,7 @@ func ParallelAsync[A, B any]( go func() { defer wg.Done() for fx := range funcs { - r <- fx.Apply(ctx, input) + r <- fx.Apply(ctx, NewMaybeWithValue(input)) } }() } @@ -126,7 +126,7 @@ func ParallelAsync[A, B any]( // ApplyAsync is equivalent to calling Apply but returns a channel. func ApplyAsync[A, B any]( ctx context.Context, - fx Func[A, *Maybe[B]], + fx Func[A, B], input A, ) <-chan *Maybe[B] { return Map(ctx, Parallelism(1), fx, StreamList(input)) diff --git a/internal/dslx/fxasync_test.go b/internal/dslx/fxasync_test.go index cf1af053e4..0e98242053 100644 --- a/internal/dslx/fxasync_test.go +++ b/internal/dslx/fxasync_test.go @@ -4,9 +4,11 @@ import ( "context" "sync" "testing" + + "github.com/ooni/probe-cli/v3/internal/runtimex" ) -func getFnWait(wg *sync.WaitGroup) Func[int, *Maybe[int]] { +func getFnWait(wg *sync.WaitGroup) Func[int, int] { return &fnWait{wg} } @@ -14,10 +16,11 @@ type fnWait struct { wg *sync.WaitGroup // set to n corresponding to the number of used goroutines } -func (f *fnWait) Apply(ctx context.Context, i int) *Maybe[int] { +func (f *fnWait) Apply(ctx context.Context, i *Maybe[int]) *Maybe[int] { + runtimex.Assert(i.Error == nil, "did not expect to see an error here") f.wg.Done() f.wg.Wait() // continue when n goroutines have reached this point - return &Maybe[int]{State: i + 1} + return &Maybe[int]{State: i.State + 1} } /* @@ -86,7 +89,7 @@ func TestParallel(t *testing.T) { t.Run(name, func(t *testing.T) { wg := sync.WaitGroup{} wg.Add(tt.funcs) - funcs := []Func[int, *Maybe[int]]{} + funcs := []Func[int, int]{} for i := 0; i < tt.funcs; i++ { funcs = append(funcs, getFnWait(&wg)) } diff --git a/internal/dslx/fxcore.go b/internal/dslx/fxcore.go index 329a8fc23c..6977833963 100644 --- a/internal/dslx/fxcore.go +++ b/internal/dslx/fxcore.go @@ -14,15 +14,23 @@ import ( // Func is a function f: (context.Context, A) -> B. type Func[A, B any] interface { - Apply(ctx context.Context, a A) B + Apply(ctx context.Context, a *Maybe[A]) *Maybe[B] } -// FuncAdapter adapts a func to be a Func. -type FuncAdapter[A, B any] func(ctx context.Context, a A) B +// Operation adapts a golang function to behave like a Func. +type Operation[A, B any] func(ctx context.Context, a A) *Maybe[B] // Apply implements Func. -func (fa FuncAdapter[A, B]) Apply(ctx context.Context, a A) B { - return fa(ctx, a) +func (op Operation[A, B]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[B] { + if a.Error != nil { + return &Maybe[B]{ + Error: a.Error, + Observations: a.Observations, + Operation: a.Operation, + State: *new(B), // zero value + } + } + return op(ctx, a.State) } // Maybe is the result of an operation implemented by this package @@ -42,8 +50,18 @@ type Maybe[State any] struct { State State } +// NewMaybeWithValue constructs a Maybe containing the given value. +func NewMaybeWithValue[State any](value State) *Maybe[State] { + return &Maybe[State]{ + Error: nil, + Observations: []*Observations{}, + Operation: "", + State: value, + } +} + // Compose2 composes two operations such as [TCPConnect] and [TLSHandshake]. -func Compose2[A, B, C any](f Func[A, *Maybe[B]], g Func[B, *Maybe[C]]) Func[A, *Maybe[C]] { +func Compose2[A, B, C any](f Func[A, B], g Func[B, C]) Func[A, C] { return &compose2Func[A, B, C]{ f: f, g: g, @@ -52,14 +70,15 @@ func Compose2[A, B, C any](f Func[A, *Maybe[B]], g Func[B, *Maybe[C]]) Func[A, * // compose2Func is the type returned by [Compose2]. type compose2Func[A, B, C any] struct { - f Func[A, *Maybe[B]] - g Func[B, *Maybe[C]] + f Func[A, B] + g Func[B, C] } // Apply implements Func -func (h *compose2Func[A, B, C]) Apply(ctx context.Context, a A) *Maybe[C] { +func (h *compose2Func[A, B, C]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[C] { mb := h.f.Apply(ctx, a) runtimex.Assert(mb != nil, "h.f.Apply returned a nil pointer") + if mb.Error != nil { return &Maybe[C]{ Error: mb.Error, @@ -68,8 +87,10 @@ func (h *compose2Func[A, B, C]) Apply(ctx context.Context, a A) *Maybe[C] { State: *new(C), // zero value } } - mc := h.g.Apply(ctx, mb.State) + + mc := h.g.Apply(ctx, mb) runtimex.Assert(mc != nil, "h.g.Apply returned a nil pointer") + op := mc.Operation if op == "" { // propagate the previous operation name, if this operation has none op = mb.Operation @@ -99,24 +120,16 @@ func (c *Counter[T]) Value() int64 { } // Func returns a Func[T, *Maybe[T]] that updates the counter. -func (c *Counter[T]) Func() Func[T, *Maybe[T]] { - return &counterFunc[T]{c} -} - -// counterFunc is the Func returned by CounterFunc.Func. -type counterFunc[T any] struct { - c *Counter[T] -} - -// Apply implements Func. -func (c *counterFunc[T]) Apply(ctx context.Context, value T) *Maybe[T] { - c.c.n.Add(1) - return &Maybe[T]{ - Error: nil, - Observations: nil, - Operation: "", // we cannot fail, so no need to store operation name - State: value, - } +func (c *Counter[T]) Func() Func[T, T] { + return Operation[T, T](func(ctx context.Context, value T) *Maybe[T] { + c.n.Add(1) + return &Maybe[T]{ + Error: nil, + Observations: nil, + Operation: "", // we cannot fail, so no need to store operation name + State: value, + } + }) } // FirstErrorExcludingBrokenIPv6Errors returns the first error and failed operation in a list of diff --git a/internal/dslx/fxcore_test.go b/internal/dslx/fxcore_test.go index a9abf7b3a8..885394567d 100644 --- a/internal/dslx/fxcore_test.go +++ b/internal/dslx/fxcore_test.go @@ -4,12 +4,15 @@ import ( "context" "errors" "testing" + "time" + "github.com/ooni/probe-cli/v3/internal/mocks" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/runtimex" ) -func getFn(err error, name string) Func[int, *Maybe[int]] { +func getFn(err error, name string) Func[int, int] { return &fn{err: err, name: name} } @@ -18,10 +21,11 @@ type fn struct { name string } -func (f *fn) Apply(ctx context.Context, i int) *Maybe[int] { +func (f *fn) Apply(ctx context.Context, i *Maybe[int]) *Maybe[int] { + runtimex.Assert(i.Error == nil, "did not expect to see an error here") return &Maybe[int]{ Error: f.err, - State: i + 1, + State: i.State + 1, Observations: []*Observations{ { NetworkEvents: []*model.ArchivalNetworkEvent{{Tags: []string{"apply"}}}, @@ -31,6 +35,37 @@ func (f *fn) Apply(ctx context.Context, i int) *Maybe[int] { } } +func TestStageAdapter(t *testing.T) { + t.Run("make sure that we handle a previous stage failure", func(t *testing.T) { + unet := &mocks.UnderlyingNetwork{ + // explicitly empty so we crash if we try using underlying network functionality + } + netx := &netxlite.Netx{Underlying: unet} + + // create runtime + rt := NewMinimalRuntime(model.DiscardLogger, time.Now(), MinimalRuntimeOptionMeasuringNetwork(netx)) + + // create measurement pipeline where we run DNS lookups + pipeline := DNSLookupGetaddrinfo(rt) + + // create input that contains an error + input := &Maybe[*DomainToResolve]{ + Error: errors.New("mocked error"), + Observations: []*Observations{}, + Operation: "", + State: nil, + } + + // run the pipeline + output := pipeline.Apply(context.Background(), input) + + // make sure the output contains the same error as the input + if !errors.Is(output.Error, input.Error) { + t.Fatal("unexpected error") + } + }) +} + /* Test cases: - Compose 2 functions: @@ -53,7 +88,7 @@ func TestCompose2(t *testing.T) { f1 := getFn(tt.err, "maybe fail") f2 := getFn(nil, "succeed") composit := Compose2(f1, f2) - r := composit.Apply(context.Background(), tt.input) + r := composit.Apply(context.Background(), NewMaybeWithValue(tt.input)) if r.Error != tt.err { t.Fatalf("unexpected error") } @@ -73,7 +108,7 @@ func TestGen(t *testing.T) { incFunc := getFn(nil, "succeed") composit := Compose14(incFunc, incFunc, incFunc, incFunc, incFunc, incFunc, incFunc, incFunc, incFunc, incFunc, incFunc, incFunc, incFunc, incFunc) - r := composit.Apply(context.Background(), 0) + r := composit.Apply(context.Background(), NewMaybeWithValue(0)) if r.Error != nil { t.Fatalf("unexpected error: %s", r.Error) } @@ -91,8 +126,8 @@ func TestObservations(t *testing.T) { fn1 := getFn(nil, "succeed") fn2 := getFn(nil, "succeed") composit := Compose2(fn1, fn2) - r1 := composit.Apply(context.Background(), 3) - r2 := composit.Apply(context.Background(), 42) + r1 := composit.Apply(context.Background(), NewMaybeWithValue(3)) + r2 := composit.Apply(context.Background(), NewMaybeWithValue(42)) if len(r1.Observations) != 2 || len(r2.Observations) != 2 { t.Fatalf("unexpected number of observations") } @@ -123,7 +158,7 @@ func TestCounter(t *testing.T) { fn := getFn(tt.err, "maybe fail") cnt := NewCounter[int]() composit := Compose2(fn, cnt.Func()) - r := composit.Apply(context.Background(), 42) + r := composit.Apply(context.Background(), NewMaybeWithValue(42)) cntVal := cnt.Value() if cntVal != tt.expect { t.Fatalf("unexpected counter value") diff --git a/internal/dslx/fxgen.go b/internal/dslx/fxgen.go index 7d56cbf7b5..08ec72a656 100644 --- a/internal/dslx/fxgen.go +++ b/internal/dslx/fxgen.go @@ -11,10 +11,10 @@ func Compose3[ T2 any, T3 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], -) Func[T0, *Maybe[T3]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], +) Func[T0, T3] { return Compose2(f0, Compose2(f1, f2)) } @@ -26,11 +26,11 @@ func Compose4[ T3 any, T4 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], -) Func[T0, *Maybe[T4]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], +) Func[T0, T4] { return Compose2(f0, Compose3(f1, f2, f3)) } @@ -43,12 +43,12 @@ func Compose5[ T4 any, T5 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], -) Func[T0, *Maybe[T5]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], +) Func[T0, T5] { return Compose2(f0, Compose4(f1, f2, f3, f4)) } @@ -62,13 +62,13 @@ func Compose6[ T5 any, T6 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], - f5 Func[T5, *Maybe[T6]], -) Func[T0, *Maybe[T6]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], + f5 Func[T5, T6], +) Func[T0, T6] { return Compose2(f0, Compose5(f1, f2, f3, f4, f5)) } @@ -83,14 +83,14 @@ func Compose7[ T6 any, T7 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], - f5 Func[T5, *Maybe[T6]], - f6 Func[T6, *Maybe[T7]], -) Func[T0, *Maybe[T7]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], + f5 Func[T5, T6], + f6 Func[T6, T7], +) Func[T0, T7] { return Compose2(f0, Compose6(f1, f2, f3, f4, f5, f6)) } @@ -106,15 +106,15 @@ func Compose8[ T7 any, T8 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], - f5 Func[T5, *Maybe[T6]], - f6 Func[T6, *Maybe[T7]], - f7 Func[T7, *Maybe[T8]], -) Func[T0, *Maybe[T8]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], + f5 Func[T5, T6], + f6 Func[T6, T7], + f7 Func[T7, T8], +) Func[T0, T8] { return Compose2(f0, Compose7(f1, f2, f3, f4, f5, f6, f7)) } @@ -131,16 +131,16 @@ func Compose9[ T8 any, T9 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], - f5 Func[T5, *Maybe[T6]], - f6 Func[T6, *Maybe[T7]], - f7 Func[T7, *Maybe[T8]], - f8 Func[T8, *Maybe[T9]], -) Func[T0, *Maybe[T9]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], + f5 Func[T5, T6], + f6 Func[T6, T7], + f7 Func[T7, T8], + f8 Func[T8, T9], +) Func[T0, T9] { return Compose2(f0, Compose8(f1, f2, f3, f4, f5, f6, f7, f8)) } @@ -158,17 +158,17 @@ func Compose10[ T9 any, T10 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], - f5 Func[T5, *Maybe[T6]], - f6 Func[T6, *Maybe[T7]], - f7 Func[T7, *Maybe[T8]], - f8 Func[T8, *Maybe[T9]], - f9 Func[T9, *Maybe[T10]], -) Func[T0, *Maybe[T10]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], + f5 Func[T5, T6], + f6 Func[T6, T7], + f7 Func[T7, T8], + f8 Func[T8, T9], + f9 Func[T9, T10], +) Func[T0, T10] { return Compose2(f0, Compose9(f1, f2, f3, f4, f5, f6, f7, f8, f9)) } @@ -187,18 +187,18 @@ func Compose11[ T10 any, T11 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], - f5 Func[T5, *Maybe[T6]], - f6 Func[T6, *Maybe[T7]], - f7 Func[T7, *Maybe[T8]], - f8 Func[T8, *Maybe[T9]], - f9 Func[T9, *Maybe[T10]], - f10 Func[T10, *Maybe[T11]], -) Func[T0, *Maybe[T11]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], + f5 Func[T5, T6], + f6 Func[T6, T7], + f7 Func[T7, T8], + f8 Func[T8, T9], + f9 Func[T9, T10], + f10 Func[T10, T11], +) Func[T0, T11] { return Compose2(f0, Compose10(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10)) } @@ -218,19 +218,19 @@ func Compose12[ T11 any, T12 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], - f5 Func[T5, *Maybe[T6]], - f6 Func[T6, *Maybe[T7]], - f7 Func[T7, *Maybe[T8]], - f8 Func[T8, *Maybe[T9]], - f9 Func[T9, *Maybe[T10]], - f10 Func[T10, *Maybe[T11]], - f11 Func[T11, *Maybe[T12]], -) Func[T0, *Maybe[T12]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], + f5 Func[T5, T6], + f6 Func[T6, T7], + f7 Func[T7, T8], + f8 Func[T8, T9], + f9 Func[T9, T10], + f10 Func[T10, T11], + f11 Func[T11, T12], +) Func[T0, T12] { return Compose2(f0, Compose11(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11)) } @@ -251,20 +251,20 @@ func Compose13[ T12 any, T13 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], - f5 Func[T5, *Maybe[T6]], - f6 Func[T6, *Maybe[T7]], - f7 Func[T7, *Maybe[T8]], - f8 Func[T8, *Maybe[T9]], - f9 Func[T9, *Maybe[T10]], - f10 Func[T10, *Maybe[T11]], - f11 Func[T11, *Maybe[T12]], - f12 Func[T12, *Maybe[T13]], -) Func[T0, *Maybe[T13]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], + f5 Func[T5, T6], + f6 Func[T6, T7], + f7 Func[T7, T8], + f8 Func[T8, T9], + f9 Func[T9, T10], + f10 Func[T10, T11], + f11 Func[T11, T12], + f12 Func[T12, T13], +) Func[T0, T13] { return Compose2(f0, Compose12(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12)) } @@ -286,20 +286,20 @@ func Compose14[ T13 any, T14 any, ]( - f0 Func[T0, *Maybe[T1]], - f1 Func[T1, *Maybe[T2]], - f2 Func[T2, *Maybe[T3]], - f3 Func[T3, *Maybe[T4]], - f4 Func[T4, *Maybe[T5]], - f5 Func[T5, *Maybe[T6]], - f6 Func[T6, *Maybe[T7]], - f7 Func[T7, *Maybe[T8]], - f8 Func[T8, *Maybe[T9]], - f9 Func[T9, *Maybe[T10]], - f10 Func[T10, *Maybe[T11]], - f11 Func[T11, *Maybe[T12]], - f12 Func[T12, *Maybe[T13]], - f13 Func[T13, *Maybe[T14]], -) Func[T0, *Maybe[T14]] { + f0 Func[T0, T1], + f1 Func[T1, T2], + f2 Func[T2, T3], + f3 Func[T3, T4], + f4 Func[T4, T5], + f5 Func[T5, T6], + f6 Func[T6, T7], + f7 Func[T7, T8], + f8 Func[T8, T9], + f9 Func[T9, T10], + f10 Func[T10, T11], + f11 Func[T11, T12], + f12 Func[T12, T13], + f13 Func[T13, T14], +) Func[T0, T14] { return Compose2(f0, Compose13(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13)) } diff --git a/internal/dslx/http_test.go b/internal/dslx/http_test.go index 0f49c2c594..ade2904555 100644 --- a/internal/dslx/http_test.go +++ b/internal/dslx/http_test.go @@ -242,7 +242,7 @@ func TestHTTPRequest(t *testing.T) { httpRequest := HTTPRequest( NewMinimalRuntime(model.DiscardLogger, time.Now()), ) - res := httpRequest.Apply(context.Background(), &httpTransport) + res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport)) if res.Error != io.EOF { t.Fatal("not the error we expected") } @@ -262,7 +262,7 @@ func TestHTTPRequest(t *testing.T) { } rt := NewMinimalRuntime(model.DiscardLogger, time.Now()) httpRequest := HTTPRequest(rt) - res := httpRequest.Apply(context.Background(), &httpTransport) + res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport)) if res.Error == nil || !strings.HasPrefix(res.Error.Error(), `parse "https://%09/": invalid URL escape "%09"`) { t.Fatal("not the error we expected", res.Error) } @@ -282,7 +282,7 @@ func TestHTTPRequest(t *testing.T) { httpRequest := HTTPRequest( NewMinimalRuntime(model.DiscardLogger, time.Now()), ) - res := httpRequest.Apply(context.Background(), &httpTransport) + res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport)) if res.Error != nil { t.Fatal("expected error") } @@ -334,7 +334,7 @@ func TestHTTPRequest(t *testing.T) { httpRequest := HTTPRequest( NewMinimalRuntime(model.DiscardLogger, time.Now()), ) - res := httpRequest.Apply(context.Background(), &httpTransport) + res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport)) if res.Error != nil { t.Fatal("unexpected error") } @@ -355,7 +355,7 @@ func TestHTTPRequest(t *testing.T) { httpRequest := HTTPRequest( NewMinimalRuntime(model.DiscardLogger, time.Now()), ) - res := httpRequest.Apply(context.Background(), &httpTransport) + res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport)) if res.Error != nil { t.Fatal("unexpected error") } @@ -383,7 +383,7 @@ func TestHTTPRequest(t *testing.T) { HTTPRequestOptionURLPath("/path/to/example"), HTTPRequestOptionUserAgent("Mozilla/5.0 Gecko/geckotrail Firefox/firefoxversion"), ) - res := httpRequest.Apply(context.Background(), &httpTransport) + res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport)) if res.Error != nil { t.Fatal("unexpected error") } @@ -433,7 +433,7 @@ func TestHTTPTCP(t *testing.T) { f := HTTPConnectionTCP( NewMinimalRuntime(model.DiscardLogger, time.Now()), ) - res := f.Apply(context.Background(), tcpConn) + res := f.Apply(context.Background(), NewMaybeWithValue(tcpConn)) if res.Error != nil { t.Fatalf("unexpected error: %s", res.Error) } @@ -478,7 +478,7 @@ func TestHTTPQUIC(t *testing.T) { f := HTTPConnectionQUIC( NewMinimalRuntime(model.DiscardLogger, time.Now()), ) - res := f.Apply(context.Background(), quicConn) + res := f.Apply(context.Background(), NewMaybeWithValue(quicConn)) if res.Error != nil { t.Fatalf("unexpected error: %s", res.Error) } @@ -523,7 +523,7 @@ func TestHTTPTLS(t *testing.T) { f := HTTPConnectionTLS( NewMinimalRuntime(model.DiscardLogger, time.Now()), ) - res := f.Apply(context.Background(), tlsConn) + res := f.Apply(context.Background(), NewMaybeWithValue(tlsConn)) if res.Error != nil { t.Fatalf("unexpected error: %s", res.Error) } diff --git a/internal/dslx/httpcore.go b/internal/dslx/httpcore.go index 3812383167..09396cb7f5 100644 --- a/internal/dslx/httpcore.go +++ b/internal/dslx/httpcore.go @@ -101,8 +101,8 @@ func HTTPRequestOptionUserAgent(value string) HTTPRequestOption { } // HTTPRequest issues an HTTP request using a transport and returns a response. -func HTTPRequest(rt Runtime, options ...HTTPRequestOption) Func[*HTTPConnection, *Maybe[*HTTPResponse]] { - return FuncAdapter[*HTTPConnection, *Maybe[*HTTPResponse]](func(ctx context.Context, input *HTTPConnection) *Maybe[*HTTPResponse] { +func HTTPRequest(rt Runtime, options ...HTTPRequestOption) Func[*HTTPConnection, *HTTPResponse] { + return Operation[*HTTPConnection, *HTTPResponse](func(ctx context.Context, input *HTTPConnection) *Maybe[*HTTPResponse] { // setup const timeout = 10 * time.Second ctx, cancel := context.WithTimeout(ctx, timeout) diff --git a/internal/dslx/httpquic.go b/internal/dslx/httpquic.go index b6905b0975..2aa15f22a0 100644 --- a/internal/dslx/httpquic.go +++ b/internal/dslx/httpquic.go @@ -11,13 +11,13 @@ import ( ) // HTTPRequestOverQUIC returns a Func that issues HTTP requests over QUIC. -func HTTPRequestOverQUIC(rt Runtime, options ...HTTPRequestOption) Func[*QUICConnection, *Maybe[*HTTPResponse]] { +func HTTPRequestOverQUIC(rt Runtime, options ...HTTPRequestOption) Func[*QUICConnection, *HTTPResponse] { return Compose2(HTTPConnectionQUIC(rt), HTTPRequest(rt, options...)) } // HTTPConnectionQUIC converts a QUIC connection into an HTTP connection. -func HTTPConnectionQUIC(rt Runtime) Func[*QUICConnection, *Maybe[*HTTPConnection]] { - return FuncAdapter[*QUICConnection, *Maybe[*HTTPConnection]](func(ctx context.Context, input *QUICConnection) *Maybe[*HTTPConnection] { +func HTTPConnectionQUIC(rt Runtime) Func[*QUICConnection, *HTTPConnection] { + return Operation[*QUICConnection, *HTTPConnection](func(ctx context.Context, input *QUICConnection) *Maybe[*HTTPConnection] { // create transport httpTransport := netxlite.NewHTTP3Transport( rt.Logger(), diff --git a/internal/dslx/httptcp.go b/internal/dslx/httptcp.go index c747f7696b..0c1b9645b8 100644 --- a/internal/dslx/httptcp.go +++ b/internal/dslx/httptcp.go @@ -11,13 +11,13 @@ import ( ) // HTTPRequestOverTCP returns a Func that issues HTTP requests over TCP. -func HTTPRequestOverTCP(rt Runtime, options ...HTTPRequestOption) Func[*TCPConnection, *Maybe[*HTTPResponse]] { +func HTTPRequestOverTCP(rt Runtime, options ...HTTPRequestOption) Func[*TCPConnection, *HTTPResponse] { return Compose2(HTTPConnectionTCP(rt), HTTPRequest(rt, options...)) } // HTTPConnectionTCP converts a TCP connection into an HTTP connection. -func HTTPConnectionTCP(rt Runtime) Func[*TCPConnection, *Maybe[*HTTPConnection]] { - return FuncAdapter[*TCPConnection, *Maybe[*HTTPConnection]](func(ctx context.Context, input *TCPConnection) *Maybe[*HTTPConnection] { +func HTTPConnectionTCP(rt Runtime) Func[*TCPConnection, *HTTPConnection] { + return Operation[*TCPConnection, *HTTPConnection](func(ctx context.Context, input *TCPConnection) *Maybe[*HTTPConnection] { // TODO(https://github.com/ooni/probe/issues/2534): here we're using the QUIRKY netxlite.NewHTTPTransport // function, but we can probably avoid using it, given that this code is // not using tracing and does not care about those quirks. diff --git a/internal/dslx/httptls.go b/internal/dslx/httptls.go index 539034b00d..9e3de76be0 100644 --- a/internal/dslx/httptls.go +++ b/internal/dslx/httptls.go @@ -11,13 +11,13 @@ import ( ) // HTTPRequestOverTLS returns a Func that issues HTTP requests over TLS. -func HTTPRequestOverTLS(rt Runtime, options ...HTTPRequestOption) Func[*TLSConnection, *Maybe[*HTTPResponse]] { +func HTTPRequestOverTLS(rt Runtime, options ...HTTPRequestOption) Func[*TLSConnection, *HTTPResponse] { return Compose2(HTTPConnectionTLS(rt), HTTPRequest(rt, options...)) } // HTTPConnectionTLS converts a TLS connection into an HTTP connection. -func HTTPConnectionTLS(rt Runtime) Func[*TLSConnection, *Maybe[*HTTPConnection]] { - return FuncAdapter[*TLSConnection, *Maybe[*HTTPConnection]](func(ctx context.Context, input *TLSConnection) *Maybe[*HTTPConnection] { +func HTTPConnectionTLS(rt Runtime) Func[*TLSConnection, *HTTPConnection] { + return Operation[*TLSConnection, *HTTPConnection](func(ctx context.Context, input *TLSConnection) *Maybe[*HTTPConnection] { // TODO(https://github.com/ooni/probe/issues/2534): here we're using the QUIRKY netxlite.NewHTTPTransport // function, but we can probably avoid using it, given that this code is // not using tracing and does not care about those quirks. diff --git a/internal/dslx/integration_test.go b/internal/dslx/integration_test.go index 97070d9663..e8fdbf37bb 100644 --- a/internal/dslx/integration_test.go +++ b/internal/dslx/integration_test.go @@ -50,7 +50,7 @@ func TestMakeSureWeCollectSpeedSamples(t *testing.T) { } // measure the endpoint - result := f0.Apply(context.Background(), epnt) + result := f0.Apply(context.Background(), NewMaybeWithValue(epnt)) // get observations observations := ExtractObservations(result) diff --git a/internal/dslx/quic.go b/internal/dslx/quic.go index 3442b6822d..2f6539cba5 100644 --- a/internal/dslx/quic.go +++ b/internal/dslx/quic.go @@ -16,8 +16,8 @@ import ( ) // QUICHandshake returns a function performing QUIC handshakes. -func QUICHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*Endpoint, *Maybe[*QUICConnection]] { - return FuncAdapter[*Endpoint, *Maybe[*QUICConnection]](func(ctx context.Context, input *Endpoint) *Maybe[*QUICConnection] { +func QUICHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*Endpoint, *QUICConnection] { + return Operation[*Endpoint, *QUICConnection](func(ctx context.Context, input *Endpoint) *Maybe[*QUICConnection] { // create trace trace := rt.NewTrace(rt.IDGenerator().Add(1), rt.ZeroTime(), input.Tags...) diff --git a/internal/dslx/quic_test.go b/internal/dslx/quic_test.go index 9512783a71..17328fa4f4 100644 --- a/internal/dslx/quic_test.go +++ b/internal/dslx/quic_test.go @@ -92,7 +92,7 @@ func TestQUICHandshake(t *testing.T) { Network: "udp", Tags: tt.tags, } - res := quicHandshake.Apply(context.Background(), endpoint) + res := quicHandshake.Apply(context.Background(), NewMaybeWithValue(endpoint)) if res.Error != tt.expectErr { t.Fatalf("unexpected error: %s", res.Error) } diff --git a/internal/dslx/tcp.go b/internal/dslx/tcp.go index d576b84fce..4b9d684f9e 100644 --- a/internal/dslx/tcp.go +++ b/internal/dslx/tcp.go @@ -14,8 +14,8 @@ import ( ) // TCPConnect returns a function that establishes TCP connections. -func TCPConnect(rt Runtime) Func[*Endpoint, *Maybe[*TCPConnection]] { - return FuncAdapter[*Endpoint, *Maybe[*TCPConnection]](func(ctx context.Context, input *Endpoint) *Maybe[*TCPConnection] { +func TCPConnect(rt Runtime) Func[*Endpoint, *TCPConnection] { + return Operation[*Endpoint, *TCPConnection](func(ctx context.Context, input *Endpoint) *Maybe[*TCPConnection] { // create trace trace := rt.NewTrace(rt.IDGenerator().Add(1), rt.ZeroTime(), input.Tags...) diff --git a/internal/dslx/tcp_test.go b/internal/dslx/tcp_test.go index f5f1e28532..900aa94a4e 100644 --- a/internal/dslx/tcp_test.go +++ b/internal/dslx/tcp_test.go @@ -69,7 +69,7 @@ func TestTCPConnect(t *testing.T) { Network: "tcp", Tags: tt.tags, } - res := tcpConnect.Apply(context.Background(), endpoint) + res := tcpConnect.Apply(context.Background(), NewMaybeWithValue(endpoint)) if res.Error != tt.expectErr { t.Fatalf("unexpected error: %s", res.Error) } diff --git a/internal/dslx/tls.go b/internal/dslx/tls.go index dd1f4f0838..b1c0ed7f53 100644 --- a/internal/dslx/tls.go +++ b/internal/dslx/tls.go @@ -48,8 +48,8 @@ func TLSHandshakeOptionServerName(value string) TLSHandshakeOption { } // TLSHandshake returns a function performing TSL handshakes. -func TLSHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*TCPConnection, *Maybe[*TLSConnection]] { - return FuncAdapter[*TCPConnection, *Maybe[*TLSConnection]](func(ctx context.Context, input *TCPConnection) *Maybe[*TLSConnection] { +func TLSHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*TCPConnection, *TLSConnection] { + return Operation[*TCPConnection, *TLSConnection](func(ctx context.Context, input *TCPConnection) *Maybe[*TLSConnection] { // keep using the same trace trace := input.Trace diff --git a/internal/dslx/tls_test.go b/internal/dslx/tls_test.go index 36df4a79ca..6d8f8e266b 100644 --- a/internal/dslx/tls_test.go +++ b/internal/dslx/tls_test.go @@ -169,7 +169,7 @@ func TestTLSHandshake(t *testing.T) { Network: "tcp", Trace: trace, } - res := tlsHandshake.Apply(context.Background(), &tcpConn) + res := tlsHandshake.Apply(context.Background(), NewMaybeWithValue(&tcpConn)) if res.Error != tt.expectErr { t.Fatalf("unexpected error: %s", res.Error) } diff --git a/internal/tutorial/dslx/chapter02/README.md b/internal/tutorial/dslx/chapter02/README.md index 359344e4c4..6cd72d5903 100644 --- a/internal/tutorial/dslx/chapter02/README.md +++ b/internal/tutorial/dslx/chapter02/README.md @@ -262,7 +262,7 @@ system resolver, or a custom UDP resolver. Then we apply the `dnsInput` argument to `lookupFn` to get a `dnsResult`. ```Go - dnsResult := lookupFn.Apply(ctx, dnsInput) + dnsResult := lookupFn.Apply(ctx, dslx.NewMaybeWithValue(dnsInput)) ``` @@ -362,8 +362,8 @@ data structure called `Maybe`, which contains either the endpoint measurement re (on success) or an error (in case of failure). ```Go - var targetResult *dslx.Maybe[*dslx.TLSConnection] = pipelineTarget.Apply(ctx, endpoint) - var controlResult *dslx.Maybe[*dslx.TLSConnection] = pipelineControl.Apply(ctx, endpoint) + var targetResult *dslx.Maybe[*dslx.TLSConnection] = pipelineTarget.Apply(ctx, dslx.NewMaybeWithValue(endpoint)) + var controlResult *dslx.Maybe[*dslx.TLSConnection] = pipelineControl.Apply(ctx, dslx.NewMaybeWithValue(endpoint)) ``` diff --git a/internal/tutorial/dslx/chapter02/main.go b/internal/tutorial/dslx/chapter02/main.go index ef0e3c62c4..9c033b30a9 100644 --- a/internal/tutorial/dslx/chapter02/main.go +++ b/internal/tutorial/dslx/chapter02/main.go @@ -263,7 +263,7 @@ func (m *Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { // Then we apply the `dnsInput` argument to `lookupFn` to get a `dnsResult`. // // ```Go - dnsResult := lookupFn.Apply(ctx, dnsInput) + dnsResult := lookupFn.Apply(ctx, dslx.NewMaybeWithValue(dnsInput)) // ``` // @@ -363,8 +363,8 @@ func (m *Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { // (on success) or an error (in case of failure). // // ```Go - var targetResult *dslx.Maybe[*dslx.TLSConnection] = pipelineTarget.Apply(ctx, endpoint) - var controlResult *dslx.Maybe[*dslx.TLSConnection] = pipelineControl.Apply(ctx, endpoint) + var targetResult *dslx.Maybe[*dslx.TLSConnection] = pipelineTarget.Apply(ctx, dslx.NewMaybeWithValue(endpoint)) + var controlResult *dslx.Maybe[*dslx.TLSConnection] = pipelineControl.Apply(ctx, dslx.NewMaybeWithValue(endpoint)) // ``` //