Skip to content

Commit

Permalink
Pull request: 4254 fix optimistic
Browse files Browse the repository at this point in the history
Merge in DNS/dnsproxy from 4254-fix-optimistic to master

Updates AdguardTeam/AdGuardHome#4254.

Squashed commit of the following:

commit 90f3467
Author: Eugene Burkov <[email protected]>
Date:   Mon Feb 7 18:17:21 2022 +0300

    proxy: imp docs again

commit 010a02c
Author: Eugene Burkov <[email protected]>
Date:   Mon Feb 7 18:16:38 2022 +0300

    proxy: imp code

commit f6e31fa
Author: Eugene Burkov <[email protected]>
Date:   Mon Feb 7 18:08:45 2022 +0300

    proxy: imp docs

commit a50ba57
Author: Eugene Burkov <[email protected]>
Date:   Mon Feb 7 17:33:55 2022 +0300

    proxy: fix lint

commit 4288ccb
Author: Eugene Burkov <[email protected]>
Date:   Mon Feb 7 17:26:38 2022 +0300

    proxy: make optimistic actually optimistic
  • Loading branch information
EugeneOne1 committed Feb 7, 2022
1 parent 3defd67 commit 5956b6d
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 180 deletions.
33 changes: 1 addition & 32 deletions proxy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,7 @@ func (p *Proxy) initCache() {
c.initLazyWithSubnet()
}

p.shortFlighter = newOptimisticResolver(
p.replyFromUpstream,
p.cacheResp,
c.del,
)
p.shortFlighterWithSubnet = newOptimisticResolver(
p.replyFromUpstream,
p.cacheResp,
c.delWithSubnet,
)
p.shortFlighter = newOptimisticResolver(p)
}

// get returns cached item for the req if it's found. expired is true if the
Expand Down Expand Up @@ -528,25 +519,3 @@ func filterMsg(dst, m *dns.Msg, ad, do bool, ttl uint32) {
dst.Ns = filterRRSlice(m.Ns, do, ttl, dns.TypeNone)
dst.Extra = filterRRSlice(m.Extra, do, ttl, dns.TypeNone)
}

func (c *cache) del(key []byte) {
c.itemsLock.RLock()
defer c.itemsLock.RUnlock()

if c.items == nil {
return
}

c.items.Del(key)
}

func (c *cache) delWithSubnet(key []byte) {
c.itemsWithSubnetLock.RLock()
defer c.itemsWithSubnetLock.RUnlock()

if c.itemsWithSubnet == nil {
return
}

c.itemsWithSubnet.Del(key)
}
44 changes: 20 additions & 24 deletions proxy/optimisticresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,33 @@ import (
"github.com/AdguardTeam/golibs/log"
)

// resolveFunc is the signature of a method to resolve expired cached requests.
// This is exactly the signature of Proxy.replyFromUpstream.
type resolveFunc func(dctx *DNSContext) (ok bool, err error)
// cachingResolver is the DNS resolver that is also able to cache responses.
type cachingResolver interface {
// replyFromUpstream returns true if the request from dctx is successfully
// resolved and the response may be cached.
//
// TODO(e.burkov): Find out when ok can be false with nil err.
replyFromUpstream(dctx *DNSContext) (ok bool, err error)

// setFunc is the signature of a method to cache response. This is exactly the
// signature of Proxy.setInCache method.
type setFunc func(dctx *DNSContext)
// cacheResp caches the response from dctx.
cacheResp(dctx *DNSContext)
}

// deleteFunc is the signature of a method to remove the response from cache.
type deleteFunc func(key []byte)
// type check
var _ cachingResolver = (*Proxy)(nil)

// optimisticResolver is used to eventually resolve expired cached requests.
//
// TODO(e.burkov): Think about generalizing all function-fields into a single
// interface.
type optimisticResolver struct {
reqs *sync.Map
resolve resolveFunc
set setFunc
delete deleteFunc
reqs *sync.Map
cr cachingResolver
}

