diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index d3265e86445fe..2837647cff97f 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2089,11 +2089,18 @@ ring: client: # Configures the behavior of the connection pool. pool_config: - [client_cleanup_period: ] + # How frequently to clean up clients for servers that have gone away or are + # unhealthy. + # CLI flag: -bloom-gateway-client.pool.check-interval + [check_interval: | default = 10s] - [health_check_ingesters: ] + # Run a health check on each server during periodic cleanup. + # CLI flag: -bloom-gateway-client.pool.enable-health-check + [enable_health_check: | default = true] - [remote_timeout: ] + # Timeout for the health check if health check is enabled. + # CLI flag: -bloom-gateway-client.pool.health-check-timeout + [health_check_timeout: | default = 1s] # The grpc_client block configures the gRPC client used to communicate between # two Loki components. @@ -2116,6 +2123,11 @@ client: # CLI flag: -bloom-gateway-client.cache_results [cache_results: | default = false] + # Comma separated addresses list in DNS Service Discovery format: + # https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes + # CLI flag: -bloom-gateway-client.addresses + [addresses: | default = ""] + # Number of workers to use for filtering chunks concurrently. # CLI flag: -bloom-gateway.worker-concurrency [worker_concurrency: | default = 4] diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 5d2af03403d45..53a57b2a51d86 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -6,70 +6,33 @@ import ( "fmt" "io" "math" - "math/rand" - "strings" - "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/grpcclient" - "github.com/grafana/dskit/instrument" - "github.com/grafana/dskit/ring" ringclient "github.com/grafana/dskit/ring/client" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" - "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/grafana/loki/v3/pkg/bloomutils" - "github.com/grafana/loki/v3/pkg/distributor/clientpool" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/queue" - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" "github.com/grafana/loki/v3/pkg/util/constants" - util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/loki/v3/pkg/util/discovery" ) var ( - // BlocksOwnerRead is the operation used to check the authoritative owners of a block - // (replicas included) that are available for queries (a bloom gateway is available for - // queries only when ACTIVE). 
- BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) // groupedChunksRefPool pooling slice of logproto.GroupedChunkRefs [64, 128, 256, ..., 65536] groupedChunksRefPool = queue.NewSlicePool[*logproto.GroupedChunkRefs](1<<6, 1<<16, 2) - // ringGetBuffersPool pooling for ringGetBuffers to avoid calling ring.MakeBuffersForGet() for each request - ringGetBuffersPool = sync.Pool{ - New: func() interface{} { - descs, hosts, zones := ring.MakeBuffersForGet() - return &ringGetBuffers{ - Descs: descs, - Hosts: hosts, - Zones: zones, - } - }, - } ) -type ringGetBuffers struct { - Descs []ring.InstanceDesc - Hosts []string - Zones []string -} - -func (buf *ringGetBuffers) Reset() { - buf.Descs = buf.Descs[:0] - buf.Hosts = buf.Hosts[:0] - buf.Zones = buf.Zones[:0] -} - // GRPCPool represents a pool of gRPC connections to different bloom gateway instances. // Interfaces are inlined for simplicity to automatically satisfy interface functions. type GRPCPool struct { @@ -98,19 +61,17 @@ func NewBloomGatewayGRPCPool(address string, opts []grpc.DialOption) (*GRPCPool, type ClientConfig struct { // PoolConfig defines the behavior of the gRPC connection pool used to communicate // with the Bloom Gateway. - // It is defined at the distributors YAML section and reused here. - PoolConfig clientpool.PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures the behavior of the connection pool."` + PoolConfig PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures the behavior of the connection pool."` // GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server. GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` - // Ring is the Bloom Gateway ring used to find the appropriate Bloom Gateway instance - // this client should talk to. - Ring ring.ReadRing `yaml:"-"` - // Cache configures the cache used to store the results of the Bloom Gateway server. Cache CacheConfig `yaml:"results_cache,omitempty"` CacheResults bool `yaml:"cache_results"` + + // Client sharding using DNS disvovery and jumphash + Addresses string `yaml:"addresses,omitempty"` } // RegisterFlags registers flags for the Bloom Gateway client configuration. 
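For context, a minimal sketch (not part of the change itself) of how the reworked ClientConfig might be exercised on its own: flag defaults are applied with dskit's flagext as the unit test below does, and "dns+bloom-gateway.loki.svc.cluster.local:9095" is only a placeholder address in one of the supported DNS service discovery modes.

package main

import (
	"fmt"

	"github.com/grafana/dskit/flagext"

	"github.com/grafana/loki/v3/pkg/bloomgateway"
)

func main() {
	var cfg bloomgateway.ClientConfig
	// Apply the registered flag defaults (pool.check-interval=10s,
	// pool.enable-health-check=true, pool.health-check-timeout=1s, ...).
	flagext.DefaultValues(&cfg)

	// Without an explicit address list Validate() now returns an error, since the
	// client shards by DNS discovery + jump hash instead of reading a ring.
	if err := cfg.Validate(); err != nil {
		fmt.Println("expected error:", err)
	}

	// Placeholder service address; any of the supported DNS SD modes would do.
	cfg.Addresses = "dns+bloom-gateway.loki.svc.cluster.local:9095"
	if err := cfg.Validate(); err != nil {
		panic(err)
	}
	fmt.Println("bloom gateway client config is valid")
}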
@@ -122,7 +83,9 @@ func (i *ClientConfig) RegisterFlags(f *flag.FlagSet) { func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+"grpc", f) i.Cache.RegisterFlagsWithPrefix(prefix+"cache.", f) + i.PoolConfig.RegisterFlagsWithPrefix(prefix+"pool.", f) f.BoolVar(&i.CacheResults, prefix+"cache_results", false, "Flag to control whether to cache bloom gateway client requests/responses.") + f.StringVar(&i.Addresses, prefix+"addresses", "", "Comma separated addresses list in DNS Service Discovery format: https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes") } func (i *ClientConfig) Validate() error { @@ -130,12 +93,20 @@ func (i *ClientConfig) Validate() error { return errors.Wrap(err, "grpc client config") } + if err := i.PoolConfig.Validate(); err != nil { + return errors.Wrap(err, "pool config") + } + if i.CacheResults { if err := i.Cache.Validate(); err != nil { return errors.Wrap(err, "cache config") } } + if i.Addresses == "" { + return errors.New("addresses requires a list of comma separated strings in DNS service discovery format with at least one item") + } + return nil } @@ -144,33 +115,25 @@ type Client interface { } type GatewayClient struct { - cfg ClientConfig - limits Limits - logger log.Logger - metrics *clientMetrics - pool *ringclient.Pool - ring ring.ReadRing + cfg ClientConfig + limits Limits + logger log.Logger + metrics *clientMetrics + pool *JumpHashClientPool + dnsProvider *discovery.DNS } func NewClient( cfg ClientConfig, - readRing ring.ReadRing, limits Limits, registerer prometheus.Registerer, logger log.Logger, - metricsNamespace string, cacheGen resultscache.CacheGenNumberLoader, retentionEnabled bool, ) (*GatewayClient, error) { - latency := promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ - Namespace: constants.Loki, - Subsystem: "bloom_gateway", - Name: "request_duration_seconds", - Help: "Time (in seconds) spent serving requests when using the bloom gateway", - Buckets: instrument.DefBuckets, - }, []string{"operation", "status_code"}) - - dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(latency)) + metrics := newClientMetrics(registerer) + + dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(metrics.requestLatency)) if err != nil { return nil, err } @@ -206,29 +169,35 @@ func NewClient( return pool, nil } + dnsProvider := discovery.NewDNS(logger, cfg.PoolConfig.CheckInterval, cfg.Addresses, nil) + // Make an attempt to do one DNS lookup so we can start with addresses + dnsProvider.RunOnce() + + clientPool := ringclient.NewPool( + "bloom-gateway", + ringclient.PoolConfig(cfg.PoolConfig), + func() ([]string, error) { return dnsProvider.Addresses(), nil }, + ringclient.PoolAddrFunc(poolFactory), + metrics.clients, + logger, + ) + + pool := NewJumpHashClientPool(clientPool, dnsProvider, cfg.PoolConfig.CheckInterval, logger) + pool.Start() + return &GatewayClient{ - cfg: cfg, - logger: logger, - limits: limits, - metrics: newClientMetrics(registerer), - pool: clientpool.NewPool("bloom-gateway", cfg.PoolConfig, cfg.Ring, ringclient.PoolAddrFunc(poolFactory), logger, metricsNamespace), - ring: readRing, + cfg: cfg, + logger: logger, + limits: limits, + metrics: metrics, + pool: pool, + dnsProvider: dnsProvider, // keep reference so we can stop it when the client is closed }, nil } -func JoinFunc[S ~[]E, E any](elems S, sep string, f func(e E) string) string { - res := make([]string, 
len(elems)) - for i := range elems { - res[i] = f(elems[i]) - } - return strings.Join(res, sep) -} - -func shuffleAddrs(addrs []string) []string { - rand.Shuffle(len(addrs), func(i, j int) { - addrs[i], addrs[j] = addrs[j], addrs[i] - }) - return addrs +func (c *GatewayClient) Close() { + c.pool.Stop() + c.dnsProvider.Stop() } // FilterChunkRefs implements Client @@ -237,19 +206,23 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t return groups, nil } - subRing := GetShuffleShardingSubring(c.ring, tenant, c.limits) - rs, err := subRing.GetAllHealthy(BlocksOwnerRead) - if err != nil { - return nil, errors.Wrap(err, "bloom gateway get healthy instances") + clients := make(map[string][]*logproto.GroupedChunkRefs) + for _, g := range groups { + addr, err := c.pool.AddrForFingerprint(g.Fingerprint) + if err != nil { + return nil, errors.Wrap(err, "server address for fingerprint") + } + clients[addr] = append(clients[addr], g) } - servers, err := replicationSetsWithBounds(subRing, rs.Instances) - - if err != nil { - return nil, errors.Wrap(err, "bloom gateway get replication sets") + servers := make([]addrWithGroups, 0, len(clients)) + for k, v := range clients { + servers = append(servers, addrWithGroups{ + groups: v, + addr: k, + }) } - servers = partitionByReplicationSet(groups, servers) if len(servers) > 0 { // cache locality score (higher is better): // `% keyspace / % instances`. Ideally converges to 1 (querying x% of keyspace requires x% of instances), @@ -258,23 +231,20 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t // overlap on instances to the left and right of the range. firstFp, lastFp := groups[0].Fingerprint, groups[len(groups)-1].Fingerprint pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64) - pctInstances := float64(len(servers)) / float64(len(rs.Instances)) + pctInstances := float64(len(servers)) / float64(max(1, len(c.pool.Addrs()))) cacheLocalityScore := pctKeyspace / pctInstances c.metrics.cacheLocalityScore.Observe(cacheLocalityScore) } results := make([][]*logproto.GroupedChunkRefs, len(servers)) count := 0 - err = concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error { + err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error { rs := servers[i] - // randomize order of addresses so we don't hotspot the first server in the list - addrs := shuffleAddrs(rs.rs.GetAddresses()) level.Info(c.logger).Log( "msg", "do FilterChunkRefs for addresses", "progress", fmt.Sprintf("%d/%d", i+1, len(servers)), - "bounds", JoinFunc(rs.ranges, ",", func(e v1.FingerprintBounds) string { return e.String() }), - "addrs", strings.Join(addrs, ","), + "addr", rs.addr, "from", from.Time(), "through", through.Time(), "num_refs", len(rs.groups), @@ -282,7 +252,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t "plan_hash", plan.Hash(), ) - return c.doForAddrs(addrs, func(client logproto.BloomGatewayClient) error { + return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error { req := &logproto.FilterChunkRefRequest{ From: from, Through: through, @@ -316,7 +286,6 @@ func flatten(input [][]*logproto.GroupedChunkRefs, n int) []*logproto.GroupedChu // doForAddrs sequetially calls the provided callback function fn for each // address in given slice addrs until the callback function does not return an // error. 
-// TODO(owen-d): parallelism func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGatewayClient) error) error { var err error var poolClient ringclient.PoolClient @@ -337,116 +306,7 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway return err } -func mapTokenRangeToFingerprintRange(r bloomutils.Range[uint32]) v1.FingerprintBounds { - minFp := uint64(r.Min) << 32 - maxFp := uint64(r.Max) << 32 - return v1.NewBounds( - model.Fingerprint(minFp), - model.Fingerprint(maxFp|math.MaxUint32), - ) -} - -type rsWithRanges struct { - rs ring.ReplicationSet - ranges []v1.FingerprintBounds +type addrWithGroups struct { + addr string groups []*logproto.GroupedChunkRefs } - -func replicationSetsWithBounds(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]rsWithRanges, error) { - bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() - - servers := make([]rsWithRanges, 0, len(instances)) - for _, inst := range instances { - tr, err := bloomutils.TokenRangesForInstance(inst.Id, instances) - if err != nil { - return nil, errors.Wrap(err, "bloom gateway get ring") - } - - if len(tr) == 0 { - level.Warn(util_log.Logger).Log( - "subroutine", "replicationSetsWithBounds", - "msg", "instance has no token ranges - should not be possible", - "instance", inst.Id, - "n_instances", len(instances), - ) - continue - } - - // NB(owen-d): this will send requests to the wrong nodes if RF>1 since it only checks the - // first token when assigning replicasets - rs, err := subRing.Get(tr[0], BlocksOwnerRead, bufDescs, bufHosts, bufZones) - if err != nil { - return nil, errors.Wrap(err, "bloom gateway get ring") - } - - bounds := make([]v1.FingerprintBounds, 0, len(tr)/2) - for i := 0; i < len(tr); i += 2 { - b := v1.NewBounds( - model.Fingerprint(uint64(tr[i])<<32), - model.Fingerprint(uint64(tr[i+1])<<32|math.MaxUint32), - ) - bounds = append(bounds, b) - } - - servers = append(servers, rsWithRanges{ - rs: rs, - ranges: bounds, - }) - } - return servers, nil -} - -func partitionByReplicationSet(fingerprints []*logproto.GroupedChunkRefs, rs []rsWithRanges) (result []rsWithRanges) { - for _, inst := range rs { - for _, bounds := range inst.ranges { - min, _ := slices.BinarySearchFunc(fingerprints, bounds, func(g *logproto.GroupedChunkRefs, b v1.FingerprintBounds) int { - if g.Fingerprint < uint64(b.Min) { - return -1 - } else if g.Fingerprint > uint64(b.Min) { - return 1 - } - return 0 - }) - - max, _ := slices.BinarySearchFunc(fingerprints, bounds, func(g *logproto.GroupedChunkRefs, b v1.FingerprintBounds) int { - if g.Fingerprint <= uint64(b.Max) { - return -1 - } else if g.Fingerprint > uint64(b.Max) { - return 1 - } - return 0 - }) - - // fingerprint is out of boundaries - if min == len(fingerprints) || max == 0 { - continue - } - - inst.groups = append(inst.groups, fingerprints[min:max]...) - } - - if len(inst.groups) > 0 { - result = append(result, inst) - } - } - - return result -} - -// GetShuffleShardingSubring returns the subring to be used for a given user. -// This function should be used both by index gateway servers and clients in -// order to guarantee the same logic is used. -func GetShuffleShardingSubring(ring ring.ReadRing, tenantID string, limits Limits) ring.ReadRing { - shardSize := limits.BloomGatewayShardSize(tenantID) - - // A shard size of 0 means shuffle sharding is disabled for this specific user, - // so we just return the full ring so that indexes will be sharded across all index gateways. 
- // Since we set the shard size to replication factor if shard size is 0, this - // can only happen if both the shard size and the replication factor are set - // to 0. - if shardSize <= 0 { - return ring - } - - return ring.ShuffleShard(tenantID, shardSize) -} diff --git a/pkg/bloomgateway/client_pool.go b/pkg/bloomgateway/client_pool.go new file mode 100644 index 0000000000000..dc96c33e53c56 --- /dev/null +++ b/pkg/bloomgateway/client_pool.go @@ -0,0 +1,106 @@ +package bloomgateway + +import ( + "context" + "flag" + "sort" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/ring/client" + "github.com/grafana/dskit/services" + + "github.com/grafana/loki/v3/pkg/util/jumphash" +) + +// PoolConfig is config for creating a Pool. +// It has the same fields as "github.com/grafana/dskit/ring/client.PoolConfig" so it can be cast. +type PoolConfig struct { + CheckInterval time.Duration `yaml:"check_interval"` + HealthCheckEnabled bool `yaml:"enable_health_check"` + HealthCheckTimeout time.Duration `yaml:"health_check_timeout"` + MaxConcurrentHealthChecks int `yaml:"-"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.CheckInterval, prefix+"check-interval", 10*time.Second, "How frequently to clean up clients for servers that have gone away or are unhealthy.") + f.BoolVar(&cfg.HealthCheckEnabled, prefix+"enable-health-check", true, "Run a health check on each server during periodic cleanup.") + f.DurationVar(&cfg.HealthCheckTimeout, prefix+"health-check-timeout", 1*time.Second, "Timeout for the health check if health check is enabled.") +} + +func (cfg *PoolConfig) Validate() error { + return nil +} + +type JumpHashClientPool struct { + *client.Pool + *jumphash.Selector + + done chan struct{} + logger log.Logger +} + +type AddressProvider interface { + Addresses() []string +} + +func NewJumpHashClientPool(pool *client.Pool, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) *JumpHashClientPool { + selector := jumphash.DefaultSelector() + err := selector.SetServers(dnsProvider.Addresses()...) + if err != nil { + level.Warn(logger).Log("msg", "error updating servers", "err", err) + } + + p := &JumpHashClientPool{ + Pool: pool, + Selector: selector, + done: make(chan struct{}), + logger: logger, + } + go p.updateLoop(dnsProvider, updateInterval) + + return p +} + +func (p *JumpHashClientPool) AddrForFingerprint(fp uint64) (string, error) { + addr, err := p.FromUInt64(fp) + if err != nil { + return "", err + } + return addr.String(), nil +} + +func (p *JumpHashClientPool) Start() { + ctx := context.Background() + _ = services.StartAndAwaitRunning(ctx, p.Pool) +} + +func (p *JumpHashClientPool) Stop() { + ctx := context.Background() + _ = services.StopAndAwaitTerminated(ctx, p.Pool) + close(p.done) +} + +func (p *JumpHashClientPool) updateLoop(provider AddressProvider, updateInterval time.Duration) { + ticker := time.NewTicker(updateInterval) + defer ticker.Stop() + + for { + select { + case <-p.done: + return + case <-ticker.C: + servers := provider.Addresses() + // ServerList deterministically maps keys to _index_ of the server list. + // Since DNS returns records in different order each time, we sort to + // guarantee best possible match between nodes. + sort.Strings(servers) + err := p.SetServers(servers...) 
+ if err != nil { + level.Warn(p.logger).Log("msg", "error updating servers", "err", err) + } + } + } +} diff --git a/pkg/bloomgateway/client_pool_test.go b/pkg/bloomgateway/client_pool_test.go new file mode 100644 index 0000000000000..af0ca116eb824 --- /dev/null +++ b/pkg/bloomgateway/client_pool_test.go @@ -0,0 +1,35 @@ +package bloomgateway + +import ( + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" +) + +type provider struct { + addresses []string +} + +func (p *provider) Addresses() []string { + return p.addresses +} + +func TestJumpHashClientPool_UpdateLoop(t *testing.T) { + interval := 100 * time.Millisecond + + provider := &provider{[]string{"localhost:9095"}} + pool := NewJumpHashClientPool(nil, provider, interval, log.NewNopLogger()) + require.Len(t, pool.Addrs(), 1) + require.Equal(t, "127.0.0.1:9095", pool.Addrs()[0].String()) + + // update address list + provider.addresses = []string{"localhost:9095", "localhost:9096"} + // wait refresh interval + time.Sleep(2 * interval) + // pool has been updated + require.Len(t, pool.Addrs(), 2) + require.Equal(t, "127.0.0.1:9095", pool.Addrs()[0].String()) + require.Equal(t, "127.0.0.1:9096", pool.Addrs()[1].String()) +} diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index 5e2484af29bf9..a9ded6f45f7d0 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -2,35 +2,19 @@ package bloomgateway import ( "context" - "fmt" - "math" "testing" - "time" "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/ring" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/bloomutils" - "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/querier/plan" - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/validation" ) -func rs(id int, tokens ...uint32) ring.ReplicationSet { - inst := ring.InstanceDesc{ - Id: fmt.Sprintf("instance-%d", id), - Addr: fmt.Sprintf("10.0.0.%d", id), - Tokens: tokens, - } - return ring.ReplicationSet{Instances: []ring.InstanceDesc{inst}} -} - func TestBloomGatewayClient(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewRegistry() @@ -42,7 +26,7 @@ func TestBloomGatewayClient(t *testing.T) { flagext.DefaultValues(&cfg) t.Run("FilterChunks returns response", func(t *testing.T) { - c, err := NewClient(cfg, &mockRing{}, l, reg, logger, "loki", nil, false) + c, err := NewClient(cfg, l, reg, logger, nil, false) require.NoError(t, err) expr, err := syntax.ParseExpr(`{foo="bar"}`) require.NoError(t, err) @@ -51,283 +35,3 @@ func TestBloomGatewayClient(t *testing.T) { require.Equal(t, 0, len(res)) }) } - -func TestBloomGatewayClient_ReplicationSetsWithBounds(t *testing.T) { - testCases := map[string]struct { - instances []ring.InstanceDesc - expected []rsWithRanges - }{ - "single instance covers full range": { - instances: []ring.InstanceDesc{ - {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{(1 << 31)}}, // 0x80000000 - }, - expected: []rsWithRanges{ - {rs: rs(1, (1 << 31)), ranges: []v1.FingerprintBounds{ - v1.NewBounds(0, math.MaxUint64), - }}, - }, - }, - "one token per instance": { - instances: []ring.InstanceDesc{ - {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{(1 << 30) * 1}}, // 0x40000000 - {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{(1 << 30) * 2}}, // 
0x80000000 - {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{(1 << 30) * 3}}, // 0xc0000000 - }, - expected: []rsWithRanges{ - {rs: rs(1, (1<<30)*1), ranges: []v1.FingerprintBounds{ - v1.NewBounds(0, 4611686018427387903), - v1.NewBounds(13835058055282163712, 18446744073709551615), - }}, - {rs: rs(2, (1<<30)*2), ranges: []v1.FingerprintBounds{ - v1.NewBounds(4611686018427387904, 9223372036854775807), - }}, - {rs: rs(3, (1<<30)*3), ranges: []v1.FingerprintBounds{ - v1.NewBounds(9223372036854775808, 13835058055282163711), - }}, - }, - }, - "extreme tokens in ring": { - instances: []ring.InstanceDesc{ - {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0}}, - {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32}}, - }, - expected: []rsWithRanges{ - {rs: rs(1, 0), ranges: []v1.FingerprintBounds{ - v1.NewBounds(math.MaxUint64-math.MaxUint32, math.MaxUint64), - }}, - {rs: rs(2, math.MaxUint32), ranges: []v1.FingerprintBounds{ - v1.NewBounds(0, math.MaxUint64-math.MaxUint32-1), - }}, - }, - }, - } - - for name, tc := range testCases { - tc := tc - t.Run(name, func(t *testing.T) { - subRing := newMockRing(t, tc.instances) - res, err := replicationSetsWithBounds(subRing, tc.instances) - require.NoError(t, err) - require.Equal(t, tc.expected, res) - }) - } -} - -func TestBloomGatewayClient_PartitionByReplicationSet(t *testing.T) { - // Create 10 fingerprints [0, 2, 4, ... 18] - groups := make([]*logproto.GroupedChunkRefs, 0, 10) - for i := 0; i < 20; i += 2 { - groups = append(groups, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) - } - - // instance token ranges do not overlap - t.Run("non-overlapping", func(t *testing.T) { - - servers := []rsWithRanges{ - {rs: rs(1), ranges: []v1.FingerprintBounds{v1.NewBounds(0, 4)}}, - {rs: rs(2), ranges: []v1.FingerprintBounds{v1.NewBounds(5, 9), v1.NewBounds(15, 19)}}, - {rs: rs(3), ranges: []v1.FingerprintBounds{v1.NewBounds(10, 14)}}, - } - - // partition fingerprints - - expected := [][]*logproto.GroupedChunkRefs{ - { - {Fingerprint: 0}, - {Fingerprint: 2}, - {Fingerprint: 4}, - }, - { - {Fingerprint: 6}, - {Fingerprint: 8}, - {Fingerprint: 16}, - {Fingerprint: 18}, - }, - { - {Fingerprint: 10}, - {Fingerprint: 12}, - {Fingerprint: 14}, - }, - } - - partitioned := partitionByReplicationSet(groups, servers) - for i := range partitioned { - require.Equal(t, expected[i], partitioned[i].groups) - } - }) - - // instance token ranges overlap -- this should not happen in a real ring, though - t.Run("overlapping", func(t *testing.T) { - servers := []rsWithRanges{ - {rs: rs(1), ranges: []v1.FingerprintBounds{v1.NewBounds(0, 9)}}, - {rs: rs(2), ranges: []v1.FingerprintBounds{v1.NewBounds(5, 14)}}, - {rs: rs(3), ranges: []v1.FingerprintBounds{v1.NewBounds(10, 19)}}, - } - - // partition fingerprints - - expected := [][]*logproto.GroupedChunkRefs{ - { - {Fingerprint: 0}, - {Fingerprint: 2}, - {Fingerprint: 4}, - {Fingerprint: 6}, - {Fingerprint: 8}, - }, - { - {Fingerprint: 6}, - {Fingerprint: 8}, - {Fingerprint: 10}, - {Fingerprint: 12}, - {Fingerprint: 14}, - }, - { - {Fingerprint: 10}, - {Fingerprint: 12}, - {Fingerprint: 14}, - {Fingerprint: 16}, - {Fingerprint: 18}, - }, - } - - partitioned := partitionByReplicationSet(groups, servers) - for i := range partitioned { - require.Equal(t, expected[i], partitioned[i].groups) - } - }) -} - -func BenchmarkPartitionFingerprintsByAddresses(b *testing.B) { - numFp := 100000 - fpStep := math.MaxUint64 / uint64(numFp) - - groups := make([]*logproto.GroupedChunkRefs, 0, numFp) - for i := 
uint64(0); i < math.MaxUint64-fpStep; i += fpStep { - groups = append(groups, &logproto.GroupedChunkRefs{Fingerprint: i}) - } - - numServers := 100 - tokenStep := math.MaxUint32 / uint32(numServers) - servers := make([]rsWithRanges, 0, numServers) - for i := uint32(0); i < math.MaxUint32-tokenStep; i += tokenStep { - servers = append(servers, rsWithRanges{ - rs: rs(int(i)), - ranges: []v1.FingerprintBounds{ - v1.NewBounds(model.Fingerprint(i)<<32, model.Fingerprint(i+tokenStep)<<32), - }, - }) - } - - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - _ = partitionByReplicationSet(groups, servers) - } -} - -func TestBloomGatewayClient_MapTokenRangeToFingerprintRange(t *testing.T) { - testCases := map[string]struct { - lshift int - inp bloomutils.Range[uint32] - exp v1.FingerprintBounds - }{ - "single token expands to multiple fingerprints": { - inp: bloomutils.NewTokenRange(0, 0), - exp: v1.NewBounds(0, 0xffffffff), - }, - "max value expands to max value of new range": { - inp: bloomutils.NewTokenRange((1 << 31), math.MaxUint32), - exp: v1.NewBounds((1 << 63), 0xffffffffffffffff), - }, - } - for desc, tc := range testCases { - t.Run(desc, func(t *testing.T) { - actual := mapTokenRangeToFingerprintRange(tc.inp) - require.Equal(t, tc.exp, actual) - }) - } -} - -// make sure mockRing implements the ring.ReadRing interface -var _ ring.ReadRing = &mockRing{} - -func newMockRing(t *testing.T, instances []ring.InstanceDesc) *mockRing { - ranges := make([]ring.TokenRanges, 0) - for i := range instances { - tr, err := bloomutils.TokenRangesForInstance(instances[i].Id, instances) - if err != nil { - t.Fatal(err) - } - ranges = append(ranges, tr) - } - return &mockRing{ - instances: instances, - ranges: ranges, - } -} - -type mockRing struct { - instances []ring.InstanceDesc - ranges []ring.TokenRanges -} - -// Get implements ring.ReadRing. -func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (rs ring.ReplicationSet, err error) { - for i := range r.ranges { - if r.ranges[i].IncludesKey(key) { - rs.Instances = append(rs.Instances, r.instances[i]) - } - } - return -} - -// GetAllHealthy implements ring.ReadRing. -func (r *mockRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) { - return ring.ReplicationSet{ - Instances: r.instances, - }, nil -} - -// GetInstanceState implements ring.ReadRing. -func (*mockRing) GetInstanceState(_ string) (ring.InstanceState, error) { - panic("unimplemented") -} - -// GetReplicationSetForOperation implements ring.ReadRing. -func (*mockRing) GetReplicationSetForOperation(_ ring.Operation) (ring.ReplicationSet, error) { - panic("unimplemented") -} - -// HasInstance implements ring.ReadRing. -func (*mockRing) HasInstance(_ string) bool { - panic("unimplemented") -} - -// InstancesCount implements ring.ReadRing. -func (r *mockRing) InstancesCount() int { - return len(r.instances) -} - -// ReplicationFactor implements ring.ReadRing. -func (*mockRing) ReplicationFactor() int { - return 1 -} - -// ShuffleShard implements ring.ReadRing. -func (r *mockRing) ShuffleShard(_ string, _ int) ring.ReadRing { - return r -} - -// ShuffleShardWithLookback implements ring.ReadRing. -func (*mockRing) ShuffleShardWithLookback(_ string, _ int, _ time.Duration, _ time.Time) ring.ReadRing { - panic("unimplemented") -} - -// CleanupShuffleShardCache implements ring.ReadRing. 
-func (*mockRing) CleanupShuffleShardCache(_ string) { - panic("unimplemented") -} - -func (r *mockRing) GetTokenRangesForInstance(id string) (ring.TokenRanges, error) { - return bloomutils.TokenRangesForInstance(id, r.instances) -} diff --git a/pkg/bloomgateway/metrics.go b/pkg/bloomgateway/metrics.go index 34e1960fa45ee..0d408991b40c2 100644 --- a/pkg/bloomgateway/metrics.go +++ b/pkg/bloomgateway/metrics.go @@ -3,6 +3,7 @@ package bloomgateway import ( "time" + "github.com/grafana/dskit/instrument" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -16,6 +17,8 @@ type metrics struct { type clientMetrics struct { cacheLocalityScore prometheus.Histogram + requestLatency *prometheus.HistogramVec + clients prometheus.Gauge } func newClientMetrics(registerer prometheus.Registerer) *clientMetrics { @@ -27,6 +30,19 @@ func newClientMetrics(registerer prometheus.Registerer) *clientMetrics { Help: "Cache locality score of the bloom filter, as measured by % of keyspace touched / % of bloom_gws required", Buckets: prometheus.LinearBuckets(0.01, 0.2, 5), }), + requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Subsystem: "bloom_gateway_client", + Name: "request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using the bloom gateway", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}), + clients: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Namespace: constants.Loki, + Subsystem: "bloom_gateway", + Name: "clients", + Help: "The current number of bloom gateway clients.", + }), } } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 946f434637b21..5d64b947437c3 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1451,11 +1451,9 @@ func (t *Loki) initIndexGateway() (services.Service, error) { if t.Cfg.BloomGateway.Enabled { bloomGatewayClient, err := bloomgateway.NewClient( t.Cfg.BloomGateway.Client, - t.bloomGatewayRingManager.Ring, t.Overrides, prometheus.DefaultRegisterer, logger, - t.Cfg.MetricsNamespace, t.cacheGenerationLoader, t.Cfg.CompactorConfig.RetentionEnabled, ) diff --git a/pkg/storage/chunk/cache/memcached_client.go b/pkg/storage/chunk/cache/memcached_client.go index d6df538342faf..995e896fbcfee 100644 --- a/pkg/storage/chunk/cache/memcached_client.go +++ b/pkg/storage/chunk/cache/memcached_client.go @@ -22,6 +22,7 @@ import ( "github.com/sony/gobreaker" "github.com/grafana/loki/v3/pkg/util/constants" + "github.com/grafana/loki/v3/pkg/util/jumphash" ) // MemcachedClient interface exists for mocking memcacheClient. 
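The next hunk swaps the cache package's private selector for the shared pkg/util/jumphash package. A rough sketch of that selector in isolation, with two loopback addresses as placeholders, showing the string-keyed lookup memcached uses and the uint64-keyed lookup the bloom gateway pool uses:

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/util/jumphash"
)

func main() {
	s := jumphash.DefaultSelector()
	// SetServers resolves the addresses and stores them in natural sort order,
	// so identical server lists map identical keys to the same server.
	if err := s.SetServers("127.0.0.1:11211", "127.0.0.1:11212"); err != nil {
		panic(err)
	}

	// Memcached picks a server from a string cache key ...
	addr, err := s.PickServer("chunk-key-1")
	if err != nil {
		panic(err)
	}
	fmt.Println("string key ->", addr)

	// ... while the bloom gateway client pool picks one from a series fingerprint.
	addr, err = s.FromUInt64(0xcafebabe)
	if err != nil {
		panic(err)
	}
	fmt.Println("fingerprint ->", addr)
}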
@@ -113,7 +114,7 @@ func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix, description st func NewMemcachedClient(cfg MemcachedClientConfig, name string, r prometheus.Registerer, logger log.Logger, metricsNamespace string) MemcachedClient { var selector serverSelector if cfg.ConsistentHash { - selector = DefaultMemcachedJumpHashSelector() + selector = jumphash.DefaultSelector() } else { selector = &memcache.ServerList{} } diff --git a/pkg/util/discovery/dns.go b/pkg/util/discovery/dns.go index e85479ced000d..e076f39e92d8a 100644 --- a/pkg/util/discovery/dns.go +++ b/pkg/util/discovery/dns.go @@ -3,6 +3,7 @@ package discovery import ( "context" "fmt" + "strings" "sync" "time" @@ -15,7 +16,7 @@ import ( type DNS struct { logger log.Logger cleanupPeriod time.Duration - address string + addresses []string stop chan struct{} done sync.WaitGroup once sync.Once @@ -27,7 +28,7 @@ func NewDNS(logger log.Logger, cleanupPeriod time.Duration, address string, reg d := &DNS{ logger: logger, cleanupPeriod: cleanupPeriod, - address: address, + addresses: strings.Split(address, ","), stop: make(chan struct{}), done: sync.WaitGroup{}, dnsProvider: dnsProvider, @@ -70,10 +71,10 @@ func (d *DNS) discoveryLoop() { } func (d *DNS) runDiscovery() { - ctx, cancel := context.WithTimeoutCause(context.Background(), 5*time.Second, fmt.Errorf("DNS lookup timeout: %s", d.address)) + ctx, cancel := context.WithTimeoutCause(context.Background(), 5*time.Second, fmt.Errorf("DNS lookup timeout: %v", d.addresses)) defer cancel() - err := d.dnsProvider.Resolve(ctx, []string{d.address}) + err := d.dnsProvider.Resolve(ctx, d.addresses) if err != nil { - level.Error(d.logger).Log("msg", "failed to resolve index gateway address", "err", err) + level.Error(d.logger).Log("msg", "failed to resolve server addresses", "err", err) } } diff --git a/pkg/storage/chunk/cache/memcached_client_selector.go b/pkg/util/jumphash/memcached_client_selector.go similarity index 80% rename from pkg/storage/chunk/cache/memcached_client_selector.go rename to pkg/util/jumphash/memcached_client_selector.go index 8c8d49e2ba3af..ccec90fa0dda2 100644 --- a/pkg/storage/chunk/cache/memcached_client_selector.go +++ b/pkg/util/jumphash/memcached_client_selector.go @@ -1,4 +1,4 @@ -package cache +package jumphash import ( "net" @@ -13,16 +13,16 @@ import ( util_log "github.com/grafana/loki/v3/pkg/util/log" ) -// MemcachedJumpHashSelector implements the memcache.ServerSelector -// interface. MemcachedJumpHashSelector utilizes a jump hash to +// Selector implements the memcache.ServerSelector +// interface. Selector utilizes a jump hash to // distribute keys to servers. // // While adding or removing servers only requires 1/N keys to move, // servers are treated as a stack and can only be pushed/popped. -// Therefore, MemcachedJumpHashSelector works best for servers +// Therefore, Selector works best for servers // with consistent DNS names where the naturally sorted order // is predictable. 
-type MemcachedJumpHashSelector struct { +type Selector struct { mu sync.RWMutex addrs []net.Addr resolveUnixAddr UnixResolver @@ -33,15 +33,15 @@ type UnixResolver func(network, address string) (*net.UnixAddr, error) type TCPResolver func(network, address string) (*net.TCPAddr, error) -func NewMemcachedJumpHashSelector(resolveUnixAddr UnixResolver, resolveTCPAddr TCPResolver) *MemcachedJumpHashSelector { - return &MemcachedJumpHashSelector{ +func NewSelector(resolveUnixAddr UnixResolver, resolveTCPAddr TCPResolver) *Selector { + return &Selector{ resolveUnixAddr: resolveUnixAddr, resolveTCPAddr: resolveTCPAddr, } } -func DefaultMemcachedJumpHashSelector() *MemcachedJumpHashSelector { - return &MemcachedJumpHashSelector{ +func DefaultSelector() *Selector { + return &Selector{ resolveUnixAddr: net.ResolveUnixAddr, resolveTCPAddr: net.ResolveTCPAddr, } @@ -78,7 +78,7 @@ func (a *staticAddr) String() string { return a.str } // To minimize the number of rehashes for keys when scaling the // number of servers in subsequent calls to SetServers, servers // are stored in natural sort order. -func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error { +func (s *Selector) SetServers(servers ...string) error { sortedServers := make([]string, len(servers)) copy(sortedServers, servers) natsort.Sort(sortedServers) @@ -138,7 +138,11 @@ func jumpHash(key uint64, numBuckets int) int32 { // PickServer returns the server address that a given item // should be shared onto. -func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) { +func (s *Selector) PickServer(key string) (net.Addr, error) { + return s.FromString(key) +} + +func (s *Selector) FromString(key string) (net.Addr, error) { s.mu.RLock() defer s.mu.RUnlock() if len(s.addrs) == 0 { @@ -151,10 +155,22 @@ func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) { return s.addrs[idx], nil } +func (s *Selector) FromUInt64(key uint64) (net.Addr, error) { + s.mu.RLock() + defer s.mu.RUnlock() + if len(s.addrs) == 0 { + return nil, memcache.ErrNoServers + } else if len(s.addrs) == 1 { + return s.addrs[0], nil + } + idx := jumpHash(key, len(s.addrs)) + return s.addrs[idx], nil +} + // Each iterates over each server and calls the given function. // If f returns a non-nil error, iteration will stop and that // error will be returned. 
-func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { +func (s *Selector) Each(f func(net.Addr) error) error { s.mu.RLock() defer s.mu.RUnlock() for _, def := range s.addrs { @@ -164,3 +180,9 @@ func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { } return nil } + +func (s *Selector) Addrs() []net.Addr { + s.mu.RLock() + defer s.mu.RUnlock() + return s.addrs +} diff --git a/pkg/storage/chunk/cache/memcached_client_selector_test.go b/pkg/util/jumphash/memcached_client_selector_test.go similarity index 93% rename from pkg/storage/chunk/cache/memcached_client_selector_test.go rename to pkg/util/jumphash/memcached_client_selector_test.go index cec908876b1bb..0708f06d763e4 100644 --- a/pkg/storage/chunk/cache/memcached_client_selector_test.go +++ b/pkg/util/jumphash/memcached_client_selector_test.go @@ -1,4 +1,4 @@ -package cache_test +package jumphash import ( "fmt" @@ -8,8 +8,6 @@ import ( "github.com/facette/natsort" "github.com/grafana/gomemcache/memcache" "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/storage/chunk/cache" ) func TestNatSort(t *testing.T) { @@ -58,7 +56,7 @@ var mockTCPResolver = func(network, address string) (*net.TCPAddr, error) { } func TestMemcachedJumpHashSelector_PickSever(t *testing.T) { - s := cache.NewMemcachedJumpHashSelector( + s := NewSelector( mockUnixResolver, mockTCPResolver, ) @@ -85,7 +83,7 @@ func TestMemcachedJumpHashSelector_PickSever(t *testing.T) { } func TestMemcachedJumpHashSelector_PickSever_ErrNoServers(t *testing.T) { - s := cache.NewMemcachedJumpHashSelector( + s := NewSelector( mockUnixResolver, mockTCPResolver, )
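Taken together, a rough end-to-end sketch of the new routing path: a static AddressProvider stands in for the DNS provider (the addresses are placeholders), and AddrForFingerprint plays the role the ring lookup used to, mapping each series fingerprint to exactly one gateway.

package main

import (
	"fmt"
	"time"

	"github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/bloomgateway"
)

// staticProvider satisfies bloomgateway.AddressProvider without real DNS lookups.
type staticProvider struct{ addrs []string }

func (p *staticProvider) Addresses() []string { return p.addrs }

func main() {
	provider := &staticProvider{addrs: []string{"localhost:9095", "localhost:9096"}}

	// A nil client.Pool is enough for address selection only (the unit test above
	// does the same); a real caller gets the ringclient pool wired up in NewClient.
	pool := bloomgateway.NewJumpHashClientPool(nil, provider, time.Minute, log.NewNopLogger())

	// Fingerprints spread across the keyspace each land on a deterministic gateway.
	for _, fp := range []uint64{0, 1 << 32, 1 << 48, 1 << 63} {
		addr, err := pool.AddrForFingerprint(fp)
		if err != nil {
			panic(err)
		}
		fmt.Printf("fingerprint 0x%016x -> %s\n", fp, addr)
	}
}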