diff --git a/resolver/min_conns.go b/resolver/min_conns.go new file mode 100644 index 0000000..22daa3e --- /dev/null +++ b/resolver/min_conns.go @@ -0,0 +1,73 @@ +// Copyright 2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resolver + +import ( + "context" + "io" +) + +// MinConnections decorates the given resolver so it sends a set of addresses that has at +// least as many entries as the given minimum. If the given resolver provides a smaller +// set of addresses, it replicates those addresses until the give minimum is reached. +// +// This will cause a client to effectively make redundant connections to the same address +// which is particularly useful when the addresses are virtual IPs, which actually have +// multiple servers behind them. This is appropriate for environments like Kubernetes +// (which uses virtual IPs for non-headless services) and for use with services that have +// layer-4 (TCP) proxies/load balancers in front of them. +// +// To avoid "hot spotting", where one backend address gets more load than others, this +// always fully replicates the set. So it will always report at least minAddresses, but +// could report nearly twice as many: in the case where the set from the underlying +// resolver has minAddresses-1 entries, this will provide (minAddresses-1)*2 entries. +func MinConnections(other Resolver, minAddresses int) Resolver { + return &minConnsResolver{res: other, min: minAddresses} +} + +type minConnsResolver struct { + res Resolver + min int +} + +func (m *minConnsResolver) New(ctx context.Context, scheme, hostPort string, receiver Receiver, refresh <-chan struct{}) io.Closer { + return m.res.New(ctx, scheme, hostPort, &minConnsReceiver{rcvr: receiver, min: m.min}, refresh) +} + +type minConnsReceiver struct { + rcvr Receiver + min int +} + +func (m *minConnsReceiver) OnResolve(addresses []Address) { + if len(addresses) >= m.min || len(addresses) == 0 { + // Already enough addresses; OR zero addresses, in which case, no amount of replication can help. + m.rcvr.OnResolve(addresses) + return + } + multiplier := m.min / len(addresses) + if len(addresses)*multiplier < m.min { + multiplier++ // div rounded down + } + scaledAddrs := make([]Address, 0, len(addresses)*multiplier) + for i := 0; i < multiplier; i++ { + scaledAddrs = append(scaledAddrs, addresses...) + } + m.rcvr.OnResolve(scaledAddrs) +} + +func (m *minConnsReceiver) OnResolveError(err error) { + m.rcvr.OnResolveError(err) +} diff --git a/resolver/min_conns_test.go b/resolver/min_conns_test.go new file mode 100644 index 0000000..d840d06 --- /dev/null +++ b/resolver/min_conns_test.go @@ -0,0 +1,67 @@ +// Copyright 2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resolver_test + +import ( + "context" + "testing" + + . "github.com/bufbuild/httplb/resolver" + "github.com/stretchr/testify/assert" +) + +func TestMinConnections(t *testing.T) { + t.Parallel() + + refreshCh := make(chan struct{}) + defer close(refreshCh) + + addrFoo := Address{HostPort: "foo"} + addrBar := Address{HostPort: "bar"} + addrBaz := Address{HostPort: "baz"} + addrQux := Address{HostPort: "qux"} + addresses := []Address{addrFoo, addrBar, addrBaz, addrQux} + + var resolver fakeResolver + minResolver := MinConnections(&resolver, 6) + var receiver fakeReceiver + _ = minResolver.New(context.Background(), "", "", &receiver, refreshCh) + + resolver.receiver.OnResolve([]Address{}) + assert.Equal(t, receiver.addrs, []Address{}) + + resolver.receiver.OnResolve([]Address{addrFoo}) + assert.Equal(t, receiver.addrs, []Address{ // single address, repeated 6 times + addrFoo, addrFoo, addrFoo, addrFoo, addrFoo, addrFoo, + }) + + resolver.receiver.OnResolve([]Address{addrFoo, addrBar}) + assert.Equal(t, receiver.addrs, []Address{ // both addresses, each repeated 3 times + addrFoo, addrBar, addrFoo, addrBar, addrFoo, addrBar, + }) + + resolver.receiver.OnResolve(append([]Address{}, addresses...)) + assert.Equal(t, receiver.addrs, []Address{ // all four addresses, each repeated + addrFoo, addrBar, addrBaz, addrQux, addrFoo, addrBar, addrBaz, addrQux, + }) + + minResolver = MinConnections(&resolver, 3) + _ = minResolver.New(context.Background(), "", "", &receiver, refreshCh) + + resolver.receiver.OnResolve(append([]Address{}, addresses...)) + assert.Equal(t, receiver.addrs, []Address{ // all four addresses, no repetition + addrFoo, addrBar, addrBaz, addrQux, + }) +}