Skip to content

Commit

Permalink
chore(blooms): Remove bloom gateway ring (grafana#12484)
Browse files Browse the repository at this point in the history
Remove ring in favour of a client side, DNS discovery of the bloom gateway servers combined with consistent hashing using the jumphash algorithm (grafana#12470).

Instead of configuring the ring, the bloom gateway client needs to be configured with the `-bloom-gateway-client.addresses` flag, which takes a list of comma separated, DNS-discovery-style strings.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored and rhnasc committed Apr 12, 2024
1 parent 8d1e1a3 commit 21cd5d4
Show file tree
Hide file tree
Showing 7 changed files with 3 additions and 211 deletions.
98 changes: 0 additions & 98 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1986,102 +1986,6 @@ ring:
The `bloom_gateway` block configures the Loki bloom gateway server, responsible for serving queries for filtering chunks based on filter expressions.

```yaml
# Defines the ring to be used by the bloom gateway servers and clients. In case
# this isn't configured, this block supports inheriting configuration from the
# common ring section.
ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
# CLI flag: -bloom-gateway.ring.store
[store: <string> | default = "consul"]
# The prefix for the keys in the store. Should end with a /.
# CLI flag: -bloom-gateway.ring.prefix
[prefix: <string> | default = "collectors/"]
# Configuration for a Consul client. Only applies if the selected kvstore is
# consul.
# The CLI flags prefix for this block configuration is: bloom-gateway.ring
[consul: <consul>]
# Configuration for an ETCD v3 client. Only applies if the selected kvstore
# is etcd.
# The CLI flags prefix for this block configuration is: bloom-gateway.ring
[etcd: <etcd>]
multi:
# Primary backend storage used by multi-client.
# CLI flag: -bloom-gateway.ring.multi.primary
[primary: <string> | default = ""]
# Secondary backend storage used by multi-client.
# CLI flag: -bloom-gateway.ring.multi.secondary
[secondary: <string> | default = ""]
# Mirror writes to secondary store.
# CLI flag: -bloom-gateway.ring.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]
# Timeout for storing value to secondary store.
# CLI flag: -bloom-gateway.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -bloom-gateway.ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]
# The heartbeat timeout after which compactors are considered unhealthy within
# the ring. 0 = never (timeout disabled).
# CLI flag: -bloom-gateway.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# File path where tokens are stored. If empty, tokens are not stored at
# shutdown and restored at startup.
# CLI flag: -bloom-gateway.ring.tokens-file-path
[tokens_file_path: <string> | default = ""]
# True to enable zone-awareness and replicate blocks across different
# availability zones.
# CLI flag: -bloom-gateway.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Number of tokens to use in the ring. The bigger the number of tokens, the
# more fingerprint ranges the compactor will own, but the smaller these ranges
# will be. Bigger number of tokens means that more but smaller requests will
# be handled by each gateway.
# CLI flag: -bloom-gateway.ring.tokens
[num_tokens: <int> | default = 16]
# Factor for data replication.
# CLI flag: -bloom-gateway.ring.replication-factor
[replication_factor: <int> | default = 3]
# Instance ID to register in the ring.
# CLI flag: -bloom-gateway.ring.instance-id
[instance_id: <string> | default = "<hostname>"]
# Name of network interface to read address from.
# CLI flag: -bloom-gateway.ring.instance-interface-names
[instance_interface_names: <list of strings> | default = [<private network interfaces>]]
# Port to advertise in the ring (defaults to server.grpc-listen-port).
# CLI flag: -bloom-gateway.ring.instance-port
[instance_port: <int> | default = 0]
# IP address to advertise in the ring.
# CLI flag: -bloom-gateway.ring.instance-addr
[instance_addr: <string> | default = ""]
# The availability zone where this instance is running. Required if
# zone-awareness is enabled.
# CLI flag: -bloom-gateway.ring.instance-availability-zone
[instance_availability_zone: <string> | default = ""]
# Enable using a IPv6 instance address.
# CLI flag: -bloom-gateway.ring.instance-enable-ipv6
[instance_enable_ipv6: <boolean> | default = false]
# Flag to enable or disable the bloom gateway component globally.
# CLI flag: -bloom-gateway.enabled
[enabled: <boolean> | default = false]
Expand Down Expand Up @@ -4149,7 +4053,6 @@ ring:
Configuration for a Consul client. Only applies if the selected kvstore is `consul`. The supported CLI flags `<prefix>` used to reference this configuration block are:

- `bloom-compactor.ring`
- `bloom-gateway.ring`
- `common.storage.ring`
- `compactor.ring`
- `distributor.ring`
Expand Down Expand Up @@ -4196,7 +4099,6 @@ Configuration for a Consul client. Only applies if the selected kvstore is `cons
Configuration for an ETCD v3 client. Only applies if the selected kvstore is `etcd`. The supported CLI flags `<prefix>` used to reference this configuration block are:

- `bloom-compactor.ring`
- `bloom-gateway.ring`
- `common.storage.ring`
- `compactor.ring`
- `distributor.ring`
Expand Down
34 changes: 2 additions & 32 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
Expand All @@ -29,7 +26,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
"github.com/grafana/loki/v3/pkg/validation"
)

Expand Down Expand Up @@ -99,20 +95,8 @@ func TestBloomGateway_StartStopService(t *testing.T) {
reg := prometheus.NewRegistry()

t.Run("start and stop bloom gateway", func(t *testing.T) {
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, reg)
t.Cleanup(func() {
closer.Close()
})

cfg := Config{
Enabled: true,
Ring: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},
ReplicationFactor: 1,
NumTokens: 16,
},
Enabled: true,
WorkerConcurrency: 4,
MaxOutstandingPerTenant: 1024,
}
Expand All @@ -137,22 +121,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
tenantID := "test"

logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, reg)
t.Cleanup(func() {
closer.Close()
})

