Skip to content

Commit

Permalink
feat(bloomstore): Support for sharding blocks across multiple different directories (#12375)

Browse files Browse the repository at this point in the history

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Mar 27, 2024
1 parent c1edb82 commit 15dc2ba
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 31 deletions.
5 changes: 3 additions & 2 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2353,9 +2353,10 @@ tsdb_shipper:

# Configures Bloom Shipper.
bloom_shipper:
# Working directory to store downloaded Bloom Blocks.
# Working directory to store downloaded bloom blocks. Supports multiple
# directories, separated by comma.
# CLI flag: -bloom.shipper.working-directory
[working_directory: <string> | default = "bloom-shipper"]
[working_directory: <string> | default = "/data/blooms"]

# Maximum size of bloom pages that should be queried. Larger pages than this
# limit are skipped when querying blooms to limit memory usage.
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {
}
storageCfg := storage.Config{
BloomShipperConfig: bloomshipperconfig.Config{
WorkingDirectory: t.TempDir(),
WorkingDirectory: []string{t.TempDir()},
BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
WorkersCount: 1,
},
Expand Down
7 changes: 5 additions & 2 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,11 @@ func applyPathPrefixDefaults(r, defaults *ConfigWrapper) {
if r.CompactorConfig.WorkingDirectory == defaults.CompactorConfig.WorkingDirectory {
r.CompactorConfig.WorkingDirectory = fmt.Sprintf("%s/compactor", prefix)
}
if r.StorageConfig.BloomShipperConfig.WorkingDirectory == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory {
r.StorageConfig.BloomShipperConfig.WorkingDirectory = fmt.Sprintf("%s/blooms", prefix)
if len(r.StorageConfig.BloomShipperConfig.WorkingDirectory) == 1 &&
len(r.StorageConfig.BloomShipperConfig.WorkingDirectory) == len(defaults.StorageConfig.BloomShipperConfig.WorkingDirectory) &&

r.StorageConfig.BloomShipperConfig.WorkingDirectory[0] == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory[0] {
_ = r.StorageConfig.BloomShipperConfig.WorkingDirectory.Set(fmt.Sprintf("%s/blooms", prefix))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/netutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -100,7 +101,7 @@ common:
assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath)
assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir)
assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory)
assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory)
assert.EqualValues(t, flagext.StringSliceCSV{"/opt/loki/blooms"}, config.StorageConfig.BloomShipperConfig.WorkingDirectory)
})

t.Run("accepts paths both with and without trailing slash", func(t *testing.T) {
Expand All @@ -112,7 +113,7 @@ common:
assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath)
assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir)
assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory)
assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory)
assert.EqualValues(t, flagext.StringSliceCSV{"/opt/loki/blooms"}, config.StorageConfig.BloomShipperConfig.WorkingDirectory)
})

