Skip to content

Commit

Permalink
feat(dslx): introduce MeasureResolvedAddresses (#1388)
Browse files Browse the repository at this point in the history
This diff introduces MeasureResolvedAddresses and other ancillary
functions that allow us to express measuring endpoints in a very simple
way and with less code than with previous dslx code.

As a result, we can deprecate a bunch of dslx functions (we will
actually clean them up at a later time).

While there, make StreamList simpler and more efficient: we can stream
w/o creating a goroutine. It suffices to create a sufficiently buffered
channel and fill it, and we can do all the work in the current
goroutine.

While there, fix a bug where QUICHandshake was mistakenly using the host
network rather than the measuring network to create UDP listening
sockets, which, well..., generated lots of confusion for me 😅 🤦.

While there, make sure the netemx.MustNewScenario function creates
HTTP/3 listeners for DNS servers. Previously, we did not create such
listeners, which however is necessary because we very often use DoH3.

While there, make sure we have issues for upcoming work.

Closes ooni/probe#2618.
  • Loading branch information
bassosimone authored Oct 25, 2023
1 parent 420aadb commit 5c50be4
Show file tree
Hide file tree
Showing 15 changed files with 423 additions and 21 deletions.
2 changes: 2 additions & 0 deletions internal/dslx/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type EndpointPort uint16
// ToEndpoints transforms this set of IP addresses to a list of endpoints. We will
// combine each IP address with the network and the port to construct an endpoint and
// we will also apply any additional option to each endpoint.
//
// Deprecated: use MakeEndpoint instead.
func (as *AddressSet) ToEndpoints(
network EndpointNetwork, port EndpointPort, options ...EndpointOption) (v []*Endpoint) {
for addr := range as.M {
Expand Down
35 changes: 28 additions & 7 deletions internal/dslx/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ type ResolvedAddresses struct {
Domain string
}

// Flatten transforms a [ResolvedAddresses] into a slice of zero or more [ResolvedAddress].
func (ra *ResolvedAddresses) Flatten() (out []*ResolvedAddress) {
for _, ipAddr := range ra.Addresses {
out = append(out, &ResolvedAddress{
Address: ipAddr,
Domain: ra.Domain,
})
}
return
}

// ResolvedAddress is a single address resolved using a DNS lookup function.
type ResolvedAddress struct {
// Address is the address that was resolved.
Address string

// Domain is the domain from which we resolved the address.
Domain string
}

// DNSLookupGetaddrinfo returns a function that resolves a domain name to
// IP addresses using libc's getaddrinfo function.
func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses] {
Expand All @@ -90,18 +110,17 @@ func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses]
// lookup
addrs, err := resolver.LookupHost(ctx, input.Domain)

// stop the operation logger
ol.Stop(err)

// save the observations
rt.SaveObservations(maybeTraceToObservations(trace)...)

// handle error case
if err != nil {
ol.Stop(err)
return nil, err
}

// handle success
ol.Stop(addrs)
state := &ResolvedAddresses{
Addresses: addrs,
Domain: input.Domain,
Expand Down Expand Up @@ -141,18 +160,17 @@ func DNSLookupUDP(rt Runtime, endpoint string) Func[*DomainToResolve, *ResolvedA
// lookup
addrs, err := resolver.LookupHost(ctx, input.Domain)

// stop the operation logger
ol.Stop(err)

// save the observations
rt.SaveObservations(maybeTraceToObservations(trace)...)

// handle error case
if err != nil {
ol.Stop(err)
return nil, err
}

// handle success
ol.Stop(addrs)
state := &ResolvedAddresses{
Addresses: addrs,
Domain: input.Domain,
Expand All @@ -170,8 +188,11 @@ var ErrDNSLookupParallel = errors.New("dslx: DNSLookupParallel failed")
// processing observations or by creating a per-DNS-resolver pipeline.
func DNSLookupParallel(fxs ...Func[*DomainToResolve, *ResolvedAddresses]) Func[*DomainToResolve, *ResolvedAddresses] {
return Operation[*DomainToResolve, *ResolvedAddresses](func(ctx context.Context, domain *DomainToResolve) (*ResolvedAddresses, error) {
// TODO(https://github.com/ooni/probe/issues/2619): we may want to configure this
const parallelism = Parallelism(3)

// run all the DNS resolvers in parallel
results := Parallel(ctx, Parallelism(2), domain, fxs...)
results := Parallel(ctx, parallelism, domain, fxs...)

// reduce addresses
addressSet := NewAddressSet()
Expand Down
41 changes: 41 additions & 0 deletions internal/dslx/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ package dslx
// Manipulate endpoints
//

import (
"context"
"net"
"strconv"
)

type (
// EndpointNetwork is the network of the endpoint
EndpointNetwork string
Expand Down Expand Up @@ -70,3 +76,38 @@ func NewEndpoint(
}
return epnt
}

// MakeEndpoint returns a [Func] that creates an [*Endpoint] given [*ResolvedAddress].
func MakeEndpoint(network EndpointNetwork, port EndpointPort, options ...EndpointOption) Func[*ResolvedAddress, *Endpoint] {
return Operation[*ResolvedAddress, *Endpoint](func(ctx context.Context, addr *ResolvedAddress) (*Endpoint, error) {
// create the destination endpoint address
addrport := EndpointAddress(net.JoinHostPort(addr.Address, strconv.Itoa(int(port))))

// make sure we include the proper domain name first but allow the caller
// to potentially override the domain name using options
allOptions := []EndpointOption{
EndpointOptionDomain(addr.Domain),
}
allOptions = append(allOptions, options...)

// build and return the endpoint
endpoint := NewEndpoint(network, addrport, allOptions...)
return endpoint, nil
})
}

// MeasureResolvedAddresses returns a [Func] that measures the resolved addresses provided
// as the input argument using each of the provided functions.
func MeasureResolvedAddresses(fxs ...Func[*ResolvedAddress, Void]) Func[*ResolvedAddresses, Void] {
return Operation[*ResolvedAddresses, Void](func(ctx context.Context, addrs *ResolvedAddresses) (Void, error) {
// TODO(https://github.com/ooni/probe/issues/2619): we may want to configure this
const parallelism = Parallelism(3)

// run the matrix until the output is drained
for range Matrix(ctx, parallelism, addrs.Flatten(), fxs) {
// nothing
}

return Void{}, nil
})
}
60 changes: 60 additions & 0 deletions internal/dslx/fxasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Parallelism int
// The return value is the channel generating fx(a)
// for every a in inputs. This channel will also be closed
// to signal EOF to the consumer.
//
// Deprecated: use Matrix instead.
func Map[A, B any](
ctx context.Context,
parallelism Parallelism,
Expand Down Expand Up @@ -77,6 +79,8 @@ func Map[A, B any](
// - fn is the list of functions.
//
// The return value is the list [fx(a)] for every fx in fn.
//
// Deprecated: use Matrix instead.
func Parallel[A, B any](
ctx context.Context,
parallelism Parallelism,
Expand All @@ -90,6 +94,8 @@ func Parallel[A, B any](
// ParallelAsync is like Parallel but deals with channels. We assume the
// input channel will be closed to signal EOF. We will close the output
// channel to signal EOF to the consumer.
//
// Deprecated: use Matrix instead.
func ParallelAsync[A, B any](
ctx context.Context,
parallelism Parallelism,
Expand Down Expand Up @@ -124,10 +130,64 @@ func ParallelAsync[A, B any](
}

// ApplyAsync is equivalent to calling Apply but returns a channel.
//
// Deprecated: use Matrix instead.
func ApplyAsync[A, B any](
ctx context.Context,
fx Func[A, B],
input A,
) <-chan *Maybe[B] {
return Map(ctx, Parallelism(1), fx, StreamList(input))
}

// matrixPoint is a point within the matrix used by [Matrix].
type matrixPoint[A, B any] struct {
f Func[A, B]
in A
}

// matrixMin can be replaced with the built-in min when we switch to go1.21.
func matrixMin(a, b Parallelism) Parallelism {
if a < b {
return a
}
return b
}

// Matrix invokes each function on each input using N goroutines and streams the results to a channel.
func Matrix[A, B any](ctx context.Context, N Parallelism, inputs []A, functions []Func[A, B]) <-chan *Maybe[B] {
// make output
output := make(chan *Maybe[B])

// stream all the possible points
points := make(chan *matrixPoint[A, B])
go func() {
defer close(points)
for _, input := range inputs {
for _, fx := range functions {
points <- &matrixPoint[A, B]{f: fx, in: input}
}
}
}()

// spawn goroutines
wg := &sync.WaitGroup{}
N = matrixMin(1, N)
for i := Parallelism(0); i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for p := range points {
output <- p.f.Apply(ctx, NewMaybeWithValue(p.in))
}
}()
}

// close output channel when done
go func() {
defer close(output)
wg.Wait()
}()

return output
}
12 changes: 12 additions & 0 deletions internal/dslx/fxasync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,15 @@ func TestParallel(t *testing.T) {
}
})
}

func TestMatrixMin(t *testing.T) {
if v := matrixMin(1, 7); v != 1 {
t.Fatal("expected to see 1, got", v)
}
if v := matrixMin(7, 4); v != 4 {
t.Fatal("expected to see 4, got", v)
}
if v := matrixMin(11, 11); v != 11 {
t.Fatal("expected to see 11, got", v)
}
}
75 changes: 75 additions & 0 deletions internal/dslx/fxcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,23 @@ package dslx

import (
"context"
"errors"
"sync"
)

// Func is a function f: (context.Context, A) -> B.
type Func[A, B any] interface {
Apply(ctx context.Context, a *Maybe[A]) *Maybe[B]
}

// FuncAdapter adapts a func to be a [Func].
type FuncAdapter[A, B any] func(ctx context.Context, a *Maybe[A]) *Maybe[B]

// Apply implements Func.
func (fa FuncAdapter[A, B]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[B] {
return fa(ctx, a)
}

// Operation adapts a golang function to behave like a Func.
type Operation[A, B any] func(ctx context.Context, a A) (B, error)

Expand Down Expand Up @@ -73,3 +83,68 @@ type compose2Func[A, B, C any] struct {
func (h *compose2Func[A, B, C]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[C] {
return h.g.Apply(ctx, h.f.Apply(ctx, a))
}

// Void is the empty data structure.
type Void struct{}

// Discard transforms any type to [Void].
func Discard[T any]() Func[T, Void] {
return Operation[T, Void](func(ctx context.Context, input T) (Void, error) {
return Void{}, nil
})
}

// ErrSkip is an error that indicates that we already processed an error emitted
// by a previous stage, so we are using this error to avoid counting the original
// error more than once when computing statistics, e.g., in [*Stats].
var ErrSkip = errors.New("dslx: error already processed by a previous stage")

// Stats measures the number of successes and failures.
//
// The zero value is invalid; use [NewStats].
type Stats[T any] struct {
m map[string]int64
mu sync.Mutex
}

// NewStats creates a [*Stats] instance.
func NewStats[T any]() *Stats[T] {
return &Stats[T]{
m: map[string]int64{},
mu: sync.Mutex{},
}
}

// Observer returns a Func that observes the results of the previous pipeline stage. This function
// converts any error that it sees to [ErrSkip]. This function does not account for [ErrSkip], meaning
// that you will never see [ErrSkip] in the stats returned by [Stats.Export].
func (s *Stats[T]) Observer() Func[T, T] {
return FuncAdapter[T, T](func(ctx context.Context, minput *Maybe[T]) *Maybe[T] {
defer s.mu.Unlock()
s.mu.Lock()
var r string
if err := minput.Error; err != nil {
if errors.Is(err, ErrSkip) {
return NewMaybeWithError[T](ErrSkip) // as documented
}
r = err.Error()
}
s.m[r]++
if r != "" {
return NewMaybeWithError[T](ErrSkip) // as documented
}
return minput
})
}

// Export exports the current stats without clearing the internally used map such that
// statistics accumulate over time and never reset for the [*Stats] lifecycle.
func (s *Stats[T]) Export() (out map[string]int64) {
out = make(map[string]int64)
defer s.mu.Unlock()
s.mu.Lock()
for r, cnt := range s.m {
out[r] = cnt
}
return
}
12 changes: 5 additions & 7 deletions internal/dslx/fxstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ func Collect[T any](c <-chan T) (v []T) {
// StreamList creates a channel out of static values. This function will
// close the channel when it has streamed all the available elements.
func StreamList[T any](ts ...T) <-chan T {
c := make(chan T)
go func() {
defer close(c) // as documented
for _, t := range ts {
c <- t
}
}()
c := make(chan T, len(ts)) // buffer so writing does not block
defer close(c) // as documented
for _, t := range ts {
c <- t
}
return c
}

Expand Down
4 changes: 2 additions & 2 deletions internal/dslx/httpcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func httpRoundTrip(
input *HTTPConnection,
req *http.Request,
) (*http.Response, []byte, []*Observations, error) {
const maxbody = 1 << 19 // TODO(bassosimone): allow to configure this value?
const maxbody = 1 << 19 // TODO(https://github.com/ooni/probe/issues/2621): allow to configure this value
started := input.Trace.TimeSince(input.Trace.ZeroTime())

// manually create a single 1-length observations structure because
Expand Down Expand Up @@ -249,7 +249,7 @@ func httpRoundTrip(

// read a snapshot of the response body
reader := io.LimitReader(resp.Body, maxbody)
body, err = netxlite.ReadAllContext(ctx, reader) // TODO: enable streaming and measure speed
body, err = netxlite.ReadAllContext(ctx, reader) // TODO(https://github.com/ooni/probe/issues/2622)

// collect and save download speed samples
samples := sampler.ExtractSamples()
Expand Down
Loading

0 comments on commit 5c50be4

Please sign in to comment.