diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index fede86484a96b..9a4dea08dba26 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -26,6 +26,7 @@ import ( v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" + bloomshipperconfig "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" lokiring "github.com/grafana/loki/pkg/util/ring" "github.com/grafana/loki/pkg/validation" ) @@ -70,6 +71,9 @@ func TestBloomGateway_StartStopService(t *testing.T) { Configs: []config.PeriodConfig{p}, } storageCfg := storage.Config{ + BloomShipperConfig: bloomshipperconfig.Config{ + WorkingDirectory: t.TempDir(), + }, FSConfig: local.FSConfig{ Directory: t.TempDir(), }, @@ -136,6 +140,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { Configs: []config.PeriodConfig{p}, } storageCfg := storage.Config{ + BloomShipperConfig: bloomshipperconfig.Config{ + WorkingDirectory: t.TempDir(), + }, FSConfig: local.FSConfig{ Directory: t.TempDir(), }, diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 1914c8ab3edfc..f76e0f75da9f7 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -407,7 +407,7 @@ func applyPathPrefixDefaults(r, defaults *ConfigWrapper) { r.CompactorConfig.WorkingDirectory = fmt.Sprintf("%s/compactor", prefix) } if r.StorageConfig.BloomShipperConfig.WorkingDirectory == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory { - r.StorageConfig.BloomShipperConfig.WorkingDirectory = fmt.Sprintf("%s/bloom-shipper", prefix) + r.StorageConfig.BloomShipperConfig.WorkingDirectory = fmt.Sprintf("%s/blooms", prefix) } } } diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 3b1237dad4d1d..60c9223732d05 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -100,6 +100,7 @@ common: assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath) assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir) assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory) + assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory) }) t.Run("accepts paths both with and without trailing slash", func(t *testing.T) { @@ -111,6 +112,7 @@ common: assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath) assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir) assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory) + assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory) }) t.Run("does not rewrite custom (non-default) paths passed via config file", func(t *testing.T) { diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index 0d07242b75370..047ba5f838a52 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -2,7 +2,6 @@ package loki import ( "fmt" - "path" "path/filepath" "testing" "time" @@ -17,6 +16,7 @@ import ( "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" + bloomshipperconfig "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/boltdb" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/indexgateway" @@ -366,10 +366,13 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f // This would be overwritten by the default values setting. cfg.StorageConfig = storage.Config{ FSConfig: local.FSConfig{Directory: dir}, + BloomShipperConfig: bloomshipperconfig.Config{ + WorkingDirectory: filepath.Join(dir, "blooms"), + }, BoltDBShipperConfig: boltdb.IndexCfg{ Config: indexshipper.Config{ - ActiveIndexDirectory: path.Join(dir, "index"), - CacheLocation: path.Join(dir, "cache"), + ActiveIndexDirectory: filepath.Join(dir, "index"), + CacheLocation: filepath.Join(dir, "cache"), Mode: indexshipper.ModeWriteOnly, ResyncInterval: 24 * time.Hour, }, @@ -402,7 +405,7 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f cfg.BloomCompactor.Ring.InstanceAddr = localhost cfg.BloomGateway.Ring.InstanceAddr = localhost cfg.CompactorConfig.CompactorRing.InstanceAddr = localhost - cfg.CompactorConfig.WorkingDirectory = path.Join(dir, "compactor") + cfg.CompactorConfig.WorkingDirectory = filepath.Join(dir, "compactor") cfg.Ruler.Config.Ring.InstanceAddr = localhost cfg.Ruler.Config.StoreConfig.Type = config.StorageTypeLocal diff --git a/pkg/storage/chunk/client/util/util.go b/pkg/storage/chunk/client/util/util.go index 10237cc456da5..e49fad20136fb 100644 --- a/pkg/storage/chunk/client/util/util.go +++ b/pkg/storage/chunk/client/util/util.go @@ -72,6 +72,8 @@ func EnsureDirectory(dir string) error { return os.MkdirAll(dir, 0o777) } else if err == nil && !info.IsDir() { return fmt.Errorf("not a directory: %s", dir) + } else if err == nil && info.Mode()&0700 != 0700 { + return fmt.Errorf("insufficient permissions: %s %s", dir, info.Mode()) } return err } diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go index e03d72c26ba37..86e8ed90a174c 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go @@ -142,7 +142,7 @@ func TestBloomShipper_IsOutsideRange(t *testing.T) { func TestBloomShipper_ForEach(t *testing.T) { blockRefs := make([]BlockRef, 0, 3) - store, _ := newMockBloomStore(t) + store, _, _ := newMockBloomStore(t) for i := 0; i < len(blockRefs); i++ { block, err := createBlockInStorage(t, store, "tenant", model.Time(i*24*int(time.Hour)), 0x0000, 0x00ff) require.NoError(t, err) diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index d5cfa24b11ed5..56bfb3ebe97ab 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -15,6 +15,7 @@ import ( v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/client" + "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/config" ) @@ -172,6 +173,10 @@ func NewBloomStore( numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount, } + if err := util.EnsureDirectory(cfg.workingDir); err != nil { + return nil, errors.Wrapf(err, "failed to create working directory for bloom store: '%s'", cfg.workingDir) + } + for _, periodicConfig := range periodicConfigs { objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageConfig, clientMetrics) if err != nil { @@ -323,10 +328,10 @@ func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*Clo results := make([]*CloseableBlockQuerier, 0, len(blocks)) for i := range fetchers { res, err := fetchers[i].FetchBlocks(ctx, refs[i]) - results = append(results, res...) if err != nil { return results, err } + results = append(results, res...) } // sort responses (results []*CloseableBlockQuerier) based on requests (blocks []BlockRef) diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go index 59d8eee464053..48ab81cc45027 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "os" + "path/filepath" "testing" "time" @@ -20,9 +21,12 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" ) -func newMockBloomStore(t *testing.T) (*BloomStore, string) { +func newMockBloomStore(t *testing.T) (*BloomStore, string, error) { workDir := t.TempDir() + return newMockBloomStoreWithWorkDir(t, workDir) +} +func newMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*BloomStore, string, error) { periodicConfigs := []storageconfig.PeriodConfig{ { ObjectType: storageconfig.StorageTypeInMemory, @@ -63,11 +67,13 @@ func newMockBloomStore(t *testing.T) (*BloomStore, string) { metasCache := cache.NewMockCache() blocksCache := NewBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger) + store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, logger) - require.NoError(t, err) - t.Cleanup(store.Stop) + if err == nil { + t.Cleanup(store.Stop) + } - return store, workDir + return store, workDir, err } func createMetaInStorage(store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Meta, error) { @@ -123,7 +129,8 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start } func TestBloomStore_ResolveMetas(t *testing.T) { - store, _ := newMockBloomStore(t) + store, _, err := newMockBloomStore(t) + require.NoError(t, err) // schema 1 // outside of interval, outside of bounds @@ -178,7 +185,8 @@ func TestBloomStore_ResolveMetas(t *testing.T) { } func TestBloomStore_FetchMetas(t *testing.T) { - store, _ := newMockBloomStore(t) + store, _, err := newMockBloomStore(t) + require.NoError(t, err) // schema 1 // outside of interval, outside of bounds @@ -231,7 +239,8 @@ func TestBloomStore_FetchMetas(t *testing.T) { } func TestBloomStore_FetchBlocks(t *testing.T) { - store, _ := newMockBloomStore(t) + store, _, err := newMockBloomStore(t) + require.NoError(t, err) // schema 1 b1, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) @@ -259,3 +268,33 @@ func TestBloomStore_FetchBlocks(t *testing.T) { []BlockRef{bqs[0].BlockRef, bqs[1].BlockRef, bqs[2].BlockRef, bqs[3].BlockRef}, ) } + +func TestBloomShipper_WorkingDir(t *testing.T) { + t.Run("insufficient permissions on directory yields error", func(t *testing.T) { + base := t.TempDir() + wd := filepath.Join(base, "notpermitted") + err := os.MkdirAll(wd, 0500) + require.NoError(t, err) + fi, _ := os.Stat(wd) + t.Log("working directory", wd, fi.Mode()) + + _, _, err = newMockBloomStoreWithWorkDir(t, wd) + require.ErrorContains(t, err, "insufficient permissions") + }) + + t.Run("not existing directory will be created", func(t *testing.T) { + base := t.TempDir() + // if the base directory does not exist, it will be created + wd := filepath.Join(base, "doesnotexist") + t.Log("working directory", wd) + + store, _, err := newMockBloomStoreWithWorkDir(t, wd) + require.NoError(t, err) + b, err := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) + require.NoError(t, err) + + ctx := context.Background() + _, err = store.FetchBlocks(ctx, []BlockRef{b.BlockRef}) + require.NoError(t, err) + }) +}