Skip to content

Commit

Permalink
fix: Extract block directory directly from gzipped tar reader (#12021)
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Feb 21, 2024
1 parent 6b327b6 commit 9ef3b11
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 92 deletions.
15 changes: 1 addition & 14 deletions pkg/storage/stores/shipper/bloomshipper/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ func TestBlockDirectory_Cleanup(t *testing.T) {
tc := tc
t.Run(name, func(t *testing.T) {
extractedBlockDirectory := t.TempDir()
blockFilePath, _, _, _ := createBlockArchive(t)
err := extractArchive(blockFilePath, extractedBlockDirectory)
require.NoError(t, err)
require.DirExists(t, extractedBlockDirectory)

blockDir := BlockDirectory{
Expand Down Expand Up @@ -61,20 +58,10 @@ func TestBlockDirectory_Cleanup(t *testing.T) {
}

func Test_ClosableBlockQuerier(t *testing.T) {
blockFilePath, _, _, _ := createBlockArchive(t)
extractedBlockDirectory := t.TempDir()
err := extractArchive(blockFilePath, extractedBlockDirectory)
require.NoError(t, err)

blockDir := BlockDirectory{
Path: extractedBlockDirectory,
removeDirectoryTimeout: 100 * time.Millisecond,
refCount: atomic.NewInt32(0),
}
blockDir := NewBlockDirectory(BlockRef{}, t.TempDir(), log.NewNopLogger())

querier := blockDir.BlockQuerier()
require.Equal(t, int32(1), blockDir.refCount.Load())
require.NoError(t, querier.Close())
require.Equal(t, int32(0), blockDir.refCount.Load())

}
17 changes: 13 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/pkg/util/encoding"
Expand Down Expand Up @@ -264,18 +265,26 @@ func (b *BloomClient) DeleteMetas(ctx context.Context, refs []MetaRef) error {
return err
}

// GetBlock downloads the blocks from objectStorage and returns the downloaded block
// GetBlock downloads the block from object storage and returns the directory
// in which the extracted block data resides
func (b *BloomClient) GetBlock(ctx context.Context, ref BlockRef) (BlockDirectory, error) {
key := b.Block(ref).Addr()
readCloser, _, err := b.client.GetObject(ctx, key)

rc, _, err := b.client.GetObject(ctx, key)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to get block from storage: %w", err)
}
defer rc.Close()

path := b.fsResolver.Block(ref).LocalPath()
err = extractBlock(readCloser, path, b.logger)
err = util.EnsureDirectory(path)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to create block directory: %w", err)
}

err = v1.UnTarGz(path, rc)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to extract block into directory : %w", err)
return BlockDirectory{}, fmt.Errorf("failed to extract block: %w", err)
}

return NewBlockDirectory(ref, path, b.logger), nil
Expand Down
50 changes: 0 additions & 50 deletions pkg/storage/stores/shipper/bloomshipper/compress_utils.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package bloomshipper

import (
"fmt"
"io"
"os"
"path/filepath"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)
Expand All @@ -31,49 +27,3 @@ func CompressBloomBlock(ref BlockRef, archivePath, localDst string, logger log.L

return blockToUpload, nil
}

// writeDataToTempFile copies data into a new, uniquely named file inside
// workingDirectoryPath and returns the path of that file.
//
// data is always closed before the function returns. If the copy or the final
// close of the archive file fails, the partially written file is removed so
// that no truncated archive is left behind in the working directory.
func writeDataToTempFile(workingDirectoryPath string, data io.ReadCloser) (string, error) {
	defer data.Close()
	archivePath := filepath.Join(workingDirectoryPath, uuid.New().String())

	archiveFile, err := os.Create(archivePath)
	if err != nil {
		return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
	}

	if _, err = io.Copy(archiveFile, data); err != nil {
		// Best-effort cleanup: the copy error is the one worth reporting.
		_ = archiveFile.Close()
		_ = os.Remove(archivePath)
		return "", fmt.Errorf("error writing data to archive file: %w", err)
	}

	// Close explicitly instead of deferring: a failing Close on a file that was
	// written to can mean the data never reached the disk.
	if err = archiveFile.Close(); err != nil {
		_ = os.Remove(archivePath)
		return "", fmt.Errorf("error closing archive file: %w", err)
	}
	return archivePath, nil
}

// extractArchive opens the archive file at archivePath and hands it to
// v1.UnTarGz, which unpacks it into workingDirectoryPath.
//
// It returns an error if the archive cannot be opened, or whatever error
// v1.UnTarGz reports.
func extractArchive(archivePath string, workingDirectoryPath string) error {
	file, err := os.Open(archivePath)
	if err != nil {
		return fmt.Errorf("error opening archive file %s: %w", archivePath, err)
	}
	// Release the file descriptor once extraction is done. The file is opened
	// read-only, so the Close error is not surfaced.
	defer file.Close()

	return v1.UnTarGz(workingDirectoryPath, file)
}

// extractBlock materialises the block archive read from data inside blockDir.
//
// The directory is created if needed, the archive is first spooled into a
// temporary file in that directory, and the temporary file is then extracted
// in place. The temporary archive is removed when the function returns; a
// failure to remove it is only logged and does not affect the returned error.
func extractBlock(data io.ReadCloser, blockDir string, logger log.Logger) error {
	if mkdirErr := os.MkdirAll(blockDir, os.ModePerm); mkdirErr != nil {
		return fmt.Errorf("can not create directory to extract the block: %w", mkdirErr)
	}

	tmpArchive, writeErr := writeDataToTempFile(blockDir, data)
	if writeErr != nil {
		return fmt.Errorf("error writing data to temp file: %w", writeErr)
	}

	// Clean up the spooled archive regardless of whether extraction succeeds.
	defer func() {
		if removeErr := os.Remove(tmpArchive); removeErr != nil {
			level.Error(logger).Log("msg", "error removing temp archive file", "err", removeErr)
		}
	}()

	if extractErr := extractArchive(tmpArchive, blockDir); extractErr != nil {
		return fmt.Errorf("error extracting archive: %w", extractErr)
	}
	return nil
}
28 changes: 4 additions & 24 deletions pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,14 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// Test_blockDownloader_extractBlock checks that extractBlock unpacks the
// archive built by createBlockArchive into the target directory and that the
// extracted bloom and series files carry the expected contents.
func Test_blockDownloader_extractBlock(t *testing.T) {
	archivePath, _, wantBloom, wantSeries := createBlockArchive(t)

	archive, err := os.Open(archivePath)
	require.NoError(t, err)

	targetDir := t.TempDir()
	err = extractBlock(archive, targetDir, nil)
	require.NoError(t, err)

	bloomPath := filepath.Join(targetDir, v1.BloomFileName)
	seriesPath := filepath.Join(targetDir, v1.SeriesFileName)

	// Both files must be present before their contents are compared.
	require.FileExists(t, bloomPath)
	require.FileExists(t, seriesPath)

	gotBloom, err := os.ReadFile(bloomPath)
	require.NoError(t, err)
	require.Equal(t, wantBloom, string(gotBloom))

	gotSeries, err := os.ReadFile(seriesPath)
	require.NoError(t, err)
	require.Equal(t, wantSeries, string(gotSeries))
}

// directoryDoesNotExist reports whether os.Lstat on path returns an error.
// Any Lstat error counts, not only "not exist" errors.
func directoryDoesNotExist(path string) bool {
	if _, statErr := os.Lstat(path); statErr != nil {
		return true
	}
	return false
}

const testArchiveFileName = "test-block-archive"

func createBlockArchive(t *testing.T) (string, string, string, string) {
func createBlockArchive(t *testing.T) (string, io.Reader, string, string) {
dir := t.TempDir()
mockBlockDir := filepath.Join(dir, "mock-block-dir")
err := os.MkdirAll(mockBlockDir, 0777)
Expand All @@ -65,5 +43,7 @@ func createBlockArchive(t *testing.T) (string, string, string, string) {
err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir))
require.NoError(t, err)

return blockFilePath, mockBlockDir, bloomFileContent, seriesFileContent
_, _ = file.Seek(0, 0)

return blockFilePath, file, bloomFileContent, seriesFileContent
}

0 comments on commit 9ef3b11

Please sign in to comment.