cfg := Config{
Enabled: true,
Ring: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},
ReplicationFactor: 1,
NumTokens: 16,
},
Enabled: true,
WorkerConcurrency: 2,
BlockQueryConcurrency: 2,
MaxOutstandingPerTenant: 1024,
Expand Down
13 changes: 0 additions & 13 deletions pkg/bloomgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,10 @@ package bloomgateway

import (
"flag"

"github.com/grafana/loki/v3/pkg/util/ring"
)

// Config configures the Bloom Gateway component.
type Config struct {
// Ring configures the ring store used to save and retrieve the different Bloom Gateway instances.
// In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration
// section and the ingester configuration by default).
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom gateway servers and clients. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled is the global switch to configures whether Bloom Gateways should be used to filter chunks.
Enabled bool `yaml:"enabled"`
// Client configures the Bloom Gateway client
Expand All @@ -38,13 +32,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// TODO(chaudum): Figure out what the better place is for registering flags
// -bloom-gateway.client.* or -bloom-gateway-client.*
cfg.Client.RegisterFlags(f)

// Ring
skipFlags := []string{
prefix + "ring.tokens",
}
cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f, skipFlags...)
f.IntVar(&cfg.Ring.NumTokens, prefix+"ring.tokens", 16, "Number of tokens to use in the ring. The bigger the number of tokens, the more fingerprint ranges the compactor will own, but the smaller these ranges will be. Bigger number of tokens means that more but smaller requests will be handled by each gateway.")
}

type Limits interface {
Expand Down
27 changes: 0 additions & 27 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
func applyCommonReplicationFactor(r, defaults *ConfigWrapper) {
if !reflect.DeepEqual(r.Common.ReplicationFactor, defaults.Common.ReplicationFactor) {
r.IndexGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor
r.BloomGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor
}
}

Expand Down Expand Up @@ -332,20 +331,6 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc lokiring.RingConfig, merg
r.BloomCompactor.Ring.KVStore = rc.KVStore
r.BloomCompactor.Ring.NumTokens = rc.NumTokens
}

// BloomGateway
if mergeWithExisting || reflect.DeepEqual(r.BloomGateway.Ring, defaults.BloomGateway.Ring) {
r.BloomGateway.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.BloomGateway.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.BloomGateway.Ring.InstancePort = rc.InstancePort
r.BloomGateway.Ring.InstanceAddr = rc.InstanceAddr
r.BloomGateway.Ring.InstanceID = rc.InstanceID
r.BloomGateway.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.BloomGateway.Ring.InstanceZone = rc.InstanceZone
r.BloomGateway.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.BloomGateway.Ring.KVStore = rc.KVStore
r.BloomGateway.Ring.NumTokens = rc.NumTokens
}
}

