Avoid reading block files into memory (grafana#11447)
Bloom blocks in the wild likely exceed the memory resources of a
compactor. Therefore we need to make sure we don't copy block data
into memory, but instead write it to and read it from disk.

Signed-off-by: Christian Haudum <[email protected]>
Co-authored-by: Paul Rogers <[email protected]>
2 people authored and rhnasc committed Apr 12, 2024
1 parent e1b2020 commit 355c564
Showing 5 changed files with 170 additions and 48 deletions.
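The commit message describes the fix at a high level: stream block data through the filesystem instead of materializing it. Below is a minimal sketch of the before/after pattern, with hypothetical helper names (loadBlockBuffered, openBlockStreaming) that are not taken from the Loki codebase:

```go
package main

import (
	"io"
	"os"
)

// Before: the whole block file is copied into memory at once.
func loadBlockBuffered(path string) ([]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	return io.ReadAll(f) // allocates the entire block in memory
}

// After: hand the caller an io.ReadSeekCloser backed by the file on disk.
// *os.File already satisfies the interface, so no intermediate buffer is
// needed, and Seek allows re-reads (e.g. for retried uploads).
func openBlockStreaming(path string) (io.ReadSeekCloser, error) {
	return os.Open(path)
}
```

This is why the Data() accessors in the diff below widen their return type from io.ReadCloser to io.ReadSeekCloser: a seekable handle can be streamed to object storage without first being read into a byte slice.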
78 changes: 61 additions & 17 deletions pkg/bloomcompactor/chunkcompactor.go
@@ -30,7 +30,7 @@ type chunkClient interface {

type blockBuilder interface {
BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error)
Data() (io.ReadCloser, error)
Data() (io.ReadSeekCloser, error)
}

type PersistentBlockBuilder struct {
@@ -55,7 +55,7 @@ func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom])
return p.builder.BuildFrom(itr)
}

func (p *PersistentBlockBuilder) Data() (io.ReadCloser, error) {
func (p *PersistentBlockBuilder) Data() (io.ReadSeekCloser, error) {
blockFile, err := os.Open(filepath.Join(p.localDst, v1.BloomFileName))
if err != nil {
return nil, err
@@ -101,15 +101,15 @@ func buildBlockFromBlooms(
ctx context.Context,
logger log.Logger,
builder blockBuilder,
blooms []v1.SeriesWithBloom,
blooms v1.Iterator[v1.SeriesWithBloom],
job Job,
) (bloomshipper.Block, error) {
// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return bloomshipper.Block{}, err
}

checksum, err := builder.BuildFrom(v1.NewSliceIter(blooms))
checksum, err := builder.BuildFrom(blooms)
if err != nil {
level.Error(logger).Log("msg", "failed writing to bloom", "err", err)
return bloomshipper.Block{}, err
@@ -160,25 +160,69 @@ func compactNewChunks(
return bloomshipper.Block{}, err
}

blooms := make([]v1.SeriesWithBloom, len(job.seriesMetas))

for _, seriesMeta := range job.seriesMetas {
// Get chunks data from list of chunkRefs
chks, err := storeClient.GetChunks(ctx, makeChunkRefs(seriesMeta.chunkRefs, job.tenantID, seriesMeta.seriesFP))
if err != nil {
return bloomshipper.Block{}, err
}

bloom := buildBloomFromSeries(seriesMeta, fpRate, bt, chks)
blooms = append(blooms, bloom)
}
bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, fpRate)

// Build and upload bloomBlock to storage
block, err := buildBlockFromBlooms(ctx, logger, builder, blooms, job)
block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job)
if err != nil {
level.Error(logger).Log("msg", "building bloomBlocks", "err", err)
return bloomshipper.Block{}, err
}

return block, nil
}

type lazyBloomBuilder struct {
ctx context.Context
metas v1.Iterator[seriesMeta]
tenant string
client chunkClient
bt compactorTokenizer
fpRate float64

cur v1.SeriesWithBloom // returned by At()
err error // returned by Err()
}

// newLazyBloomBuilder returns an iterator that yields v1.SeriesWithBloom
// which are used by the blockBuilder to write a bloom block.
// We use an iterator to avoid loading all blooms into memory first, before
// building the block.
func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, fpRate float64) *lazyBloomBuilder {
return &lazyBloomBuilder{
ctx: ctx,
metas: v1.NewSliceIter(job.seriesMetas),
client: client,
tenant: job.tenantID,
bt: bt,
fpRate: fpRate,
}
}

func (it *lazyBloomBuilder) Next() bool {
if !it.metas.Next() {
it.err = io.EOF
it.cur = v1.SeriesWithBloom{}
return false
}
meta := it.metas.At()

// Get chunks data from list of chunkRefs
chks, err := it.client.GetChunks(it.ctx, makeChunkRefs(meta.chunkRefs, it.tenant, meta.seriesFP))
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
return false
}

it.cur = buildBloomFromSeries(meta, it.fpRate, it.bt, chks)
return true
}

func (it *lazyBloomBuilder) At() v1.SeriesWithBloom {
return it.cur
}

func (it *lazyBloomBuilder) Err() error {
return it.err
}
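The doc comment on newLazyBloomBuilder above captures the design: implement the v1.Iterator contract (Next/At/Err) so the block builder pulls one SeriesWithBloom at a time, fetching chunks per series on demand instead of receiving a fully materialized slice. A self-contained sketch of the same contract, with hypothetical names not taken from the Loki codebase:

```go
package main

import (
	"fmt"
	"io"
)

// Iterator mirrors the Next/At/Err contract that lazyBloomBuilder implements.
type Iterator[T any] interface {
	Next() bool
	At() T
	Err() error
}

// lazyLoader resolves keys to values on demand; the expensive load happens
// inside Next, one element at a time, just like the per-series GetChunks
// call in lazyBloomBuilder.Next.
type lazyLoader struct {
	keys []string
	load func(string) (string, error)
	cur  string
	err  error
}

func (it *lazyLoader) Next() bool {
	if len(it.keys) == 0 {
		it.err = io.EOF // same end-of-iteration convention as lazyBloomBuilder
		return false
	}
	key := it.keys[0]
	it.keys = it.keys[1:]
	it.cur, it.err = it.load(key)
	return it.err == nil
}

func (it *lazyLoader) At() string { return it.cur }
func (it *lazyLoader) Err() error { return it.err }

func main() {
	var it Iterator[string] = &lazyLoader{
		keys: []string{"a", "b"},
		load: func(k string) (string, error) { return "value-" + k, nil },
	}
	for it.Next() {
		fmt.Println(it.At()) // only the current element is resident in memory
	}
}
```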
80 changes: 76 additions & 4 deletions pkg/bloomcompactor/chunkcompactor_test.go
@@ -125,18 +125,90 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) {
require.Equal(t, indexPath, compactedBlock.IndexPath)
}

func TestLazyBloomBuilder(t *testing.T) {
label := labels.FromStrings("foo", "bar")
fp1 := model.Fingerprint(100)
fp2 := model.Fingerprint(999)
fp3 := model.Fingerprint(200)

chunkRef1 := index.ChunkMeta{
Checksum: 1,
MinTime: 1,
MaxTime: 99,
}

chunkRef2 := index.ChunkMeta{
Checksum: 2,
MinTime: 10,
MaxTime: 999,
}

seriesMetas := []seriesMeta{
{
seriesFP: fp1,
seriesLbs: label,
chunkRefs: []index.ChunkMeta{chunkRef1},
},
{
seriesFP: fp2,
seriesLbs: label,
chunkRefs: []index.ChunkMeta{chunkRef1, chunkRef2},
},
{
seriesFP: fp3,
seriesLbs: label,
chunkRefs: []index.ChunkMeta{chunkRef1, chunkRef1, chunkRef2},
},
}

job := NewJob(userID, table, indexPath, seriesMetas)

mbt := &mockBloomTokenizer{}
mcc := &mockChunkClient{}

it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, fpRate)

// first seriesMeta has 1 chunk
require.True(t, it.Next())
require.Equal(t, 1, mcc.requestCount)
require.Equal(t, 1, mcc.chunkCount)
require.Equal(t, fp1, it.At().Series.Fingerprint)

// second seriesMeta has 2 chunks
require.True(t, it.Next())
require.Equal(t, 2, mcc.requestCount)
require.Equal(t, 3, mcc.chunkCount)
require.Equal(t, fp2, it.At().Series.Fingerprint)

// third seriesMeta has 3 chunks
require.True(t, it.Next())
require.Equal(t, 3, mcc.requestCount)
require.Equal(t, 6, mcc.chunkCount)
require.Equal(t, fp3, it.At().Series.Fingerprint)

// iterator is done
require.False(t, it.Next())
require.ErrorIs(t, it.Err(), io.EOF)
require.Equal(t, v1.SeriesWithBloom{}, it.At())
}

type mockBloomTokenizer struct {
chunks []chunk.Chunk
}

func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(_ *v1.SeriesWithBloom, c []chunk.Chunk) error {
mbt.chunks = c
mbt.chunks = append(mbt.chunks, c...)
return nil
}

type mockChunkClient struct{}
type mockChunkClient struct {
requestCount int
chunkCount int
}

func (mcc *mockChunkClient) GetChunks(_ context.Context, _ []chunk.Chunk) ([]chunk.Chunk, error) {
func (mcc *mockChunkClient) GetChunks(_ context.Context, chks []chunk.Chunk) ([]chunk.Chunk, error) {
mcc.requestCount++
mcc.chunkCount += len(chks)
return nil, nil
}

@@ -147,6 +219,6 @@ func (pbb *mockPersistentBlockBuilder) BuildFrom(_ v1.Iterator[v1.SeriesWithBloo
return 0, nil
}

func (pbb *mockPersistentBlockBuilder) Data() (io.ReadCloser, error) {
func (pbb *mockPersistentBlockBuilder) Data() (io.ReadSeekCloser, error) {
return nil, nil
}
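The instrumented mock above is what lets the test verify laziness: the request and chunk counters advance step by step as Next() is called, which would not hold if all chunks were fetched up front. The same pattern in isolation, with hypothetical names:

```go
package main

import "fmt"

// countingFetcher records how much work a consumer triggers, so a test can
// assert that fetches happen lazily (one per iteration) rather than eagerly.
type countingFetcher struct {
	requestCount int
	itemCount    int
}

func (f *countingFetcher) Fetch(keys []string) []string {
	f.requestCount++
	f.itemCount += len(keys)
	out := make([]string, 0, len(keys))
	for _, k := range keys {
		out = append(out, "data-"+k)
	}
	return out
}

func main() {
	f := &countingFetcher{}
	f.Fetch([]string{"a"})
	f.Fetch([]string{"b", "c"})
	fmt.Println(f.requestCount, f.itemCount) // prints: 2 3
}
```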
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader.go
@@ -181,7 +181,7 @@ type blockWithQuerier struct {
}

// extracts the files into a directory and returns the absolute path to that directory.
func (d *blockDownloader) extractBlock(block *Block, ts time.Time) (string, error) {
func (d *blockDownloader) extractBlock(block *LazyBlock, ts time.Time) (string, error) {
workingDirectoryPath := filepath.Join(d.workingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10))
err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
if err != nil {
@@ -213,7 +213,7 @@ func (d *blockDownloader) stop() {
d.wg.Wait()
}

func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) {
func writeDataToTempFile(workingDirectoryPath string, block *LazyBlock) (string, error) {
defer block.Data.Close()
archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:])

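The body of writeDataToTempFile is collapsed in the hunk above; what the diff shows is that it now receives a *LazyBlock, whose Data is a plain io.ReadCloser straight from object storage. A plausible sketch of the streaming copy such a helper performs, assuming hypothetical names:

```go
package main

import (
	"io"
	"os"
	"path/filepath"
)

// writeToTempFile streams a block's data to disk without buffering it.
// Hypothetical illustration; the real helper lives in block_downloader.go.
func writeToTempFile(dir, name string, data io.ReadCloser) (string, error) {
	defer data.Close()

	path := filepath.Join(dir, name)
	f, err := os.Create(path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	// io.Copy moves the data in small chunks (32 KiB by default), so the
	// block never has to fit in memory.
	if _, err := io.Copy(f, data); err != nil {
		return "", err
	}
	return path, nil
}
```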
17 changes: 13 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go
@@ -70,6 +70,7 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
// creates fake blocks and returns map[block-path]Block and mockBlockClient
func createFakeBlocks(t *testing.T, count int) ([]BlockRef, *mockBlockClient) {
mockData := make(map[string]Block, count)
mockLazyData := make(map[string]LazyBlock, count)
refs := make([]BlockRef, 0, count)
for i := 0; i < count; i++ {
archive, _, _ := createBlockArchive(t)
@@ -79,20 +80,28 @@ func createFakeBlocks(t *testing.T, count int) ([]BlockRef, *mockBlockClient) {
},
Data: archive,
}
lazyBlock := LazyBlock{
BlockRef: BlockRef{
BlockPath: fmt.Sprintf("block-path-%d", i),
},
Data: archive,
}
mockData[block.BlockPath] = block
mockLazyData[block.BlockPath] = lazyBlock
refs = append(refs, block.BlockRef)
}
return refs, &mockBlockClient{mockData: mockData}
return refs, &mockBlockClient{mockData: mockData, mockLazyData: mockLazyData}
}

type mockBlockClient struct {
responseDelay time.Duration
mockData map[string]Block
mockLazyData map[string]LazyBlock
}

func (m *mockBlockClient) GetBlock(_ context.Context, reference BlockRef) (Block, error) {
func (m *mockBlockClient) GetBlock(_ context.Context, reference BlockRef) (LazyBlock, error) {
time.Sleep(m.responseDelay)
block, exists := m.mockData[reference.BlockPath]
block, exists := m.mockLazyData[reference.BlockPath]
if exists {
return block, nil
}
@@ -114,7 +123,7 @@ func Test_blockDownloader_extractBlock(t *testing.T) {
workingDir := t.TempDir()
downloader := &blockDownloader{workingDirectory: workingDir}
ts := time.Now().UTC()
block := Block{
block := LazyBlock{
BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"},
Data: blockFile,
}
39 changes: 18 additions & 21 deletions pkg/storage/stores/shipper/bloomshipper/client.go
@@ -82,14 +82,18 @@ type MetaClient interface {
DeleteMeta(ctx context.Context, meta Meta) error
}

type Block struct {
type LazyBlock struct {
BlockRef

Data io.ReadCloser
}

type Block struct {
BlockRef
Data io.ReadSeekCloser
}

type BlockClient interface {
GetBlock(ctx context.Context, reference BlockRef) (Block, error)
GetBlock(ctx context.Context, reference BlockRef) (LazyBlock, error)
PutBlocks(ctx context.Context, blocks []Block) ([]Block, error)
DeleteBlocks(ctx context.Context, blocks []BlockRef) error
}
@@ -191,27 +195,28 @@ func findPeriod(configs []config.PeriodConfig, timestamp int64) (config.DayTime,
}
return config.DayTime{}, fmt.Errorf("can not find period for timestamp %d", timestamp)
}

func (b *BloomClient) DeleteMeta(ctx context.Context, meta Meta) error {
periodFrom, err := findPeriod(b.periodicConfigs, meta.StartTimestamp)
if err != nil {
return fmt.Errorf("error updloading meta file: %w", err)
return err
}
key := createMetaObjectKey(meta.MetaRef.Ref)
return b.periodicObjectClients[periodFrom].DeleteObject(ctx, key)
}

// GetBlock downloads the block from object storage and returns the downloaded block
func (b *BloomClient) GetBlock(ctx context.Context, reference BlockRef) (Block, error) {
func (b *BloomClient) GetBlock(ctx context.Context, reference BlockRef) (LazyBlock, error) {
period, err := findPeriod(b.periodicConfigs, reference.StartTimestamp)
if err != nil {
return Block{}, fmt.Errorf("error while period lookup: %w", err)
return LazyBlock{}, fmt.Errorf("error while period lookup: %w", err)
}
objectClient := b.periodicObjectClients[period]
readCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref))
if err != nil {
return Block{}, fmt.Errorf("error while fetching object from storage: %w", err)
return LazyBlock{}, fmt.Errorf("error while fetching object from storage: %w", err)
}
return Block{
return LazyBlock{
BlockRef: reference,
Data: readCloser,
}, nil
@@ -228,17 +233,13 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e

period, err := findPeriod(b.periodicConfigs, block.StartTimestamp)
if err != nil {
return fmt.Errorf("error updloading block file: %w", err)
return fmt.Errorf("error uploading block file: %w", err)
}
key := createBlockObjectKey(block.Ref)
objectClient := b.periodicObjectClients[period]
data, err := io.ReadAll(block.Data)
if err != nil {
return fmt.Errorf("error while reading object data: %w", err)
}
err = objectClient.PutObject(ctx, key, bytes.NewReader(data))
err = objectClient.PutObject(ctx, key, block.Data)
if err != nil {
return fmt.Errorf("error updloading block file: %w", err)
return fmt.Errorf("error uploading block file: %w", err)
}
block.BlockPath = key
results[idx] = block
@@ -279,13 +280,9 @@ func (b *BloomClient) downloadMeta(ctx context.Context, metaRef MetaRef, client
if err != nil {
return Meta{}, fmt.Errorf("error downloading meta file %s : %w", metaRef.FilePath, err)
}
defer func() { _ = reader.Close() }()
defer reader.Close()

buf, err := io.ReadAll(reader)
if err != nil {
return Meta{}, fmt.Errorf("error reading meta file %s: %w", metaRef.FilePath, err)
}
err = json.Unmarshal(buf, &meta)
err = json.NewDecoder(reader).Decode(&meta)
if err != nil {
return Meta{}, fmt.Errorf("error unmarshalling content of meta file %s: %w", metaRef.FilePath, err)
}
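The last two hunks apply the same principle to both transfer directions: PutBlocks now passes the seekable block data straight to PutObject instead of draining it with io.ReadAll, and downloadMeta decodes JSON directly off the reader instead of buffering it first. A condensed sketch of both patterns, under an assumed minimal object-client interface (the real one lives in Loki's object storage packages):

```go
package main

import (
	"context"
	"encoding/json"
	"io"
)

// objectClient is an assumed interface for illustration only.
type objectClient interface {
	PutObject(ctx context.Context, key string, body io.ReadSeeker) error
	GetObject(ctx context.Context, key string) (io.ReadCloser, error)
}

// Upload path: the ReadSeeker is handed through untouched; the client can
// stream it and, thanks to Seek, rewind it for retries.
func putBlock(ctx context.Context, c objectClient, key string, data io.ReadSeeker) error {
	return c.PutObject(ctx, key, data)
}

// Download path: json.NewDecoder consumes the reader incrementally, so the
// meta file is never held in memory as one large byte slice.
func getMeta(ctx context.Context, c objectClient, key string, out any) error {
	r, err := c.GetObject(ctx, key)
	if err != nil {
		return err
	}
	defer r.Close()
	return json.NewDecoder(r).Decode(out)
}
```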