t.Run("does not rewrite custom (non-default) paths passed via config file", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f
cfg.StorageConfig = storage.Config{
FSConfig: local.FSConfig{Directory: dir},
BloomShipperConfig: bloomshipperconfig.Config{
WorkingDirectory: filepath.Join(dir, "blooms"),
WorkingDirectory: []string{filepath.Join(dir, "blooms")},
BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
WorkersCount: 1,
},
Expand Down
13 changes: 9 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/util"
)

type CloseableBlockQuerier struct {
Expand All @@ -34,10 +35,14 @@ func (c *CloseableBlockQuerier) SeriesIter() (v1.PeekingIterator[*v1.SeriesWithB
return v1.NewPeekingIter[*v1.SeriesWithBloom](c.BlockQuerier), nil
}

func LoadBlocksDirIntoCache(path string, c Cache, logger log.Logger) error {
level.Debug(logger).Log("msg", "load bloomshipper working directory into cache", "path", path)
keys, values := loadBlockDirectories(path, logger)
return c.PutMany(context.Background(), keys, values)
// LoadBlocksDirIntoCache walks each of the given working directories and
// registers every block directory found there with the cache c. Errors from
// individual directories are collected so that one failing directory does
// not prevent the remaining ones from being loaded.
func LoadBlocksDirIntoCache(paths []string, c Cache, logger log.Logger) error {
	var combined util.MultiError
	for _, dir := range paths {
		level.Debug(logger).Log("msg", "load bloomshipper working directory into cache", "path", dir)
		cacheKeys, blockDirs := loadBlockDirectories(dir, logger)
		combined.Add(c.PutMany(context.Background(), cacheKeys, blockDirs))
	}
	return combined.Err()
}

func loadBlockDirectories(root string, logger log.Logger) (keys []string, values []BlockDirectory) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/bloomshipper/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) {
}
c := NewFsBlocksCache(cfg, nil, log.NewNopLogger())

err := LoadBlocksDirIntoCache(wd, c, logger)
err := LoadBlocksDirIntoCache([]string{wd, t.TempDir()}, c, logger)
require.NoError(t, err)

require.Equal(t, 1, len(c.entries))
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,14 @@ type BloomClient struct {
}

func NewBloomClient(cfg bloomStoreConfig, client client.ObjectClient, logger log.Logger) (*BloomClient, error) {
fsResolver, err := NewShardedPrefixedResolver(cfg.workingDirs, defaultKeyResolver{})
if err != nil {
return nil, errors.Wrap(err, "creating fs resolver")
}

return &BloomClient{
KeyResolver: defaultKeyResolver{}, // TODO(owen-d): hook into schema, similar to `{,Parse}ExternalKey`
fsResolver: NewPrefixedResolver(cfg.workingDir, defaultKeyResolver{}),
fsResolver: fsResolver,
concurrency: cfg.numWorkers,
client: client,
logger: logger,
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func newMockBloomClient(t *testing.T) (*BloomClient, string) {
dir := t.TempDir()
logger := log.NewLogfmtLogger(os.Stderr)
cfg := bloomStoreConfig{
workingDir: dir,
numWorkers: 3,
workingDirs: []string{dir},
numWorkers: 3,
}
client, err := NewBloomClient(cfg, oc, logger)
require.NoError(t, err)
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/stores/shipper/bloomshipper/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package config
import (
"errors"
"flag"
"strings"
"time"

"github.com/grafana/dskit/flagext"
Expand All @@ -13,7 +12,7 @@ import (
)

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
WorkingDirectory flagext.StringSliceCSV `yaml:"working_directory"`
MaxQueryPageSize flagext.Bytes `yaml:"max_query_page_size"`
BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"`
BlocksCache BlocksCacheConfig `yaml:"blocks_cache"`
Expand All @@ -31,7 +30,8 @@ func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *fla
}

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.")
c.WorkingDirectory = []string{"/data/blooms"}
f.Var(&c.WorkingDirectory, prefix+"shipper.working-directory", "Working directory to store downloaded bloom blocks. Supports multiple directories, separated by comma.")
_ = c.MaxQueryPageSize.Set("64MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go
f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Larger pages than this limit are skipped when querying blooms to limit memory usage.")
c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f)
Expand All @@ -40,8 +40,8 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
}

func (c *Config) Validate() error {
if strings.TrimSpace(c.WorkingDirectory) == "" {
return errors.New("working directory must be specified")
if len(c.WorkingDirectory) == 0 {
return errors.New("at least one working directory must be specified")
}
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,16 @@ func NewFetcher(
logger log.Logger,
bloomMetrics *v1.Metrics,
) (*Fetcher, error) {
localFSResolver, err := NewShardedPrefixedResolver(cfg.workingDirs, defaultKeyResolver{})
if err != nil {
return nil, errors.Wrap(err, "creating fs resolver")
}
fetcher := &Fetcher{
cfg: cfg,
client: client,
metasCache: metasCache,
blocksCache: blocksCache,
localFSResolver: NewPrefixedResolver(cfg.workingDir, defaultKeyResolver{}),
localFSResolver: localFSResolver,
metrics: newFetcherMetrics(reg, constants.Loki, "bloom_store"),
bloomMetrics: bloomMetrics,
logger: logger,
Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestMetasFetcher(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
metasCache := cache.NewMockCache()
cfg := bloomStoreConfig{workingDir: t.TempDir(), numWorkers: 1}
cfg := bloomStoreConfig{workingDirs: []string{t.TempDir()}, numWorkers: 1}

oc, err := local.NewFSObjectClient(local.FSConfig{Directory: dir})
require.NoError(t, err)
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestFetcher_DownloadQueue(t *testing.T) {

func TestFetcher_LoadBlocksFromFS(t *testing.T) {
base := t.TempDir()
cfg := bloomStoreConfig{workingDir: base, numWorkers: 1}
cfg := bloomStoreConfig{workingDirs: []string{base}, numWorkers: 1}
resolver := NewPrefixedResolver(base, defaultKeyResolver{})

refs := []BlockRef{
Expand Down Expand Up @@ -312,9 +312,13 @@ func createBlockDir(t *testing.T, path string) {
}

func TestFetcher_IsBlockDir(t *testing.T) {
cfg := bloomStoreConfig{numWorkers: 1}
cfg := bloomStoreConfig{
numWorkers: 1,
workingDirs: []string{t.TempDir()},
}

fetcher, _ := NewFetcher(cfg, nil, nil, nil, nil, log.NewNopLogger(), v1.NewMetrics(nil))
fetcher, err := NewFetcher(cfg, nil, nil, nil, nil, log.NewNopLogger(), v1.NewMetrics(nil))
require.NoError(t, err)

t.Run("path does not exist", func(t *testing.T) {
base := t.TempDir()
Expand Down
46 changes: 46 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package bloomshipper

import (
"fmt"
"hash"
"hash/fnv"
"path"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -150,6 +152,50 @@ func (p PrefixedResolver) Block(ref BlockRef) Location {
}
}

// hashable is implemented by reference types that can write themselves into
// a 32-bit hash; ShardedPrefixedResolver uses it to pick a shard prefix for
// a given reference (see prefix below).
type hashable interface {
	Hash(hash.Hash32) error
}

// ShardedPrefixedResolver resolves local file locations across multiple
// prefix directories. The directory for a given reference is chosen by
// hashing the reference and taking the hash modulo the number of prefixes,
// while the embedded KeyResolver determines the path layout below the
// chosen prefix.
type ShardedPrefixedResolver struct {
	prefixes []string
	KeyResolver
}

// NewShardedPrefixedResolver returns a KeyResolver that spreads locations
// over the given prefix directories. With a single prefix it falls back to a
// plain PrefixedResolver (no hashing overhead); with no prefixes it returns
// an error.
func NewShardedPrefixedResolver(prefixes []string, resolver KeyResolver) (KeyResolver, error) {
	switch len(prefixes) {
	case 0:
		return nil, fmt.Errorf("requires at least 1 prefix")
	case 1:
		// Single directory: sharding is pointless, use the simple resolver.
		return NewPrefixedResolver(prefixes[0], resolver), nil
	}
	return ShardedPrefixedResolver{
		prefixes:    prefixes,
		KeyResolver: resolver,
	}, nil
}

// prefix hashes ref with FNV-1 (32 bit) and maps the result onto one of the
// configured prefix directories, so the same reference always resolves to
// the same shard.
func (r ShardedPrefixedResolver) prefix(ref hashable) key {
	hasher := fnv.New32()
	// fnv's Write never returns an error, so the Hash error can be ignored.
	_ = ref.Hash(hasher)
	idx := hasher.Sum32() % uint32(len(r.prefixes))
	return key(r.prefixes[idx])
}

// Meta returns the location of the meta file for ref, rooted at the shard
// prefix selected by hashing ref.
func (r ShardedPrefixedResolver) Meta(ref MetaRef) Location {
	dir := r.prefix(ref)
	return locations{dir, r.KeyResolver.Meta(ref)}
}

// Block returns the location of the block archive for ref, rooted at the
// shard prefix selected by hashing ref.
func (r ShardedPrefixedResolver) Block(ref BlockRef) Location {
	dir := r.prefix(ref)
	return locations{dir, r.KeyResolver.Block(ref)}
}

type Location interface {
Addr() string // object storage location
LocalPath() string // local path version
Expand Down
46 changes: 46 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,49 @@ func TestResolver_ParseBlockKey(t *testing.T) {
require.NoError(t, err)
require.Equal(t, ref, parsed)
}

// TestResolver_ShardedPrefixedResolver verifies the constructor's error
// behavior and that meta/block locations are deterministically assigned to
// one of the configured prefix directories.
func TestResolver_ShardedPrefixedResolver(t *testing.T) {
	block := BlockRef{
		Ref: Ref{
			TenantID:       "tenant",
			TableName:      "table_1",
			Bounds:         v1.NewBounds(0x0000, 0xffff),
			StartTimestamp: 0,
			EndTimestamp:   3600000,
			Checksum:       48350,
		},
	}

	meta := MetaRef{
		Ref: Ref{
			TenantID:  "tenant",
			TableName: "table_1",
			Bounds:    v1.NewBounds(0x0000, 0xffff),
			Checksum:  43981,
		},
	}

	t.Run("empty prefixes cause error", func(t *testing.T) {
		_, err := NewShardedPrefixedResolver([]string{}, defaultKeyResolver{})
		require.ErrorContains(t, err, "requires at least 1 prefix")
	})

	t.Run("single prefix", func(t *testing.T) {
		resolver, err := NewShardedPrefixedResolver([]string{"prefix"}, defaultKeyResolver{})
		require.NoError(t, err)

		metaLoc := resolver.Meta(meta)
		require.Equal(t, "prefix/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", metaLoc.LocalPath())

		blockLoc := resolver.Block(block)
		require.Equal(t, "prefix/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", blockLoc.LocalPath())
	})

	t.Run("multiple prefixes", func(t *testing.T) {
		resolver, err := NewShardedPrefixedResolver([]string{"a", "b", "c", "d"}, defaultKeyResolver{})
		require.NoError(t, err)

		metaLoc := resolver.Meta(meta)
		require.Equal(t, "b/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", metaLoc.LocalPath())

		blockLoc := resolver.Block(block)
		require.Equal(t, "d/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", blockLoc.LocalPath())
	})
}
10 changes: 6 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type StoreWithMetrics interface {
}

type bloomStoreConfig struct {
workingDir string
workingDirs []string
numWorkers int
maxBloomPageSize int
}
Expand Down Expand Up @@ -193,13 +193,15 @@ func NewBloomStore(

// TODO(chaudum): Remove wrapper
cfg := bloomStoreConfig{
workingDir: storageConfig.BloomShipperConfig.WorkingDirectory,
workingDirs: storageConfig.BloomShipperConfig.WorkingDirectory,
numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount,
maxBloomPageSize: int(storageConfig.BloomShipperConfig.MaxQueryPageSize),
}

if err := util.EnsureDirectory(cfg.workingDir); err != nil {
return nil, errors.Wrapf(err, "failed to create working directory for bloom store: '%s'", cfg.workingDir)
for _, wd := range cfg.workingDirs {
if err := util.EnsureDirectory(wd); err != nil {
return nil, errors.Wrapf(err, "failed to create working directory for bloom store: '%s'", wd)
}
}

for _, periodicConfig := range periodicConfigs {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/bloomshipper/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*BloomStore, st

storageConfig := storage.Config{
BloomShipperConfig: config.Config{
WorkingDirectory: workDir,
WorkingDirectory: []string{workDir},
BlocksDownloadingQueue: config.DownloadingQueueConfig{
WorkersCount: 1,
},
Expand Down

0 comments on commit 15dc2ba

Please sign in to comment.