From f552cb08e41e3b05050b40dc64d973bb26466236 Mon Sep 17 00:00:00 2001
From: Christian Haudum
Date: Fri, 5 Apr 2024 07:47:53 +0200
Subject: [PATCH] perf(blooms): Use jumpmap for client side sharding of the filter requests (#12470)

**What this PR does / why we need it**:
Use the same client-side sharding mechanism for bloom gateway filter requests that we already use in the memcached client. It uses DNS discovery to resolve a list of addresses that serve as buckets for the jumphash algorithm, which shards GroupedChunkRefs across the bloom gateway servers.
The ring component is still part of the bloom gateway, but it is no longer used.

**Special notes for your reviewer**:
https://arxiv.org/pdf/1406.2294.pdf

Signed-off-by: Christian Haudum
Co-authored-by: Owen Diehl
---
 docs/sources/configure/_index.md            |  18 +-
 pkg/bloomgateway/client.go                  | 276 ++++------
 pkg/bloomgateway/client_pool.go             | 106 +++++++
 pkg/bloomgateway/client_pool_test.go        |  35 ++
 pkg/bloomgateway/client_test.go             | 298 +-----------------
 pkg/bloomgateway/metrics.go                 |  16 +
 pkg/loki/modules.go                         |   2 -
 pkg/storage/chunk/cache/memcached_client.go |   3 +-
 pkg/util/discovery/dns.go                   |  11 +-
 .../jumphash}/memcached_client_selector.go  |  46 ++-
 .../memcached_client_selector_test.go       |   8 +-
 11 files changed, 286 insertions(+), 533 deletions(-)
 create mode 100644 pkg/bloomgateway/client_pool.go
 create mode 100644 pkg/bloomgateway/client_pool_test.go
 rename pkg/{storage/chunk/cache => util/jumphash}/memcached_client_selector.go (80%)
 rename pkg/{storage/chunk/cache => util/jumphash}/memcached_client_selector_test.go (93%)

diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index d3265e86445fe..2837647cff97f 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -2089,11 +2089,18 @@ ring:
 client:
   # Configures the behavior of the connection pool.
   pool_config:
-    [client_cleanup_period: <duration>]
+    # How frequently to clean up clients for servers that have gone away or are
+    # unhealthy.
+    # CLI flag: -bloom-gateway-client.pool.check-interval
+    [check_interval: <duration> | default = 10s]
 
-    [health_check_ingesters: <boolean>]
+    # Run a health check on each server during periodic cleanup.
+    # CLI flag: -bloom-gateway-client.pool.enable-health-check
+    [enable_health_check: <boolean> | default = true]
 
-    [remote_timeout: <duration>]
+    # Timeout for the health check if health check is enabled.
+    # CLI flag: -bloom-gateway-client.pool.health-check-timeout
+    [health_check_timeout: <duration> | default = 1s]
 
   # The grpc_client block configures the gRPC client used to communicate between
   # two Loki components.
@@ -2116,6 +2123,11 @@ client:
   # CLI flag: -bloom-gateway-client.cache_results
   [cache_results: <boolean> | default = false]
 
+  # Comma separated addresses list in DNS Service Discovery format:
+  # https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes
+  # CLI flag: -bloom-gateway-client.addresses
+  [addresses: <string> | default = ""]
+
 # Number of workers to use for filtering chunks concurrently.
# CLI flag: -bloom-gateway.worker-concurrency [worker_concurrency: | default = 4] diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 5d2af03403d45..53a57b2a51d86 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -6,70 +6,33 @@ import ( "fmt" "io" "math" - "math/rand" - "strings" - "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/grpcclient" - "github.com/grafana/dskit/instrument" - "github.com/grafana/dskit/ring" ringclient "github.com/grafana/dskit/ring/client" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" - "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/grafana/loki/v3/pkg/bloomutils" - "github.com/grafana/loki/v3/pkg/distributor/clientpool" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/queue" - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" "github.com/grafana/loki/v3/pkg/util/constants" - util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/loki/v3/pkg/util/discovery" ) var ( - // BlocksOwnerRead is the operation used to check the authoritative owners of a block - // (replicas included) that are available for queries (a bloom gateway is available for - // queries only when ACTIVE). - BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) // groupedChunksRefPool pooling slice of logproto.GroupedChunkRefs [64, 128, 256, ..., 65536] groupedChunksRefPool = queue.NewSlicePool[*logproto.GroupedChunkRefs](1<<6, 1<<16, 2) - // ringGetBuffersPool pooling for ringGetBuffers to avoid calling ring.MakeBuffersForGet() for each request - ringGetBuffersPool = sync.Pool{ - New: func() interface{} { - descs, hosts, zones := ring.MakeBuffersForGet() - return &ringGetBuffers{ - Descs: descs, - Hosts: hosts, - Zones: zones, - } - }, - } ) -type ringGetBuffers struct { - Descs []ring.InstanceDesc - Hosts []string - Zones []string -} - -func (buf *ringGetBuffers) Reset() { - buf.Descs = buf.Descs[:0] - buf.Hosts = buf.Hosts[:0] - buf.Zones = buf.Zones[:0] -} - // GRPCPool represents a pool of gRPC connections to different bloom gateway instances. // Interfaces are inlined for simplicity to automatically satisfy interface functions. type GRPCPool struct { @@ -98,19 +61,17 @@ func NewBloomGatewayGRPCPool(address string, opts []grpc.DialOption) (*GRPCPool, type ClientConfig struct { // PoolConfig defines the behavior of the gRPC connection pool used to communicate // with the Bloom Gateway. - // It is defined at the distributors YAML section and reused here. - PoolConfig clientpool.PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures the behavior of the connection pool."` + PoolConfig PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures the behavior of the connection pool."` // GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server. GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` - // Ring is the Bloom Gateway ring used to find the appropriate Bloom Gateway instance - // this client should talk to. 
- Ring ring.ReadRing `yaml:"-"` - // Cache configures the cache used to store the results of the Bloom Gateway server. Cache CacheConfig `yaml:"results_cache,omitempty"` CacheResults bool `yaml:"cache_results"` + + // Client sharding using DNS discovery and jumphash + Addresses string `yaml:"addresses,omitempty"` } // RegisterFlags registers flags for the Bloom Gateway client configuration. @@ -122,7 +83,9 @@ func (i *ClientConfig) RegisterFlags(f *flag.FlagSet) { func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+"grpc", f) i.Cache.RegisterFlagsWithPrefix(prefix+"cache.", f) + i.PoolConfig.RegisterFlagsWithPrefix(prefix+"pool.", f) f.BoolVar(&i.CacheResults, prefix+"cache_results", false, "Flag to control whether to cache bloom gateway client requests/responses.") + f.StringVar(&i.Addresses, prefix+"addresses", "", "Comma separated addresses list in DNS Service Discovery format: https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes") } func (i *ClientConfig) Validate() error { @@ -130,12 +93,20 @@ func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { return errors.Wrap(err, "grpc client config") } + if err := i.PoolConfig.Validate(); err != nil { + return errors.Wrap(err, "pool config") + } + if i.CacheResults { if err := i.Cache.Validate(); err != nil { return errors.Wrap(err, "cache config") } } + if i.Addresses == "" { + return errors.New("addresses requires a list of comma separated strings in DNS service discovery format with at least one item") + } + return nil } @@ -144,33 +115,25 @@ type Client interface { } type GatewayClient struct { - cfg ClientConfig - limits Limits - logger log.Logger - metrics *clientMetrics - pool *ringclient.Pool - ring ring.ReadRing + cfg ClientConfig + limits Limits + logger log.Logger + metrics *clientMetrics + pool *JumpHashClientPool + dnsProvider *discovery.DNS } func NewClient( cfg ClientConfig, - readRing ring.ReadRing, limits Limits, registerer prometheus.Registerer, logger log.Logger, - metricsNamespace string, cacheGen resultscache.CacheGenNumberLoader, retentionEnabled bool, ) (*GatewayClient, error) { - latency := promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ - Namespace: constants.Loki, - Subsystem: "bloom_gateway", - Name: "request_duration_seconds", - Help: "Time (in seconds) spent serving requests when using the bloom gateway", - Buckets: instrument.DefBuckets, - }, []string{"operation", "status_code"}) - - dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(latency)) + metrics := newClientMetrics(registerer) + + dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(metrics.requestLatency)) if err != nil { return nil, err } @@ -206,29 +169,35 @@ func NewClient( return pool, nil } + dnsProvider := discovery.NewDNS(logger, cfg.PoolConfig.CheckInterval, cfg.Addresses, nil) + // Make an attempt to do one DNS lookup so we can start with addresses + dnsProvider.RunOnce() + + clientPool := ringclient.NewPool( + "bloom-gateway", + ringclient.PoolConfig(cfg.PoolConfig), + func() ([]string, error) { return dnsProvider.Addresses(), nil }, + ringclient.PoolAddrFunc(poolFactory), + metrics.clients, + logger, + ) + + pool := NewJumpHashClientPool(clientPool, dnsProvider, cfg.PoolConfig.CheckInterval, logger) + pool.Start() + return &GatewayClient{ - cfg: cfg, - logger: logger, - limits: limits, - metrics: newClientMetrics(registerer), - pool: clientpool.NewPool("bloom-gateway",
cfg.PoolConfig, cfg.Ring, ringclient.PoolAddrFunc(poolFactory), logger, metricsNamespace), - ring: readRing, + cfg: cfg, + logger: logger, + limits: limits, + metrics: metrics, + pool: pool, + dnsProvider: dnsProvider, // keep reference so we can stop it when the client is closed }, nil } -func JoinFunc[S ~[]E, E any](elems S, sep string, f func(e E) string) string { - res := make([]string, len(elems)) - for i := range elems { - res[i] = f(elems[i]) - } - return strings.Join(res, sep) -} - -func shuffleAddrs(addrs []string) []string { - rand.Shuffle(len(addrs), func(i, j int) { - addrs[i], addrs[j] = addrs[j], addrs[i] - }) - return addrs +func (c *GatewayClient) Close() { + c.pool.Stop() + c.dnsProvider.Stop() } // FilterChunkRefs implements Client @@ -237,19 +206,23 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t return groups, nil } - subRing := GetShuffleShardingSubring(c.ring, tenant, c.limits) - rs, err := subRing.GetAllHealthy(BlocksOwnerRead) - if err != nil { - return nil, errors.Wrap(err, "bloom gateway get healthy instances") + clients := make(map[string][]*logproto.GroupedChunkRefs) + for _, g := range groups { + addr, err := c.pool.AddrForFingerprint(g.Fingerprint) + if err != nil { + return nil, errors.Wrap(err, "server address for fingerprint") + } + clients[addr] = append(clients[addr], g) } - servers, err := replicationSetsWithBounds(subRing, rs.Instances) - - if err != nil { - return nil, errors.Wrap(err, "bloom gateway get replication sets") + servers := make([]addrWithGroups, 0, len(clients)) + for k, v := range clients { + servers = append(servers, addrWithGroups{ + groups: v, + addr: k, + }) } - servers = partitionByReplicationSet(groups, servers) if len(servers) > 0 { // cache locality score (higher is better): // `% keyspace / % instances`. Ideally converges to 1 (querying x% of keyspace requires x% of instances), @@ -258,23 +231,20 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t // overlap on instances to the left and right of the range. 
firstFp, lastFp := groups[0].Fingerprint, groups[len(groups)-1].Fingerprint pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64) - pctInstances := float64(len(servers)) / float64(len(rs.Instances)) + pctInstances := float64(len(servers)) / float64(max(1, len(c.pool.Addrs()))) cacheLocalityScore := pctKeyspace / pctInstances c.metrics.cacheLocalityScore.Observe(cacheLocalityScore) } results := make([][]*logproto.GroupedChunkRefs, len(servers)) count := 0 - err = concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error { + err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error { rs := servers[i] - // randomize order of addresses so we don't hotspot the first server in the list - addrs := shuffleAddrs(rs.rs.GetAddresses()) level.Info(c.logger).Log( "msg", "do FilterChunkRefs for addresses", "progress", fmt.Sprintf("%d/%d", i+1, len(servers)), - "bounds", JoinFunc(rs.ranges, ",", func(e v1.FingerprintBounds) string { return e.String() }), - "addrs", strings.Join(addrs, ","), + "addr", rs.addr, "from", from.Time(), "through", through.Time(), "num_refs", len(rs.groups), @@ -282,7 +252,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t "plan_hash", plan.Hash(), ) - return c.doForAddrs(addrs, func(client logproto.BloomGatewayClient) error { + return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error { req := &logproto.FilterChunkRefRequest{ From: from, Through: through, @@ -316,7 +286,6 @@ func flatten(input [][]*logproto.GroupedChunkRefs, n int) []*logproto.GroupedChu // doForAddrs sequentially calls the provided callback function fn for each // address in the given slice addrs until the callback function does not return an // error.
-// TODO(owen-d): parallelism func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGatewayClient) error) error { var err error var poolClient ringclient.PoolClient @@ -337,116 +306,7 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway return err } -func mapTokenRangeToFingerprintRange(r bloomutils.Range[uint32]) v1.FingerprintBounds { - minFp := uint64(r.Min) << 32 - maxFp := uint64(r.Max) << 32 - return v1.NewBounds( - model.Fingerprint(minFp), - model.Fingerprint(maxFp|math.MaxUint32), - ) -} - -type rsWithRanges struct { - rs ring.ReplicationSet - ranges []v1.FingerprintBounds +type addrWithGroups struct { + addr string groups []*logproto.GroupedChunkRefs } - -func replicationSetsWithBounds(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]rsWithRanges, error) { - bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() - - servers := make([]rsWithRanges, 0, len(instances)) - for _, inst := range instances { - tr, err := bloomutils.TokenRangesForInstance(inst.Id, instances) - if err != nil { - return nil, errors.Wrap(err, "bloom gateway get ring") - } - - if len(tr) == 0 { - level.Warn(util_log.Logger).Log( - "subroutine", "replicationSetsWithBounds", - "msg", "instance has no token ranges - should not be possible", - "instance", inst.Id, - "n_instances", len(instances), - ) - continue - } - - // NB(owen-d): this will send requests to the wrong nodes if RF>1 since it only checks the - // first token when assigning replicasets - rs, err := subRing.Get(tr[0], BlocksOwnerRead, bufDescs, bufHosts, bufZones) - if err != nil { - return nil, errors.Wrap(err, "bloom gateway get ring") - } - - bounds := make([]v1.FingerprintBounds, 0, len(tr)/2) - for i := 0; i < len(tr); i += 2 { - b := v1.NewBounds( - model.Fingerprint(uint64(tr[i])<<32), - model.Fingerprint(uint64(tr[i+1])<<32|math.MaxUint32), - ) - bounds = append(bounds, b) - } - - servers = append(servers, rsWithRanges{ - rs: rs, - ranges: bounds, - }) - } - return servers, nil -} - -func partitionByReplicationSet(fingerprints []*logproto.GroupedChunkRefs, rs []rsWithRanges) (result []rsWithRanges) { - for _, inst := range rs { - for _, bounds := range inst.ranges { - min, _ := slices.BinarySearchFunc(fingerprints, bounds, func(g *logproto.GroupedChunkRefs, b v1.FingerprintBounds) int { - if g.Fingerprint < uint64(b.Min) { - return -1 - } else if g.Fingerprint > uint64(b.Min) { - return 1 - } - return 0 - }) - - max, _ := slices.BinarySearchFunc(fingerprints, bounds, func(g *logproto.GroupedChunkRefs, b v1.FingerprintBounds) int { - if g.Fingerprint <= uint64(b.Max) { - return -1 - } else if g.Fingerprint > uint64(b.Max) { - return 1 - } - return 0 - }) - - // fingerprint is out of boundaries - if min == len(fingerprints) || max == 0 { - continue - } - - inst.groups = append(inst.groups, fingerprints[min:max]...) - } - - if len(inst.groups) > 0 { - result = append(result, inst) - } - } - - return result -} - -// GetShuffleShardingSubring returns the subring to be used for a given user. -// This function should be used both by index gateway servers and clients in -// order to guarantee the same logic is used. -func GetShuffleShardingSubring(ring ring.ReadRing, tenantID string, limits Limits) ring.ReadRing { - shardSize := limits.BloomGatewayShardSize(tenantID) - - // A shard size of 0 means shuffle sharding is disabled for this specific user, - // so we just return the full ring so that indexes will be sharded across all index gateways. 
- // Since we set the shard size to replication factor if shard size is 0, this - // can only happen if both the shard size and the replication factor are set - // to 0. - if shardSize <= 0 { - return ring - } - - return ring.ShuffleShard(tenantID, shardSize) -} diff --git a/pkg/bloomgateway/client_pool.go b/pkg/bloomgateway/client_pool.go new file mode 100644 index 0000000000000..dc96c33e53c56 --- /dev/null +++ b/pkg/bloomgateway/client_pool.go @@ -0,0 +1,106 @@ +package bloomgateway + +import ( + "context" + "flag" + "sort" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/ring/client" + "github.com/grafana/dskit/services" + + "github.com/grafana/loki/v3/pkg/util/jumphash" +) + +// PoolConfig is config for creating a Pool. +// It has the same fields as "github.com/grafana/dskit/ring/client.PoolConfig" so it can be cast. +type PoolConfig struct { + CheckInterval time.Duration `yaml:"check_interval"` + HealthCheckEnabled bool `yaml:"enable_health_check"` + HealthCheckTimeout time.Duration `yaml:"health_check_timeout"` + MaxConcurrentHealthChecks int `yaml:"-"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.CheckInterval, prefix+"check-interval", 10*time.Second, "How frequently to clean up clients for servers that have gone away or are unhealthy.") + f.BoolVar(&cfg.HealthCheckEnabled, prefix+"enable-health-check", true, "Run a health check on each server during periodic cleanup.") + f.DurationVar(&cfg.HealthCheckTimeout, prefix+"health-check-timeout", 1*time.Second, "Timeout for the health check if health check is enabled.") +} + +func (cfg *PoolConfig) Validate() error { + return nil +} + +type JumpHashClientPool struct { + *client.Pool + *jumphash.Selector + + done chan struct{} + logger log.Logger +} + +type AddressProvider interface { + Addresses() []string +} + +func NewJumpHashClientPool(pool *client.Pool, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) *JumpHashClientPool { + selector := jumphash.DefaultSelector() + err := selector.SetServers(dnsProvider.Addresses()...) + if err != nil { + level.Warn(logger).Log("msg", "error updating servers", "err", err) + } + + p := &JumpHashClientPool{ + Pool: pool, + Selector: selector, + done: make(chan struct{}), + logger: logger, + } + go p.updateLoop(dnsProvider, updateInterval) + + return p +} + +func (p *JumpHashClientPool) AddrForFingerprint(fp uint64) (string, error) { + addr, err := p.FromUInt64(fp) + if err != nil { + return "", err + } + return addr.String(), nil +} + +func (p *JumpHashClientPool) Start() { + ctx := context.Background() + _ = services.StartAndAwaitRunning(ctx, p.Pool) +} + +func (p *JumpHashClientPool) Stop() { + ctx := context.Background() + _ = services.StopAndAwaitTerminated(ctx, p.Pool) + close(p.done) +} + +func (p *JumpHashClientPool) updateLoop(provider AddressProvider, updateInterval time.Duration) { + ticker := time.NewTicker(updateInterval) + defer ticker.Stop() + + for { + select { + case <-p.done: + return + case <-ticker.C: + servers := provider.Addresses() + // ServerList deterministically maps keys to _index_ of the server list. + // Since DNS returns records in different order each time, we sort to + // guarantee best possible match between nodes. + sort.Strings(servers) + err := p.SetServers(servers...) 
+ if err != nil { + level.Warn(p.logger).Log("msg", "error updating servers", "err", err) + } + } + } +} diff --git a/pkg/bloomgateway/client_pool_test.go b/pkg/bloomgateway/client_pool_test.go new file mode 100644 index 0000000000000..af0ca116eb824 --- /dev/null +++ b/pkg/bloomgateway/client_pool_test.go @@ -0,0 +1,35 @@ +package bloomgateway + +import ( + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" +) + +type provider struct { + addresses []string +} + +func (p *provider) Addresses() []string { + return p.addresses +} + +func TestJumpHashClientPool_UpdateLoop(t *testing.T) { + interval := 100 * time.Millisecond + + provider := &provider{[]string{"localhost:9095"}} + pool := NewJumpHashClientPool(nil, provider, interval, log.NewNopLogger()) + require.Len(t, pool.Addrs(), 1) + require.Equal(t, "127.0.0.1:9095", pool.Addrs()[0].String()) + + // update address list + provider.addresses = []string{"localhost:9095", "localhost:9096"} + // wait refresh interval + time.Sleep(2 * interval) + // pool has been updated + require.Len(t, pool.Addrs(), 2) + require.Equal(t, "127.0.0.1:9095", pool.Addrs()[0].String()) + require.Equal(t, "127.0.0.1:9096", pool.Addrs()[1].String()) +} diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index 5e2484af29bf9..a9ded6f45f7d0 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -2,35 +2,19 @@ package bloomgateway import ( "context" - "fmt" - "math" "testing" - "time" "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/ring" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/bloomutils" - "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/querier/plan" - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/validation" ) -func rs(id int, tokens ...uint32) ring.ReplicationSet { - inst := ring.InstanceDesc{ - Id: fmt.Sprintf("instance-%d", id), - Addr: fmt.Sprintf("10.0.0.%d", id), - Tokens: tokens, - } - return ring.ReplicationSet{Instances: []ring.InstanceDesc{inst}} -} - func TestBloomGatewayClient(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewRegistry() @@ -42,7 +26,7 @@ func TestBloomGatewayClient(t *testing.T) { flagext.DefaultValues(&cfg) t.Run("FilterChunks returns response", func(t *testing.T) { - c, err := NewClient(cfg, &mockRing{}, l, reg, logger, "loki", nil, false) + c, err := NewClient(cfg, l, reg, logger, nil, false) require.NoError(t, err) expr, err := syntax.ParseExpr(`{foo="bar"}`) require.NoError(t, err) @@ -51,283 +35,3 @@ func TestBloomGatewayClient(t *testing.T) { require.Equal(t, 0, len(res)) }) } - -func TestBloomGatewayClient_ReplicationSetsWithBounds(t *testing.T) { - testCases := map[string]struct { - instances []ring.InstanceDesc - expected []rsWithRanges - }{ - "single instance covers full range": { - instances: []ring.InstanceDesc{ - {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{(1 << 31)}}, // 0x80000000 - }, - expected: []rsWithRanges{ - {rs: rs(1, (1 << 31)), ranges: []v1.FingerprintBounds{ - v1.NewBounds(0, math.MaxUint64), - }}, - }, - }, - "one token per instance": { - instances: []ring.InstanceDesc{ - {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{(1 << 30) * 1}}, // 0x40000000 - {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{(1 << 30) * 2}}, // 
0x80000000 - {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{(1 << 30) * 3}}, // 0xc0000000 - }, - expected: []rsWithRanges{ - {rs: rs(1, (1<<30)*1), ranges: []v1.FingerprintBounds{ - v1.NewBounds(0, 4611686018427387903), - v1.NewBounds(13835058055282163712, 18446744073709551615), - }}, - {rs: rs(2, (1<<30)*2), ranges: []v1.FingerprintBounds{ - v1.NewBounds(4611686018427387904, 9223372036854775807), - }}, - {rs: rs(3, (1<<30)*3), ranges: []v1.FingerprintBounds{ - v1.NewBounds(9223372036854775808, 13835058055282163711), - }}, - }, - }, - "extreme tokens in ring": { - instances: []ring.InstanceDesc{ - {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0}}, - {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32}}, - }, - expected: []rsWithRanges{ - {rs: rs(1, 0), ranges: []v1.FingerprintBounds{ - v1.NewBounds(math.MaxUint64-math.MaxUint32, math.MaxUint64), - }}, - {rs: rs(2, math.MaxUint32), ranges: []v1.FingerprintBounds{ - v1.NewBounds(0, math.MaxUint64-math.MaxUint32-1), - }}, - }, - }, - } - - for name, tc := range testCases { - tc := tc - t.Run(name, func(t *testing.T) { - subRing := newMockRing(t, tc.instances) - res, err := replicationSetsWithBounds(subRing, tc.instances) - require.NoError(t, err) - require.Equal(t, tc.expected, res) - }) - } -} - -func TestBloomGatewayClient_PartitionByReplicationSet(t *testing.T) { - // Create 10 fingerprints [0, 2, 4, ... 18] - groups := make([]*logproto.GroupedChunkRefs, 0, 10) - for i := 0; i < 20; i += 2 { - groups = append(groups, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) - } - - // instance token ranges do not overlap - t.Run("non-overlapping", func(t *testing.T) { - - servers := []rsWithRanges{ - {rs: rs(1), ranges: []v1.FingerprintBounds{v1.NewBounds(0, 4)}}, - {rs: rs(2), ranges: []v1.FingerprintBounds{v1.NewBounds(5, 9), v1.NewBounds(15, 19)}}, - {rs: rs(3), ranges: []v1.FingerprintBounds{v1.NewBounds(10, 14)}}, - } - - // partition fingerprints - - expected := [][]*logproto.GroupedChunkRefs{ - { - {Fingerprint: 0}, - {Fingerprint: 2}, - {Fingerprint: 4}, - }, - { - {Fingerprint: 6}, - {Fingerprint: 8}, - {Fingerprint: 16}, - {Fingerprint: 18}, - }, - { - {Fingerprint: 10}, - {Fingerprint: 12}, - {Fingerprint: 14}, - }, - } - - partitioned := partitionByReplicationSet(groups, servers) - for i := range partitioned { - require.Equal(t, expected[i], partitioned[i].groups) - } - }) - - // instance token ranges overlap -- this should not happen in a real ring, though - t.Run("overlapping", func(t *testing.T) { - servers := []rsWithRanges{ - {rs: rs(1), ranges: []v1.FingerprintBounds{v1.NewBounds(0, 9)}}, - {rs: rs(2), ranges: []v1.FingerprintBounds{v1.NewBounds(5, 14)}}, - {rs: rs(3), ranges: []v1.FingerprintBounds{v1.NewBounds(10, 19)}}, - } - - // partition fingerprints - - expected := [][]*logproto.GroupedChunkRefs{ - { - {Fingerprint: 0}, - {Fingerprint: 2}, - {Fingerprint: 4}, - {Fingerprint: 6}, - {Fingerprint: 8}, - }, - { - {Fingerprint: 6}, - {Fingerprint: 8}, - {Fingerprint: 10}, - {Fingerprint: 12}, - {Fingerprint: 14}, - }, - { - {Fingerprint: 10}, - {Fingerprint: 12}, - {Fingerprint: 14}, - {Fingerprint: 16}, - {Fingerprint: 18}, - }, - } - - partitioned := partitionByReplicationSet(groups, servers) - for i := range partitioned { - require.Equal(t, expected[i], partitioned[i].groups) - } - }) -} - -func BenchmarkPartitionFingerprintsByAddresses(b *testing.B) { - numFp := 100000 - fpStep := math.MaxUint64 / uint64(numFp) - - groups := make([]*logproto.GroupedChunkRefs, 0, numFp) - for i := 
uint64(0); i < math.MaxUint64-fpStep; i += fpStep { - groups = append(groups, &logproto.GroupedChunkRefs{Fingerprint: i}) - } - - numServers := 100 - tokenStep := math.MaxUint32 / uint32(numServers) - servers := make([]rsWithRanges, 0, numServers) - for i := uint32(0); i < math.MaxUint32-tokenStep; i += tokenStep { - servers = append(servers, rsWithRanges{ - rs: rs(int(i)), - ranges: []v1.FingerprintBounds{ - v1.NewBounds(model.Fingerprint(i)<<32, model.Fingerprint(i+tokenStep)<<32), - }, - }) - } - - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - _ = partitionByReplicationSet(groups, servers) - } -} - -func TestBloomGatewayClient_MapTokenRangeToFingerprintRange(t *testing.T) { - testCases := map[string]struct { - lshift int - inp bloomutils.Range[uint32] - exp v1.FingerprintBounds - }{ - "single token expands to multiple fingerprints": { - inp: bloomutils.NewTokenRange(0, 0), - exp: v1.NewBounds(0, 0xffffffff), - }, - "max value expands to max value of new range": { - inp: bloomutils.NewTokenRange((1 << 31), math.MaxUint32), - exp: v1.NewBounds((1 << 63), 0xffffffffffffffff), - }, - } - for desc, tc := range testCases { - t.Run(desc, func(t *testing.T) { - actual := mapTokenRangeToFingerprintRange(tc.inp) - require.Equal(t, tc.exp, actual) - }) - } -} - -// make sure mockRing implements the ring.ReadRing interface -var _ ring.ReadRing = &mockRing{} - -func newMockRing(t *testing.T, instances []ring.InstanceDesc) *mockRing { - ranges := make([]ring.TokenRanges, 0) - for i := range instances { - tr, err := bloomutils.TokenRangesForInstance(instances[i].Id, instances) - if err != nil { - t.Fatal(err) - } - ranges = append(ranges, tr) - } - return &mockRing{ - instances: instances, - ranges: ranges, - } -} - -type mockRing struct { - instances []ring.InstanceDesc - ranges []ring.TokenRanges -} - -// Get implements ring.ReadRing. -func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (rs ring.ReplicationSet, err error) { - for i := range r.ranges { - if r.ranges[i].IncludesKey(key) { - rs.Instances = append(rs.Instances, r.instances[i]) - } - } - return -} - -// GetAllHealthy implements ring.ReadRing. -func (r *mockRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) { - return ring.ReplicationSet{ - Instances: r.instances, - }, nil -} - -// GetInstanceState implements ring.ReadRing. -func (*mockRing) GetInstanceState(_ string) (ring.InstanceState, error) { - panic("unimplemented") -} - -// GetReplicationSetForOperation implements ring.ReadRing. -func (*mockRing) GetReplicationSetForOperation(_ ring.Operation) (ring.ReplicationSet, error) { - panic("unimplemented") -} - -// HasInstance implements ring.ReadRing. -func (*mockRing) HasInstance(_ string) bool { - panic("unimplemented") -} - -// InstancesCount implements ring.ReadRing. -func (r *mockRing) InstancesCount() int { - return len(r.instances) -} - -// ReplicationFactor implements ring.ReadRing. -func (*mockRing) ReplicationFactor() int { - return 1 -} - -// ShuffleShard implements ring.ReadRing. -func (r *mockRing) ShuffleShard(_ string, _ int) ring.ReadRing { - return r -} - -// ShuffleShardWithLookback implements ring.ReadRing. -func (*mockRing) ShuffleShardWithLookback(_ string, _ int, _ time.Duration, _ time.Time) ring.ReadRing { - panic("unimplemented") -} - -// CleanupShuffleShardCache implements ring.ReadRing. 
-func (*mockRing) CleanupShuffleShardCache(_ string) { - panic("unimplemented") -} - -func (r *mockRing) GetTokenRangesForInstance(id string) (ring.TokenRanges, error) { - return bloomutils.TokenRangesForInstance(id, r.instances) -} diff --git a/pkg/bloomgateway/metrics.go b/pkg/bloomgateway/metrics.go index 34e1960fa45ee..0d408991b40c2 100644 --- a/pkg/bloomgateway/metrics.go +++ b/pkg/bloomgateway/metrics.go @@ -3,6 +3,7 @@ package bloomgateway import ( "time" + "github.com/grafana/dskit/instrument" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -16,6 +17,8 @@ type metrics struct { type clientMetrics struct { cacheLocalityScore prometheus.Histogram + requestLatency *prometheus.HistogramVec + clients prometheus.Gauge } func newClientMetrics(registerer prometheus.Registerer) *clientMetrics { @@ -27,6 +30,19 @@ func newClientMetrics(registerer prometheus.Registerer) *clientMetrics { Help: "Cache locality score of the bloom filter, as measured by % of keyspace touched / % of bloom_gws required", Buckets: prometheus.LinearBuckets(0.01, 0.2, 5), }), + requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Subsystem: "bloom_gateway_client", + Name: "request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using the bloom gateway", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}), + clients: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Namespace: constants.Loki, + Subsystem: "bloom_gateway", + Name: "clients", + Help: "The current number of bloom gateway clients.", + }), } } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 946f434637b21..5d64b947437c3 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1451,11 +1451,9 @@ func (t *Loki) initIndexGateway() (services.Service, error) { if t.Cfg.BloomGateway.Enabled { bloomGatewayClient, err := bloomgateway.NewClient( t.Cfg.BloomGateway.Client, - t.bloomGatewayRingManager.Ring, t.Overrides, prometheus.DefaultRegisterer, logger, - t.Cfg.MetricsNamespace, t.cacheGenerationLoader, t.Cfg.CompactorConfig.RetentionEnabled, ) diff --git a/pkg/storage/chunk/cache/memcached_client.go b/pkg/storage/chunk/cache/memcached_client.go index d6df538342faf..995e896fbcfee 100644 --- a/pkg/storage/chunk/cache/memcached_client.go +++ b/pkg/storage/chunk/cache/memcached_client.go @@ -22,6 +22,7 @@ import ( "github.com/sony/gobreaker" "github.com/grafana/loki/v3/pkg/util/constants" + "github.com/grafana/loki/v3/pkg/util/jumphash" ) // MemcachedClient interface exists for mocking memcacheClient. 
@@ -113,7 +114,7 @@ func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix, description st func NewMemcachedClient(cfg MemcachedClientConfig, name string, r prometheus.Registerer, logger log.Logger, metricsNamespace string) MemcachedClient { var selector serverSelector if cfg.ConsistentHash { - selector = DefaultMemcachedJumpHashSelector() + selector = jumphash.DefaultSelector() } else { selector = &memcache.ServerList{} } diff --git a/pkg/util/discovery/dns.go b/pkg/util/discovery/dns.go index e85479ced000d..e076f39e92d8a 100644 --- a/pkg/util/discovery/dns.go +++ b/pkg/util/discovery/dns.go @@ -3,6 +3,7 @@ package discovery import ( "context" "fmt" + "strings" "sync" "time" @@ -15,7 +16,7 @@ import ( type DNS struct { logger log.Logger cleanupPeriod time.Duration - address string + addresses []string stop chan struct{} done sync.WaitGroup once sync.Once @@ -27,7 +28,7 @@ func NewDNS(logger log.Logger, cleanupPeriod time.Duration, address string, reg d := &DNS{ logger: logger, cleanupPeriod: cleanupPeriod, - address: address, + addresses: strings.Split(address, ","), stop: make(chan struct{}), done: sync.WaitGroup{}, dnsProvider: dnsProvider, @@ -70,10 +71,10 @@ func (d *DNS) discoveryLoop() { } func (d *DNS) runDiscovery() { - ctx, cancel := context.WithTimeoutCause(context.Background(), 5*time.Second, fmt.Errorf("DNS lookup timeout: %s", d.address)) + ctx, cancel := context.WithTimeoutCause(context.Background(), 5*time.Second, fmt.Errorf("DNS lookup timeout: %v", d.addresses)) defer cancel() - err := d.dnsProvider.Resolve(ctx, []string{d.address}) + err := d.dnsProvider.Resolve(ctx, d.addresses) if err != nil { - level.Error(d.logger).Log("msg", "failed to resolve index gateway address", "err", err) + level.Error(d.logger).Log("msg", "failed to resolve server addresses", "err", err) } } diff --git a/pkg/storage/chunk/cache/memcached_client_selector.go b/pkg/util/jumphash/memcached_client_selector.go similarity index 80% rename from pkg/storage/chunk/cache/memcached_client_selector.go rename to pkg/util/jumphash/memcached_client_selector.go index 8c8d49e2ba3af..ccec90fa0dda2 100644 --- a/pkg/storage/chunk/cache/memcached_client_selector.go +++ b/pkg/util/jumphash/memcached_client_selector.go @@ -1,4 +1,4 @@ -package cache +package jumphash import ( "net" @@ -13,16 +13,16 @@ import ( util_log "github.com/grafana/loki/v3/pkg/util/log" ) -// MemcachedJumpHashSelector implements the memcache.ServerSelector -// interface. MemcachedJumpHashSelector utilizes a jump hash to +// Selector implements the memcache.ServerSelector +// interface. Selector utilizes a jump hash to // distribute keys to servers. // // While adding or removing servers only requires 1/N keys to move, // servers are treated as a stack and can only be pushed/popped. -// Therefore, MemcachedJumpHashSelector works best for servers +// Therefore, Selector works best for servers // with consistent DNS names where the naturally sorted order // is predictable. 
-type MemcachedJumpHashSelector struct { +type Selector struct { mu sync.RWMutex addrs []net.Addr resolveUnixAddr UnixResolver @@ -33,15 +33,15 @@ type UnixResolver func(network, address string) (*net.UnixAddr, error) type TCPResolver func(network, address string) (*net.TCPAddr, error) -func NewMemcachedJumpHashSelector(resolveUnixAddr UnixResolver, resolveTCPAddr TCPResolver) *MemcachedJumpHashSelector { - return &MemcachedJumpHashSelector{ +func NewSelector(resolveUnixAddr UnixResolver, resolveTCPAddr TCPResolver) *Selector { + return &Selector{ resolveUnixAddr: resolveUnixAddr, resolveTCPAddr: resolveTCPAddr, } } -func DefaultMemcachedJumpHashSelector() *MemcachedJumpHashSelector { - return &MemcachedJumpHashSelector{ +func DefaultSelector() *Selector { + return &Selector{ resolveUnixAddr: net.ResolveUnixAddr, resolveTCPAddr: net.ResolveTCPAddr, } @@ -78,7 +78,7 @@ func (a *staticAddr) String() string { return a.str } // To minimize the number of rehashes for keys when scaling the // number of servers in subsequent calls to SetServers, servers // are stored in natural sort order. -func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error { +func (s *Selector) SetServers(servers ...string) error { sortedServers := make([]string, len(servers)) copy(sortedServers, servers) natsort.Sort(sortedServers) @@ -138,7 +138,11 @@ func jumpHash(key uint64, numBuckets int) int32 { // PickServer returns the server address that a given item // should be shared onto. -func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) { +func (s *Selector) PickServer(key string) (net.Addr, error) { + return s.FromString(key) +} + +func (s *Selector) FromString(key string) (net.Addr, error) { s.mu.RLock() defer s.mu.RUnlock() if len(s.addrs) == 0 { @@ -151,10 +155,22 @@ func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) { return s.addrs[idx], nil } +func (s *Selector) FromUInt64(key uint64) (net.Addr, error) { + s.mu.RLock() + defer s.mu.RUnlock() + if len(s.addrs) == 0 { + return nil, memcache.ErrNoServers + } else if len(s.addrs) == 1 { + return s.addrs[0], nil + } + idx := jumpHash(key, len(s.addrs)) + return s.addrs[idx], nil +} + // Each iterates over each server and calls the given function. // If f returns a non-nil error, iteration will stop and that // error will be returned. 
-func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { +func (s *Selector) Each(f func(net.Addr) error) error { s.mu.RLock() defer s.mu.RUnlock() for _, def := range s.addrs { @@ -164,3 +180,9 @@ func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { } return nil } + +func (s *Selector) Addrs() []net.Addr { + s.mu.RLock() + defer s.mu.RUnlock() + return s.addrs +} diff --git a/pkg/storage/chunk/cache/memcached_client_selector_test.go b/pkg/util/jumphash/memcached_client_selector_test.go similarity index 93% rename from pkg/storage/chunk/cache/memcached_client_selector_test.go rename to pkg/util/jumphash/memcached_client_selector_test.go index cec908876b1bb..0708f06d763e4 100644 --- a/pkg/storage/chunk/cache/memcached_client_selector_test.go +++ b/pkg/util/jumphash/memcached_client_selector_test.go @@ -1,4 +1,4 @@ -package cache_test +package jumphash import ( "fmt" @@ -8,8 +8,6 @@ import ( "github.com/facette/natsort" "github.com/grafana/gomemcache/memcache" "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/storage/chunk/cache" ) func TestNatSort(t *testing.T) { @@ -58,7 +56,7 @@ var mockTCPResolver = func(network, address string) (*net.TCPAddr, error) { } func TestMemcachedJumpHashSelector_PickSever(t *testing.T) { - s := cache.NewMemcachedJumpHashSelector( + s := NewSelector( mockUnixResolver, mockTCPResolver, ) @@ -85,7 +83,7 @@ func TestMemcachedJumpHashSelector_PickSever(t *testing.T) { } func TestMemcachedJumpHashSelector_PickSever_ErrNoServers(t *testing.T) { - s := cache.NewMemcachedJumpHashSelector( + s := NewSelector( mockUnixResolver, mockTCPResolver, )
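For background, the jump consistent hash from the linked paper (Lamping and Veach, https://arxiv.org/pdf/1406.2294.pdf) is what the relocated `jumphash` package uses to map a key to one of the resolved addresses. Below is a minimal sketch of that algorithm with the same signature as the package's unexported `jumpHash` helper; it is the published formulation rather than a copy of the Loki code, and the key and bucket counts in `main` are made-up values.

```go
package main

import "fmt"

// jumpHash maps a 64-bit key to a bucket in [0, numBuckets). When the number
// of buckets grows from n to n+1, only ~1/(n+1) of all keys change bucket,
// which is why most fingerprints keep hitting the same bloom gateway after a
// scale-up.
func jumpHash(key uint64, numBuckets int) int32 {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}

func main() {
	fp := uint64(0xdeadbeefcafebabe) // a series fingerprint
	fmt.Println(jumpHash(fp, 3))     // bucket with 3 gateways
	fmt.Println(jumpHash(fp, 4))     // bucket with 4 gateways; unchanged for ~75% of keys
}
```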
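A short usage sketch of the relocated selector, assuming the import path introduced by this patch (`github.com/grafana/loki/v3/pkg/util/jumphash`). The addresses are placeholders, written as literal IPs so that `SetServers` does not depend on name resolution.

```go
package main

import (
	"fmt"
	"log"

	"github.com/grafana/loki/v3/pkg/util/jumphash"
)

func main() {
	s := jumphash.DefaultSelector()

	// In the gateway client these addresses come from DNS service discovery;
	// the selector stores them in natural sort order so that repeated lookups
	// stay stable across refreshes.
	if err := s.SetServers("10.0.0.1:9095", "10.0.0.2:9095", "10.0.0.3:9095"); err != nil {
		log.Fatal(err)
	}

	// FromUInt64 hashes the series fingerprint directly, unlike the memcached
	// PickServer path, which hashes a string key.
	addr, err := s.FromUInt64(0xdeadbeefcafebabe)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("fingerprint owned by", addr)
}
```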
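The client-side sharding in `FilterChunks` boils down to bucketing each `GroupedChunkRefs` under the address returned by `AddrForFingerprint` and sending one request per address. The sketch below mirrors that grouping, modeled on the new `client_pool_test.go`: a static address provider and a nil gRPC pool are enough to exercise address selection. Addresses and fingerprints are illustrative only.

```go
package main

import (
	"fmt"
	"time"

	"github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/bloomgateway"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// staticProvider satisfies the AddressProvider interface from client_pool.go.
type staticProvider struct{ addrs []string }

func (p staticProvider) Addresses() []string { return p.addrs }

func main() {
	provider := staticProvider{addrs: []string{"10.0.0.1:9095", "10.0.0.2:9095"}}

	// As in client_pool_test.go, the underlying gRPC pool can be nil when only
	// the address selection is exercised.
	pool := bloomgateway.NewJumpHashClientPool(nil, provider, time.Minute, log.NewNopLogger())

	groups := []*logproto.GroupedChunkRefs{
		{Fingerprint: 1}, {Fingerprint: 1 << 40}, {Fingerprint: 1 << 63},
	}

	// Bucket chunk-ref groups by the gateway that owns their fingerprint; this
	// is the same map FilterChunks builds before fanning out requests.
	perServer := make(map[string][]*logproto.GroupedChunkRefs)
	for _, g := range groups {
		addr, err := pool.AddrForFingerprint(g.Fingerprint)
		if err != nil {
			panic(err)
		}
		perServer[addr] = append(perServer[addr], g)
	}

	for addr, refs := range perServer {
		fmt.Printf("%s owns %d group(s)\n", addr, len(refs))
	}
}
```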
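As a worked reading of the cache locality score that `FilterChunks` now observes: a request whose fingerprints span 25% of the 64-bit keyspace and that is answered by 3 of 12 resolved gateway addresses scores 0.25 / 0.25 = 1.0, the ideal case. If the same span fans out to 6 of 12 addresses, the score drops to 0.25 / 0.5 = 0.5, meaning the request touched proportionally more instances than keyspace.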