// newOptimisticResolver returns the new resolver for expired cached requests.
func newOptimisticResolver(rf resolveFunc, sf setFunc, df deleteFunc) (s *optimisticResolver) {
// cr must not be nil.
func newOptimisticResolver(cr cachingResolver) (s *optimisticResolver) {
return &optimisticResolver{
reqs: &sync.Map{},
resolve: rf,
set: sf,
delete: df,
reqs: &sync.Map{},
cr: cr,
}
}

Expand All @@ -55,14 +53,12 @@ func (s *optimisticResolver) ResolveOnce(dctx *DNSContext, key []byte) {
}
defer s.reqs.Delete(keyHexed)

ok, err := s.resolve(dctx)
ok, err := s.cr.replyFromUpstream(dctx)
if err != nil {
log.Debug("resolving request for optimistic cache: %s", err)
}

if ok {
s.set(dctx)
} else {
s.delete(key)
s.cr.cacheResp(dctx)
}
}
77 changes: 44 additions & 33 deletions proxy/optimisticresolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,45 @@ import (
"github.com/stretchr/testify/assert"
)

func TestOptimisticResolver_ResolveOnce(t *testing.T) {
in, out := make(chan unit), make(chan unit)
var timesResolved int
testResolveFunc := func(_ *DNSContext) (ok bool, err error) {
timesResolved++
// testCachingResolver is a stub implementation of the cachingResolver interface
// to simplify testing.
type testCachingResolver struct {
onReplyFromUpstream func(dctx *DNSContext) (ok bool, err error)
onCacheResp func(dctx *DNSContext)
}

return true, nil
}
// replyFromUpstream implements the cachingResolver interface for
// *testCachingResolver.
func (tcr *testCachingResolver) replyFromUpstream(dctx *DNSContext) (ok bool, err error) {
return tcr.onReplyFromUpstream(dctx)
}

var timesSet int
testSetFunc := func(_ *DNSContext) {
timesSet++
// cacheResp implements the cachingResolver interface for *testCachingResolver.
func (tcr *testCachingResolver) cacheResp(dctx *DNSContext) {
tcr.onCacheResp(dctx)
}

// Pass the signal to begin running secondary goroutines.
out <- unit{}
// Block until all the secondary goroutines finish.
<-in
func TestOptimisticResolver_ResolveOnce(t *testing.T) {
in, out := make(chan unit), make(chan unit)
var timesResolved, timesSet int

tcr := &testCachingResolver{
onReplyFromUpstream: func(_ *DNSContext) (ok bool, err error) {
timesResolved++

return true, nil
},
onCacheResp: func(_ *DNSContext) {
timesSet++

// Pass the signal to begin running secondary goroutines.
out <- unit{}
// Block until all the secondary goroutines finish.
<-in
},
}

s := newOptimisticResolver(testResolveFunc, testSetFunc, nil)
s := newOptimisticResolver(tcr)
sameKey := []byte{1, 2, 3}

// Start the primary goroutine.
Expand Down Expand Up @@ -61,8 +80,6 @@ func TestOptimisticResolver_ResolveOnce(t *testing.T) {
func TestOptimisticResolver_ResolveOnce_unsuccessful(t *testing.T) {
key := []byte{1, 2, 3}

noopSetFunc := func(_ *DNSContext) {}

t.Run("error", func(t *testing.T) {
logOutput := &bytes.Buffer{}

Expand All @@ -76,29 +93,23 @@ func TestOptimisticResolver_ResolveOnce_unsuccessful(t *testing.T) {
})

const rerr errors.Error = "sample resolving error"
testResolveFunc := func(_ *DNSContext) (ok bool, err error) {
return true, rerr
}

s := newOptimisticResolver(testResolveFunc, noopSetFunc, nil)
s := newOptimisticResolver(&testCachingResolver{
onReplyFromUpstream: func(_ *DNSContext) (ok bool, err error) { return true, rerr },
onCacheResp: func(_ *DNSContext) {},
})
s.ResolveOnce(nil, key)

assert.Contains(t, logOutput.String(), rerr.Error())
})

t.Run("not_ok", func(t *testing.T) {
testResolveFunc := func(_ *DNSContext) (ok bool, err error) {
return false, nil
}

var deleteCalled bool
testDeleteFunc := func(_ []byte) {
deleteCalled = true
}

s := newOptimisticResolver(testResolveFunc, noopSetFunc, testDeleteFunc)
cached := false
s := newOptimisticResolver(&testCachingResolver{
onReplyFromUpstream: func(_ *DNSContext) (ok bool, err error) { return false, nil },
onCacheResp: func(_ *DNSContext) { cached = true },
})
s.ResolveOnce(nil, key)

assert.True(t, deleteCalled)
assert.False(t, cached)
})
}
17 changes: 6 additions & 11 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,6 @@ type Proxy struct {
// shortFlighter is used to resolve the expired cached requests without
// repetitions.
shortFlighter *optimisticResolver
// shortFlighterWithSubnet is used to resolve the expired cached
// requests making sure that only one request for each cache item is
// performed at a time.
shortFlighterWithSubnet *optimisticResolver

// FastestAddr module
// --
Expand Down Expand Up @@ -388,8 +384,7 @@ func (p *Proxy) Addr(proto Proto) net.Addr {
}
}

// replyFromUpstream tries to resolve the request and caches it if cacheWorks is
// true.
// replyFromUpstream tries to resolve the request.
func (p *Proxy) replyFromUpstream(d *DNSContext) (ok bool, err error) {
req := d.Req
host := req.Question[0].Name
Expand All @@ -410,7 +405,7 @@ func (p *Proxy) replyFromUpstream(d *DNSContext) (ok bool, err error) {
var u upstream.Upstream
reply, u, err = p.exchange(req, upstreams)
if p.isNAT64PrefixAvailable() && p.isEmptyAAAAResponse(reply, req) {
log.Tracef("Received an empty AAAA response, checking DNS64")
log.Tracef("received an empty AAAA response, checking DNS64")
reply, u, err = p.checkDNS64(req, reply, upstreams)
} else if p.isBogusNXDomain(reply) {
log.Tracef("response ip is contained in bogus-nxdomain list")
Expand All @@ -420,7 +415,7 @@ func (p *Proxy) replyFromUpstream(d *DNSContext) (ok bool, err error) {
log.Tracef("RTT: %s", time.Since(start))

if err != nil && p.Fallbacks != nil {
log.Tracef("Using the fallback upstream due to %s", err)
log.Tracef("using the fallback upstream due to %s", err)

reply, u, err = upstream.ExchangeParallel(p.Fallbacks, req)
}
Expand All @@ -432,9 +427,9 @@ func (p *Proxy) replyFromUpstream(d *DNSContext) (ok bool, err error) {
d.Upstream = u
p.setMinMaxTTL(reply)

// Explicitly construct the question section since some
// upstreams may respond with invalidly constructed messages
// which cause out-of-range panics afterwards.
// Explicitly construct the question section since some upstreams may
// respond with invalidly constructed messages which cause out-of-range
// panics afterwards.
//
// See https://github.com/AdguardTeam/AdGuardHome/issues/3551.
if len(req.Question) > 0 && len(reply.Question) == 0 {
Expand Down
11 changes: 3 additions & 8 deletions proxy/proxy_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ func (p *Proxy) replyFromCache(d *DNSContext) (hit bool) {
hitMsg := "serving cached response"

var expired bool
var withSubnet bool
var key []byte
if !p.Config.EnableEDNSClientSubnet {
ci, expired, key = p.cache.get(d.Req)
} else if withSubnet = d.ecsReqMask != 0; withSubnet {
} else if d.ecsReqMask != 0 {
ci, expired, key = p.cache.getWithSubnet(d.Req, d.ecsReqIP, d.ecsReqMask)
hitMsg = "serving response from subnet cache"
} else if d.ecsReqMask == 0 {
} else {
ci, expired, key = p.cache.get(d.Req)
hitMsg = "serving response from general cache"
}
Expand Down Expand Up @@ -49,11 +48,7 @@ func (p *Proxy) replyFromCache(d *DNSContext) (hit bool) {
minCtxClone.Req = req
}

if !withSubnet {
go p.shortFlighter.ResolveOnce(minCtxClone, key)
} else {
go p.shortFlighterWithSubnet.ResolveOnce(minCtxClone, key)
}
go p.shortFlighter.ResolveOnce(minCtxClone, key)
}

return hit
Expand Down
Loading

0 comments on commit 5956b6d

Please sign in to comment.