From 56b539691f55e1f827e89949f01aaf92847602ac Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 22 Feb 2024 10:41:39 +0100 Subject: [PATCH] fix: Ensure working dir for bloomstore exists (#12019) Fail startup if directory does not exist or there are not enough permissions. This prevents the bloomstore to fail later in the process once it tries to download and extract blocks. Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway_test.go | 7 +++ pkg/loki/config_wrapper.go | 2 +- pkg/loki/config_wrapper_test.go | 2 + pkg/loki/modules_test.go | 11 ++-- pkg/storage/chunk/client/util/util.go | 2 + .../shipper/bloomshipper/shipper_test.go | 2 +- .../stores/shipper/bloomshipper/store.go | 7 ++- .../stores/shipper/bloomshipper/store_test.go | 53 ++++++++++++++++--- 8 files changed, 72 insertions(+), 14 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index fede86484a96b..9a4dea08dba26 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -26,6 +26,7 @@ import ( v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" + bloomshipperconfig "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" lokiring "github.com/grafana/loki/pkg/util/ring" "github.com/grafana/loki/pkg/validation" ) @@ -70,6 +71,9 @@ func TestBloomGateway_StartStopService(t *testing.T) { Configs: []config.PeriodConfig{p}, } storageCfg := storage.Config{ + BloomShipperConfig: bloomshipperconfig.Config{ + WorkingDirectory: t.TempDir(), + }, FSConfig: local.FSConfig{ Directory: t.TempDir(), }, @@ -136,6 +140,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { Configs: []config.PeriodConfig{p}, } storageCfg := storage.Config{ + BloomShipperConfig: bloomshipperconfig.Config{ + WorkingDirectory: t.TempDir(), + }, FSConfig: local.FSConfig{ Directory: t.TempDir(), }, diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 1914c8ab3edfc..f76e0f75da9f7 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -407,7 +407,7 @@ func applyPathPrefixDefaults(r, defaults *ConfigWrapper) { r.CompactorConfig.WorkingDirectory = fmt.Sprintf("%s/compactor", prefix) } if r.StorageConfig.BloomShipperConfig.WorkingDirectory == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory { - r.StorageConfig.BloomShipperConfig.WorkingDirectory = fmt.Sprintf("%s/bloom-shipper", prefix) + r.StorageConfig.BloomShipperConfig.WorkingDirectory = fmt.Sprintf("%s/blooms", prefix) } } } diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 3b1237dad4d1d..60c9223732d05 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -100,6 +100,7 @@ common: assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath) assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir) assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory) + assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory) }) t.Run("accepts paths both with and without trailing slash", func(t *testing.T) { @@ -111,6 +112,7 @@ common: assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath) assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir) assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory) + assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory) }) t.Run("does not rewrite custom (non-default) paths passed via config file", func(t *testing.T) { diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index 0d07242b75370..047ba5f838a52 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -2,7 +2,6 @@ package loki import ( "fmt" - "path" "path/filepath" "testing" "time" @@ -17,6 +16,7 @@ import ( "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" + bloomshipperconfig "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/boltdb" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/indexgateway" @@ -366,10 +366,13 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f // This would be overwritten by the default values setting. cfg.StorageConfig = storage.Config{ FSConfig: local.FSConfig{Directory: dir}, + BloomShipperConfig: bloomshipperconfig.Config{ + WorkingDirectory: filepath.Join(dir, "blooms"), + }, BoltDBShipperConfig: boltdb.IndexCfg{ Config: indexshipper.Config{ - ActiveIndexDirectory: path.Join(dir, "index"), - CacheLocation: path.Join(dir, "cache"), + ActiveIndexDirectory: filepath.Join(dir, "index"), + CacheLocation: filepath.Join(dir, "cache"), Mode: indexshipper.ModeWriteOnly, ResyncInterval: 24 * time.Hour, }, @@ -402,7 +405,7 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f cfg.BloomCompactor.Ring.InstanceAddr = localhost cfg.BloomGateway.Ring.InstanceAddr = localhost cfg.CompactorConfig.CompactorRing.InstanceAddr = localhost - cfg.CompactorConfig.WorkingDirectory = path.Join(dir, "compactor") + cfg.CompactorConfig.WorkingDirectory = filepath.Join(dir, "compactor") cfg.Ruler.Config.Ring.InstanceAddr = localhost cfg.Ruler.Config.StoreConfig.Type = config.StorageTypeLocal diff --git a/pkg/storage/chunk/client/util/util.go b/pkg/storage/chunk/client/util/util.go index 10237cc456da5..e49fad20136fb 100644 --- a/pkg/storage/chunk/client/util/util.go +++ b/pkg/storage/chunk/client/util/util.go @@ -72,6 +72,8 @@ func EnsureDirectory(dir string) error { return os.MkdirAll(dir, 0o777) } else if err == nil && !info.IsDir() { return fmt.Errorf("not a directory: %s", dir) + } else if err == nil && info.Mode()&0700 != 0700 { + return fmt.Errorf("insufficient permissions: %s %s", dir, info.Mode()) } return err } diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go index e03d72c26ba37..86e8ed90a174c 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go @@ -142,7 +142,7 @@ func TestBloomShipper_IsOutsideRange(t *testing.T) { func TestBloomShipper_ForEach(t *testing.T) { blockRefs := make([]BlockRef, 0, 3) - store, _ := newMockBloomStore(t) + store, _, _ := newMockBloomStore(t) for i := 0; i < len(blockRefs); i++ { block, err := createBlockInStorage(t, store, "tenant", model.Time(i*24*int(time.Hour)), 0x0000, 0x00ff) require.NoError(t, err) diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index d5cfa24b11ed5..56bfb3ebe97ab 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -15,6 +15,7 @@ import ( v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/client" + "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/config" ) @@ -172,6 +173,10 @@ func NewBloomStore( numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount, } + if err := util.EnsureDirectory(cfg.workingDir); err != nil { + return nil, errors.Wrapf(err, "failed to create working directory for bloom store: '%s'", cfg.workingDir) + } + for _, periodicConfig := range periodicConfigs { objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageConfig, clientMetrics) if err != nil { @@ -323,10 +328,10 @@ func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*Clo results := make([]*CloseableBlockQuerier, 0, len(blocks)) for i := range fetchers { res, err := fetchers[i].FetchBlocks(ctx, refs[i]) - results = append(results, res...) if err != nil { return results, err } + results = append(results, res...) } // sort responses (results []*CloseableBlockQuerier) based on requests (blocks []BlockRef) diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go index 59d8eee464053..48ab81cc45027 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "os" + "path/filepath" "testing" "time" @@ -20,9 +21,12 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" ) -func newMockBloomStore(t *testing.T) (*BloomStore, string) { +func newMockBloomStore(t *testing.T) (*BloomStore, string, error) { workDir := t.TempDir() + return newMockBloomStoreWithWorkDir(t, workDir) +} +func newMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*BloomStore, string, error) { periodicConfigs := []storageconfig.PeriodConfig{ { ObjectType: storageconfig.StorageTypeInMemory, @@ -63,11 +67,13 @@ func newMockBloomStore(t *testing.T) (*BloomStore, string) { metasCache := cache.NewMockCache() blocksCache := NewBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger) + store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, logger) - require.NoError(t, err) - t.Cleanup(store.Stop) + if err == nil { + t.Cleanup(store.Stop) + } - return store, workDir + return store, workDir, err } func createMetaInStorage(store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Meta, error) { @@ -123,7 +129,8 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start } func TestBloomStore_ResolveMetas(t *testing.T) { - store, _ := newMockBloomStore(t) + store, _, err := newMockBloomStore(t) + require.NoError(t, err) // schema 1 // outside of interval, outside of bounds @@ -178,7 +185,8 @@ func TestBloomStore_ResolveMetas(t *testing.T) { } func TestBloomStore_FetchMetas(t *testing.T) { - store, _ := newMockBloomStore(t) + store, _, err := newMockBloomStore(t) + require.NoError(t, err) // schema 1 // outside of interval, outside of bounds @@ -231,7 +239,8 @@ func TestBloomStore_FetchMetas(t *testing.T) { } func TestBloomStore_FetchBlocks(t *testing.T) { - store, _ := newMockBloomStore(t) + store, _, err := newMockBloomStore(t) + require.NoError(t, err) // schema 1 b1, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) @@ -259,3 +268,33 @@ func TestBloomStore_FetchBlocks(t *testing.T) { []BlockRef{bqs[0].BlockRef, bqs[1].BlockRef, bqs[2].BlockRef, bqs[3].BlockRef}, ) } + +func TestBloomShipper_WorkingDir(t *testing.T) { + t.Run("insufficient permissions on directory yields error", func(t *testing.T) { + base := t.TempDir() + wd := filepath.Join(base, "notpermitted") + err := os.MkdirAll(wd, 0500) + require.NoError(t, err) + fi, _ := os.Stat(wd) + t.Log("working directory", wd, fi.Mode()) + + _, _, err = newMockBloomStoreWithWorkDir(t, wd) + require.ErrorContains(t, err, "insufficient permissions") + }) + + t.Run("not existing directory will be created", func(t *testing.T) { + base := t.TempDir() + // if the base directory does not exist, it will be created + wd := filepath.Join(base, "doesnotexist") + t.Log("working directory", wd) + + store, _, err := newMockBloomStoreWithWorkDir(t, wd) + require.NoError(t, err) + b, err := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) + require.NoError(t, err) + + ctx := context.Background() + _, err = store.FetchBlocks(ctx, []BlockRef{b.BlockRef}) + require.NoError(t, err) + }) +}