Commit 2e399c0

Add testcase for BloomStore.FetchBlocks()

Signed-off-by: Christian Haudum <[email protected]>

chaudum committed Feb 5, 2024 · 1 parent 5f211a0 · commit 2e399c0
Showing 2 changed files with 71 additions and 28 deletions.
4 changes: 3 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/fetcher.go

```diff
@@ -343,7 +343,9 @@ func (q *downloadQueue[T, R]) do(ctx context.Context, task downloadTask[T, R]) {
 	q.mu.LockKey(task.key)
 	defer func() {
 		err := q.mu.UnlockKey(task.key)
-		level.Error(q.logger).Log("msg", "failed to unlock key in block lock", "err", err)
+		if err != nil {
+			level.Error(q.logger).Log("msg", "failed to unlock key in block lock", "key", task.key, "err", err)
+		}
 	}()

 	q.process(ctx, task)
```
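The fix above changes the deferred cleanup so the error is logged only when `UnlockKey` actually fails; previously the log line fired unconditionally, emitting an error record with a nil `err` on every successful unlock. A minimal standalone sketch of the guarded pattern, with an invented `keyMutex` standing in for the real per-key lock used by the download queue:

```go
package main

import (
	"os"
	"sync"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// keyMutex is a stand-in for the per-key lock guarding block downloads.
type keyMutex struct{ mu sync.Mutex }

func (m *keyMutex) LockKey(key string)         { m.mu.Lock() }
func (m *keyMutex) UnlockKey(key string) error { m.mu.Unlock(); return nil }

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	var mu keyMutex
	key := "block-1"

	mu.LockKey(key)
	defer func() {
		// Log only when the unlock fails; logging unconditionally would
		// produce a misleading error line with err=nil on every call.
		if err := mu.UnlockKey(key); err != nil {
			level.Error(logger).Log("msg", "failed to unlock key", "key", key, "err", err)
		}
	}()

	// ... work guarded by the per-key lock ...
}
```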
95 changes: 68 additions & 27 deletions pkg/storage/stores/shipper/bloomshipper/store_test.go

```diff
@@ -1,22 +1,20 @@
 package bloomshipper

 import (
-	"archive/tar"
 	"bytes"
 	"context"
 	"encoding/json"
-	"io"
 	"os"
 	"testing"
 	"time"

 	"github.com/go-kit/log"
-	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/storage"
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/pkg/storage/chunk/cache"
 	storageconfig "github.com/grafana/loki/pkg/storage/config"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 )
```
```diff
@@ -53,14 +51,24 @@ func newMockBloomStore(t *testing.T) (*BloomStore, string) {
 			BlocksDownloadingQueue: config.DownloadingQueueConfig{
 				WorkersCount: 1,
 			},
+			BlocksCache: config.BlocksCacheConfig{
+				EmbeddedCacheConfig: cache.EmbeddedCacheConfig{
+					MaxSizeItems: 1000,
+					TTL:          1 * time.Hour,
+				},
+			},
 		},
 	}

 	metrics := storage.NewClientMetrics()
 	t.Cleanup(metrics.Unregister)
 	logger := log.NewLogfmtLogger(os.Stderr)
-	store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, cache.NewNoopCache(), nil, logger)
+
+	metasCache := cache.NewMockCache()
+	blocksCache := NewBlocksCache(storageConfig.BloomShipperConfig, prometheus.NewPedanticRegistry(), logger)
+	store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, logger)
 	require.NoError(t, err)
+	t.Cleanup(store.Stop)

 	return store, workDir
 }
```
```diff
@@ -87,6 +95,37 @@ func createMetaInStorage(store *BloomStore, tenant string, start model.Time, min
 	return meta, err
 }

+func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) {
+	tmpDir := t.TempDir()
+	fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz")
+
+	blockWriter := v1.NewDirectoryBlockWriter(tmpDir)
+	err := blockWriter.Init()
+	require.NoError(t, err)
+
+	err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir))
+	require.NoError(t, err)
+
+	_, _ = fp.Seek(0, 0)
+
+	block := Block{
+		BlockRef: BlockRef{
+			Ref: Ref{
+				TenantID:       tenant,
+				Bounds:         v1.NewBounds(minFp, maxFp),
+				StartTimestamp: start,
+				EndTimestamp:   start.Add(12 * time.Hour),
+			},
+		},
+		Data: fp,
+	}
+	err = store.storeDo(start, func(s *bloomStoreEntry) error {
+		block.BlockRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0]
+		return s.objectClient.PutObject(context.Background(), s.Block(block.BlockRef).Addr(), block.Data)
+	})
+	return block, err
+}
+
 func TestBloomStore_ResolveMetas(t *testing.T) {
 	store, _ := newMockBloomStore(t)

```
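One detail worth noting: the helper archives the block directory with `v1.TarGz` and a `v1.NewDirectoryBlockReader`, which is what lets this commit drop the file-local `TarGz` helper (removed in the next hunk) along with its `archive/tar`, `io`, and `chunkenc` imports. A hedged sketch of that round trip in isolation; the function name `archiveBlock` is invented for illustration, and only the `v1` calls visible in the diff are assumed:

```go
// archiveBlock lays out an (empty) block in dir and streams it as a .tar.gz
// archive into dst, mirroring what createBlockInStorage does.
func archiveBlock(dir string, dst *os.File) error {
	w := v1.NewDirectoryBlockWriter(dir)
	if err := w.Init(); err != nil { // creates the block file layout on disk
		return err
	}
	if err := v1.TarGz(dst, v1.NewDirectoryBlockReader(dir)); err != nil {
		return err
	}
	// Rewind so the archive can be read back, e.g. for an object-store upload.
	_, err := dst.Seek(0, io.SeekStart)
	return err
}
```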
```diff
@@ -195,32 +234,34 @@ func TestBloomStore_FetchMetas(t *testing.T) {
 	})
 }

-func TestBloomStore_FetchBlocks(t *testing.T) {}
-
-func TestBloomStore_Fetcher(t *testing.T) {}
-
-func TarGz(t *testing.T, dst io.Writer, file string) {
-	src, err := os.Open(file)
-	require.NoError(t, err)
-	defer src.Close()
-
-	gzipper := chunkenc.GetWriterPool(chunkenc.EncGZIP).GetWriter(dst)
-	defer gzipper.Close()
-
-	tarballer := tar.NewWriter(gzipper)
-	defer tarballer.Close()
-
-	for _, f := range []*os.File{src} {
-		info, err := f.Stat()
-		require.NoError(t, err)
-
-		header, err := tar.FileInfoHeader(info, f.Name())
-		require.NoError(t, err)
-
-		err = tarballer.WriteHeader(header)
-		require.NoError(t, err)
-
-		_, err = io.Copy(tarballer, f)
-		require.NoError(t, err)
-	}
-}
+func TestBloomStore_FetchBlocks(t *testing.T) {
+	store, _ := newMockBloomStore(t)
+
+	// schema 1
+	b1, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)
+	b2, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff)
+	// schema 2
+	b3, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff)
+	b4, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0001ffff)
+
+	ctx := context.Background()
+
+	// first call fetches both blocks from storage
+	blockDirs, err := store.FetchBlocks(ctx, []BlockRef{b1.BlockRef, b3.BlockRef})
+	require.NoError(t, err)
+	require.Len(t, blockDirs, 2)
+
+	require.ElementsMatch(t, []BlockRef{b1.BlockRef, b3.BlockRef}, []BlockRef{blockDirs[0].BlockRef, blockDirs[1].BlockRef})
+
+	// second call fetches two blocks from cache and two from storage
+	blockDirs, err = store.FetchBlocks(ctx, []BlockRef{b1.BlockRef, b2.BlockRef, b3.BlockRef, b4.BlockRef})
+	require.NoError(t, err)
+	require.Len(t, blockDirs, 4)
+
+	// Note the order: b1 and b3 come from the cache, so they are at the beginning of the response.
+	// Do we need to sort the response based on the request order of block refs?
+	require.ElementsMatch(t,
+		[]BlockRef{b1.BlockRef, b3.BlockRef, b2.BlockRef, b4.BlockRef},
+		[]BlockRef{blockDirs[0].BlockRef, blockDirs[1].BlockRef, blockDirs[2].BlockRef, blockDirs[3].BlockRef},
+	)
+}
```
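The trailing comment leaves an open question: `FetchBlocks` returns cache hits ahead of freshly downloaded blocks instead of preserving request order. If a caller ever needs request order restored, a helper along these lines would do it. This is a sketch, not part of the commit; it assumes `FetchBlocks` returns `BlockDirectory` values exposing a `BlockRef` field (as the test's `blockDirs[i].BlockRef` accesses suggest) and that `BlockRef` is comparable so it can key a map:

```go
// sortToRequestOrder reorders fetched block directories to match the order of
// the requested refs. Every requested ref is assumed to be present in dirs.
func sortToRequestOrder(refs []BlockRef, dirs []BlockDirectory) []BlockDirectory {
	pos := make(map[BlockRef]int, len(refs)) // ref -> position in the request
	for i, ref := range refs {
		pos[ref] = i
	}
	sorted := make([]BlockDirectory, len(dirs))
	for _, dir := range dirs {
		sorted[pos[dir.BlockRef]] = dir
	}
	return sorted
}
```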
