From 15dc2bac04b9119b1e6f0358614da22150b9ad1a Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 27 Mar 2024 19:55:13 +0100 Subject: [PATCH] feat(bloomstore): Support for sharding blocks across multiple different directories (#12375) Signed-off-by: Christian Haudum --- docs/sources/configure/_index.md | 5 +- pkg/bloomgateway/bloomgateway_test.go | 2 +- pkg/loki/config_wrapper.go | 7 ++- pkg/loki/config_wrapper_test.go | 5 +- pkg/loki/modules_test.go | 2 +- .../stores/shipper/bloomshipper/cache.go | 13 ++++-- .../stores/shipper/bloomshipper/cache_test.go | 2 +- .../stores/shipper/bloomshipper/client.go | 7 ++- .../shipper/bloomshipper/client_test.go | 4 +- .../shipper/bloomshipper/config/config.go | 10 ++-- .../stores/shipper/bloomshipper/fetcher.go | 6 ++- .../shipper/bloomshipper/fetcher_test.go | 12 +++-- .../stores/shipper/bloomshipper/resolver.go | 46 +++++++++++++++++++ .../shipper/bloomshipper/resolver_test.go | 46 +++++++++++++++++++ .../stores/shipper/bloomshipper/store.go | 10 ++-- .../stores/shipper/bloomshipper/store_test.go | 2 +- 16 files changed, 148 insertions(+), 31 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 158bca6c00585..93582d3897b6f 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2353,9 +2353,10 @@ tsdb_shipper: # Configures Bloom Shipper. bloom_shipper: - # Working directory to store downloaded Bloom Blocks. + # Working directory to store downloaded bloom blocks. Supports multiple + # directories, separated by comma. # CLI flag: -bloom.shipper.working-directory - [working_directory: | default = "bloom-shipper"] + [working_directory: | default = "/data/blooms"] # Maximum size of bloom pages that should be queried. Larger pages than this # limit are skipped when querying blooms to limit memory usage. diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 45c9a3926c157..f705bb5eb0919 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -72,7 +72,7 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore { } storageCfg := storage.Config{ BloomShipperConfig: bloomshipperconfig.Config{ - WorkingDirectory: t.TempDir(), + WorkingDirectory: []string{t.TempDir()}, BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{ WorkersCount: 1, }, diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 8a5f6c6811250..c602f53cc6dd1 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -409,8 +409,11 @@ func applyPathPrefixDefaults(r, defaults *ConfigWrapper) { if r.CompactorConfig.WorkingDirectory == defaults.CompactorConfig.WorkingDirectory { r.CompactorConfig.WorkingDirectory = fmt.Sprintf("%s/compactor", prefix) } - if r.StorageConfig.BloomShipperConfig.WorkingDirectory == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory { - r.StorageConfig.BloomShipperConfig.WorkingDirectory = fmt.Sprintf("%s/blooms", prefix) + if len(r.StorageConfig.BloomShipperConfig.WorkingDirectory) == 1 && + len(r.StorageConfig.BloomShipperConfig.WorkingDirectory) == len(defaults.StorageConfig.BloomShipperConfig.WorkingDirectory) && + + r.StorageConfig.BloomShipperConfig.WorkingDirectory[0] == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory[0] { + _ = r.StorageConfig.BloomShipperConfig.WorkingDirectory.Set(fmt.Sprintf("%s/blooms", prefix)) } } } diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 41705f012f020..f6e22f74add51 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/netutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -100,7 +101,7 @@ common: assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath) assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir) assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory) - assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory) + assert.EqualValues(t, flagext.StringSliceCSV{"/opt/loki/blooms"}, config.StorageConfig.BloomShipperConfig.WorkingDirectory) }) t.Run("accepts paths both with and without trailing slash", func(t *testing.T) { @@ -112,7 +113,7 @@ common: assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath) assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir) assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory) - assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory) + assert.EqualValues(t, flagext.StringSliceCSV{"/opt/loki/blooms"}, config.StorageConfig.BloomShipperConfig.WorkingDirectory) }) t.Run("does not rewrite custom (non-default) paths passed via config file", func(t *testing.T) { diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index 61cc9198bbf28..1c8945b51a37e 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -367,7 +367,7 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f cfg.StorageConfig = storage.Config{ FSConfig: local.FSConfig{Directory: dir}, BloomShipperConfig: bloomshipperconfig.Config{ - WorkingDirectory: filepath.Join(dir, "blooms"), + WorkingDirectory: []string{filepath.Join(dir, "blooms")}, BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{ WorkersCount: 1, }, diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 3097822fccf75..e7fcfaff1666a 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -12,6 +12,7 @@ import ( v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/util" ) type CloseableBlockQuerier struct { @@ -34,10 +35,14 @@ func (c *CloseableBlockQuerier) SeriesIter() (v1.PeekingIterator[*v1.SeriesWithB return v1.NewPeekingIter[*v1.SeriesWithBloom](c.BlockQuerier), nil } -func LoadBlocksDirIntoCache(path string, c Cache, logger log.Logger) error { - level.Debug(logger).Log("msg", "load bloomshipper working directory into cache", "path", path) - keys, values := loadBlockDirectories(path, logger) - return c.PutMany(context.Background(), keys, values) +func LoadBlocksDirIntoCache(paths []string, c Cache, logger log.Logger) error { + var err util.MultiError + for _, path := range paths { + level.Debug(logger).Log("msg", "load bloomshipper working directory into cache", "path", path) + keys, values := loadBlockDirectories(path, logger) + err.Add(c.PutMany(context.Background(), keys, values)) + } + return err.Err() } func loadBlockDirectories(root string, logger log.Logger) (keys []string, values []BlockDirectory) { diff --git a/pkg/storage/stores/shipper/bloomshipper/cache_test.go b/pkg/storage/stores/shipper/bloomshipper/cache_test.go index ca591efebb993..eb2a061c775bb 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache_test.go @@ -88,7 +88,7 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) { } c := NewFsBlocksCache(cfg, nil, log.NewNopLogger()) - err := LoadBlocksDirIntoCache(wd, c, logger) + err := LoadBlocksDirIntoCache([]string{wd, t.TempDir()}, c, logger) require.NoError(t, err) require.Equal(t, 1, len(c.entries)) diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index f5258570d869c..7e9128a9971a3 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -256,9 +256,14 @@ type BloomClient struct { } func NewBloomClient(cfg bloomStoreConfig, client client.ObjectClient, logger log.Logger) (*BloomClient, error) { + fsResolver, err := NewShardedPrefixedResolver(cfg.workingDirs, defaultKeyResolver{}) + if err != nil { + return nil, errors.Wrap(err, "creating fs resolver") + } + return &BloomClient{ KeyResolver: defaultKeyResolver{}, // TODO(owen-d): hook into schema, similar to `{,Parse}ExternalKey` - fsResolver: NewPrefixedResolver(cfg.workingDir, defaultKeyResolver{}), + fsResolver: fsResolver, concurrency: cfg.numWorkers, client: client, logger: logger, diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index e5bbe3b5b1bf5..cee23671b7216 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -41,8 +41,8 @@ func newMockBloomClient(t *testing.T) (*BloomClient, string) { dir := t.TempDir() logger := log.NewLogfmtLogger(os.Stderr) cfg := bloomStoreConfig{ - workingDir: dir, - numWorkers: 3, + workingDirs: []string{dir}, + numWorkers: 3, } client, err := NewBloomClient(cfg, oc, logger) require.NoError(t, err) diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go index 791c97bfe1e4d..3aef86cabdf22 100644 --- a/pkg/storage/stores/shipper/bloomshipper/config/config.go +++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go @@ -4,7 +4,6 @@ package config import ( "errors" "flag" - "strings" "time" "github.com/grafana/dskit/flagext" @@ -13,7 +12,7 @@ import ( ) type Config struct { - WorkingDirectory string `yaml:"working_directory"` + WorkingDirectory flagext.StringSliceCSV `yaml:"working_directory"` MaxQueryPageSize flagext.Bytes `yaml:"max_query_page_size"` BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"` BlocksCache BlocksCacheConfig `yaml:"blocks_cache"` @@ -31,7 +30,8 @@ func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *fla } func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.") + c.WorkingDirectory = []string{"/data/blooms"} + f.Var(&c.WorkingDirectory, prefix+"shipper.working-directory", "Working directory to store downloaded bloom blocks. Supports multiple directories, separated by comma.") _ = c.MaxQueryPageSize.Set("64MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Larger pages than this limit are skipped when querying blooms to limit memory usage.") c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f) @@ -40,8 +40,8 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { } func (c *Config) Validate() error { - if strings.TrimSpace(c.WorkingDirectory) == "" { - return errors.New("working directory must be specified") + if len(c.WorkingDirectory) == 0 { + return errors.New("at least one working directory must be specified") } return nil } diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index 74fb9a177a667..b9483675f21cc 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -91,12 +91,16 @@ func NewFetcher( logger log.Logger, bloomMetrics *v1.Metrics, ) (*Fetcher, error) { + localFSResolver, err := NewShardedPrefixedResolver(cfg.workingDirs, defaultKeyResolver{}) + if err != nil { + return nil, errors.Wrap(err, "creating fs resolver") + } fetcher := &Fetcher{ cfg: cfg, client: client, metasCache: metasCache, blocksCache: blocksCache, - localFSResolver: NewPrefixedResolver(cfg.workingDir, defaultKeyResolver{}), + localFSResolver: localFSResolver, metrics: newFetcherMetrics(reg, constants.Loki, "bloom_store"), bloomMetrics: bloomMetrics, logger: logger, diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go index e51d153098381..ca3fc006c2688 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go @@ -100,7 +100,7 @@ func TestMetasFetcher(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() metasCache := cache.NewMockCache() - cfg := bloomStoreConfig{workingDir: t.TempDir(), numWorkers: 1} + cfg := bloomStoreConfig{workingDirs: []string{t.TempDir()}, numWorkers: 1} oc, err := local.NewFSObjectClient(local.FSConfig{Directory: dir}) require.NoError(t, err) @@ -259,7 +259,7 @@ func TestFetcher_DownloadQueue(t *testing.T) { func TestFetcher_LoadBlocksFromFS(t *testing.T) { base := t.TempDir() - cfg := bloomStoreConfig{workingDir: base, numWorkers: 1} + cfg := bloomStoreConfig{workingDirs: []string{base}, numWorkers: 1} resolver := NewPrefixedResolver(base, defaultKeyResolver{}) refs := []BlockRef{ @@ -312,9 +312,13 @@ func createBlockDir(t *testing.T, path string) { } func TestFetcher_IsBlockDir(t *testing.T) { - cfg := bloomStoreConfig{numWorkers: 1} + cfg := bloomStoreConfig{ + numWorkers: 1, + workingDirs: []string{t.TempDir()}, + } - fetcher, _ := NewFetcher(cfg, nil, nil, nil, nil, log.NewNopLogger(), v1.NewMetrics(nil)) + fetcher, err := NewFetcher(cfg, nil, nil, nil, nil, log.NewNopLogger(), v1.NewMetrics(nil)) + require.NoError(t, err) t.Run("path does not exist", func(t *testing.T) { base := t.TempDir() diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver.go b/pkg/storage/stores/shipper/bloomshipper/resolver.go index b88a48758d63d..7fb6652ebd174 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver.go @@ -2,6 +2,8 @@ package bloomshipper import ( "fmt" + "hash" + "hash/fnv" "path" "path/filepath" "strconv" @@ -150,6 +152,50 @@ func (p PrefixedResolver) Block(ref BlockRef) Location { } } +type hashable interface { + Hash(hash.Hash32) error +} + +type ShardedPrefixedResolver struct { + prefixes []string + KeyResolver +} + +func NewShardedPrefixedResolver(prefixes []string, resolver KeyResolver) (KeyResolver, error) { + n := len(prefixes) + switch n { + case 0: + return nil, fmt.Errorf("requires at least 1 prefix") + case 1: + return NewPrefixedResolver(prefixes[0], resolver), nil + default: + return ShardedPrefixedResolver{ + prefixes: prefixes, + KeyResolver: resolver, + }, nil + } +} + +func (r ShardedPrefixedResolver) prefix(ref hashable) key { + h := fnv.New32() + _ = ref.Hash(h) + return key(r.prefixes[h.Sum32()%uint32(len(r.prefixes))]) +} + +func (r ShardedPrefixedResolver) Meta(ref MetaRef) Location { + return locations{ + r.prefix(ref), + r.KeyResolver.Meta(ref), + } +} + +func (r ShardedPrefixedResolver) Block(ref BlockRef) Location { + return locations{ + r.prefix(ref), + r.KeyResolver.Block(ref), + } +} + type Location interface { Addr() string // object storage location LocalPath() string // local path version diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go index b2aa7e60a4b53..151b3bc11bb47 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go @@ -53,3 +53,49 @@ func TestResolver_ParseBlockKey(t *testing.T) { require.NoError(t, err) require.Equal(t, ref, parsed) } + +func TestResolver_ShardedPrefixedResolver(t *testing.T) { + + blockRef := BlockRef{ + Ref: Ref{ + TenantID: "tenant", + TableName: "table_1", + Bounds: v1.NewBounds(0x0000, 0xffff), + StartTimestamp: 0, + EndTimestamp: 3600000, + Checksum: 48350, + }, + } + + metaRef := MetaRef{ + Ref: Ref{ + TenantID: "tenant", + TableName: "table_1", + Bounds: v1.NewBounds(0x0000, 0xffff), + Checksum: 43981, + }, + } + + t.Run("empty prefixes cause error", func(t *testing.T) { + _, err := NewShardedPrefixedResolver([]string{}, defaultKeyResolver{}) + require.ErrorContains(t, err, "requires at least 1 prefix") + }) + + t.Run("single prefix", func(t *testing.T) { + r, err := NewShardedPrefixedResolver([]string{"prefix"}, defaultKeyResolver{}) + require.NoError(t, err) + loc := r.Meta(metaRef) + require.Equal(t, "prefix/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath()) + loc = r.Block(blockRef) + require.Equal(t, "prefix/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath()) + }) + + t.Run("multiple prefixes", func(t *testing.T) { + r, err := NewShardedPrefixedResolver([]string{"a", "b", "c", "d"}, defaultKeyResolver{}) + require.NoError(t, err) + loc := r.Meta(metaRef) + require.Equal(t, "b/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath()) + loc = r.Block(blockRef) + require.Equal(t, "d/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath()) + }) +} diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index 8daa94bddf00d..aed16cd8c2532 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -41,7 +41,7 @@ type StoreWithMetrics interface { } type bloomStoreConfig struct { - workingDir string + workingDirs []string numWorkers int maxBloomPageSize int } @@ -193,13 +193,15 @@ func NewBloomStore( // TODO(chaudum): Remove wrapper cfg := bloomStoreConfig{ - workingDir: storageConfig.BloomShipperConfig.WorkingDirectory, + workingDirs: storageConfig.BloomShipperConfig.WorkingDirectory, numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount, maxBloomPageSize: int(storageConfig.BloomShipperConfig.MaxQueryPageSize), } - if err := util.EnsureDirectory(cfg.workingDir); err != nil { - return nil, errors.Wrapf(err, "failed to create working directory for bloom store: '%s'", cfg.workingDir) + for _, wd := range cfg.workingDirs { + if err := util.EnsureDirectory(wd); err != nil { + return nil, errors.Wrapf(err, "failed to create working directory for bloom store: '%s'", wd) + } } for _, periodicConfig := range periodicConfigs { diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go index 9274bfc620b6e..074a965ddb5b4 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -51,7 +51,7 @@ func newMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*BloomStore, st storageConfig := storage.Config{ BloomShipperConfig: config.Config{ - WorkingDirectory: workDir, + WorkingDirectory: []string{workDir}, BlocksDownloadingQueue: config.DownloadingQueueConfig{ WorkersCount: 1, },