func applyTokensFilePath(cfg *ConfigWrapper) error {
Expand Down Expand Up @@ -384,13 +369,6 @@ func applyTokensFilePath(cfg *ConfigWrapper) error {
}
cfg.BloomCompactor.Ring.TokensFilePath = f

// Bloom-Gateway
f, err = tokensFile(cfg, "bloomgateway.tokens")
if err != nil {
return err
}
cfg.BloomGateway.Ring.TokensFilePath = f

// Pattern
f, err = tokensFile(cfg, "pattern.tokens")
if err != nil {
Expand Down Expand Up @@ -487,10 +465,6 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
if reflect.DeepEqual(cfg.BloomCompactor.Ring.InstanceInterfaceNames, defaults.BloomCompactor.Ring.InstanceInterfaceNames) {
cfg.BloomCompactor.Ring.InstanceInterfaceNames = append(cfg.BloomCompactor.Ring.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.BloomGateway.Ring.InstanceInterfaceNames, defaults.BloomGateway.Ring.InstanceInterfaceNames) {
cfg.BloomGateway.Ring.InstanceInterfaceNames = append(cfg.BloomGateway.Ring.InstanceInterfaceNames, loopbackIface)
}
}

// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist.
Expand All @@ -506,7 +480,6 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
r.IndexGateway.Ring.KVStore.Store = memberlistStr
r.BloomCompactor.Ring.KVStore.Store = memberlistStr
r.BloomGateway.Ring.KVStore.Store = memberlistStr
}

var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")
Expand Down
9 changes: 1 addition & 8 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,6 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule)
mm.RegisterModule(IndexGatewayInterceptors, t.initIndexGatewayInterceptors, modules.UserInvisibleModule)
mm.RegisterModule(BloomGateway, t.initBloomGateway)
mm.RegisterModule(BloomGatewayRing, t.initBloomGatewayRing, modules.UserInvisibleModule)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(QuerySchedulerRing, t.initQuerySchedulerRing, modules.UserInvisibleModule)
mm.RegisterModule(Analytics, t.initAnalytics)
Expand Down Expand Up @@ -713,14 +712,13 @@ func (t *Loki) setupModuleManager() error {
TableManager: {Server, Analytics},
Compactor: {Server, Overrides, MemberlistKV, Analytics},
IndexGateway: {Server, Store, IndexGatewayRing, IndexGatewayInterceptors, Analytics},
BloomGateway: {Server, BloomStore, BloomGatewayRing, Analytics},
BloomGateway: {Server, BloomStore, Analytics},
BloomCompactor: {Server, BloomStore, BloomCompactorRing, Analytics, Store},
PatternIngester: {Server, MemberlistKV, Analytics},
PatternRingClient: {Server, MemberlistKV, Analytics},
IngesterQuerier: {Ring},
QuerySchedulerRing: {Overrides, MemberlistKV},
IndexGatewayRing: {Overrides, MemberlistKV},
BloomGatewayRing: {Overrides, MemberlistKV},
BloomCompactorRing: {Overrides, MemberlistKV},
MemberlistKV: {Server},

Expand Down Expand Up @@ -777,11 +775,6 @@ func (t *Loki) setupModuleManager() error {
deps[Server] = append(deps[Server], IngesterGRPCInterceptors)
}

// Add bloom gateway ring in client mode to IndexGateway service dependencies if bloom filtering is enabled.
if t.Cfg.BloomGateway.Enabled {
deps[IndexGateway] = append(deps[IndexGateway], BloomGatewayRing)
}

if t.Cfg.LegacyReadTarget {
deps[Read] = append(deps[Read], deps[Backend]...)
}
Expand Down
32 changes: 0 additions & 32 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ const (
MemberlistKV string = "memberlist-kv"
Compactor string = "compactor"
BloomGateway string = "bloom-gateway"
BloomGatewayRing string = "bloom-gateway-ring"
IndexGateway string = "index-gateway"
IndexGatewayRing string = "index-gateway-ring"
IndexGatewayInterceptors string = "index-gateway-interceptors"
Expand Down Expand Up @@ -278,7 +277,6 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) {
t.Cfg.BloomCompactor.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.IndexGateway.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.BloomGateway.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.QueryScheduler.SchedulerRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Ruler.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
Expand Down Expand Up @@ -1293,7 +1291,6 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.BloomGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.BloomCompactor.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Pattern.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Server.HTTP.Handle("/memberlist", t.MemberlistKV)
Expand Down Expand Up @@ -1386,35 +1383,6 @@ func (t *Loki) initBloomGateway() (services.Service, error) {
return gateway, nil
}

func (t *Loki) initBloomGatewayRing() (services.Service, error) {
if !t.Cfg.BloomGateway.Enabled {
return nil, nil
}
// Inherit ring listen port from gRPC config
t.Cfg.BloomGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

// TODO(chaudum): Do we want to integration the bloom gateway component into the backend target?
mode := lokiring.ClientMode
legacyReadMode := t.Cfg.LegacyReadTarget && t.isModuleActive(Read)
if t.Cfg.isModuleEnabled(BloomGateway) || t.Cfg.isModuleEnabled(Backend) || legacyReadMode {
mode = lokiring.ServerMode
}
manager, err := lokiring.NewRingManager(bloomGatewayRingKey, mode, t.Cfg.BloomGateway.Ring, t.Cfg.BloomGateway.Ring.ReplicationFactor, t.Cfg.BloomGateway.Ring.NumTokens, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "error initializing bloom gateway ring manager")
}

t.bloomGatewayRingManager = manager

t.Server.HTTP.Path("/bloomgateway/ring").Methods("GET", "POST").Handler(t.bloomGatewayRingManager)

if t.Cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/bloomgateway/ring").Methods("GET", "POST").Handler(t.bloomGatewayRingManager)
}

return t.bloomGatewayRingManager, nil
}

func (t *Loki) initIndexGateway() (services.Service, error) {
shardingStrategy := indexgateway.GetShardingStrategy(t.Cfg.IndexGateway, t.indexGatewayRingManager, t.Overrides)

Expand Down
1 change: 0 additions & 1 deletion pkg/loki/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f
cfg.IndexGateway.Mode = indexgateway.SimpleMode
cfg.IndexGateway.Ring.InstanceAddr = localhost
cfg.BloomCompactor.Ring.InstanceAddr = localhost
cfg.BloomGateway.Ring.InstanceAddr = localhost
cfg.CompactorConfig.CompactorRing.InstanceAddr = localhost
cfg.CompactorConfig.WorkingDirectory = filepath.Join(dir, "compactor")

Expand Down

0 comments on commit 21cd5d4

Please sign in to comment.