Un/compress bloom blocks to/from object storage #11238

Closed · wants to merge 8 commits
34 changes: 34 additions & 0 deletions pkg/storage/bloom/v1/archive.go
@@ -2,6 +2,7 @@ package v1

import (
"archive/tar"
"bytes"
"io"
"os"
"path/filepath"
@@ -98,3 +99,36 @@ func UnTarGz(dst string, r io.Reader) error {

return nil
}

// UnGzData decompresses a gzip-compressed byte slice using the chunkenc gzip
// reader pool and returns nil if decompression fails.
func UnGzData(compressedData []byte) []byte {
reader := bytes.NewReader(compressedData)
gz, err := chunkenc.GetReaderPool(chunkenc.EncGZIP).GetReader(reader)
if err != nil {
return nil
}

// Read the decompressed data directly from the gzip reader
var decompressedData bytes.Buffer
_, err = decompressedData.ReadFrom(gz)
if err != nil {
return nil
}

return decompressedData.Bytes()
}

// GzData gzip-compresses a byte slice using the chunkenc gzip writer pool and
// returns nil if compression fails.
func GzData(data []byte) []byte {
var buf bytes.Buffer
gz := chunkenc.Gzip.GetWriter(&buf)

_, err := gz.Write(data)
if err != nil {
return nil
}

if err := gz.Close(); err != nil {
return nil
}

return buf.Bytes()
}
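
The two helpers are deliberately symmetric, with a nil return signalling failure in either direction. A minimal round-trip sketch of how a caller could use them (illustrative only, not part of this diff; the payload bytes are made up):

// Illustrative sketch: compress and decompress a block payload with the new helpers.
package main

import (
	"bytes"
	"fmt"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func main() {
	payload := []byte("raw bloom block bytes") // hypothetical block contents

	compressed := v1.GzData(payload)
	if compressed == nil {
		fmt.Println("error compressing data") // GzData reports failure as a nil slice
		return
	}

	decompressed := v1.UnGzData(compressed)
	if decompressed == nil {
		fmt.Println("error decompressing data") // UnGzData reports failure as a nil slice
		return
	}

	fmt.Println(bytes.Equal(payload, decompressed)) // prints: true
}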
74 changes: 74 additions & 0 deletions pkg/storage/bloom/v1/archive_test.go
@@ -3,6 +3,8 @@ package v1
import (
"bytes"
"io"
"os"
"os/exec"
"testing"

"github.com/stretchr/testify/require"
@@ -77,3 +79,75 @@ func TestArchive(t *testing.T) {
require.Nil(t, err)
require.Equal(t, srcBloomsBytes, dstBloomsBytes)
}

// Test to validate correctness of the UnGzData function.
// We use the gzip command line tool to compress a file and then
// uncompress the file contents using the UnGzData function
func TestUnGzData(t *testing.T) {
// Create a temporary file to gzip
content := []byte("Hello World!")
filePath := "testfile.txt"
err := os.WriteFile(filePath, content, 0644)
require.Nil(t, err)

defer os.Remove(filePath)

// Compress the file using the gzip command line tool
gzipFileName := "testfile.txt.gz"
cmd := exec.Command("gzip", filePath)
err = cmd.Run()
require.Nil(t, err)
defer os.Remove(gzipFileName)

// Read the gzipped file contents
fileContent, err := os.ReadFile(gzipFileName)
require.Nil(t, err)

uncompressedContent := UnGzData(fileContent)

// Check if the uncompressed data matches the original content
require.Equal(t, content, uncompressedContent)
}

// Test to validate correctness of the GzData function.
// We use the GzData function to compress data, and then write it to a file
// Then we use the gunzip command line tool to uncompress the file and verify
// the contents match the original data
func TestGzData(t *testing.T) {
// Compress the content in memory and write it to a temporary .gz file
content := []byte("Hello World!")
compressedContent := GzData(content)
baseFileName := "testfile.txt"
filePath := "testfile.txt.gz"
err := os.WriteFile(filePath, compressedContent, 0644)
require.Nil(t, err)

defer os.Remove(filePath)

// Uncompress the file using the gunzip command line tool
cmd := exec.Command("gunzip", filePath)
err = cmd.Run()
require.Nil(t, err)

defer os.Remove(baseFileName)

uncompressedContent, err := os.ReadFile(baseFileName)
require.Nil(t, err)

// Check if the uncompressed data matches the original content
require.Equal(t, content, uncompressedContent)
}

// Test to validate that the GzData and UnGzData functions are inverses of each other
func TestGzUnGzData(t *testing.T) {
testString := []byte("Hello World!")
compressed := GzData(testString)
uncompressed := UnGzData(compressed)
require.Equal(t, testString, uncompressed)
}
28 changes: 25 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/client.go
@@ -11,6 +11,8 @@ import (
"strings"
"time"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"

"github.com/prometheus/common/model"

"github.com/grafana/dskit/concurrency"
@@ -209,9 +211,24 @@ func (b *BloomClient) GetBlocks(ctx context.Context, references []BlockRef) (cha
if err != nil {
return fmt.Errorf("error while fetching object from storage: %w", err)
}

data := new(bytes.Buffer)
_, err = io.Copy(data, readCloser)
if err != nil {
return fmt.Errorf("error reading data: %w", err)
}
decompressedData := v1.UnGzData(data.Bytes())
if decompressedData == nil {
return fmt.Errorf("error decompressing data")
}

// Creating a new ReadCloser from the decompressed data
decompressedDataReader := bytes.NewReader(decompressedData)
decompressedReadCloser := io.NopCloser(decompressedDataReader)

blocksChannel <- Block{
BlockRef: reference,
Data: readCloser,
Data: decompressedReadCloser,
}
return nil
})
@@ -225,7 +242,6 @@ func (b *BloomClient) GetBlocks(ctx context.Context, references []BlockRef) (cha
return blocksChannel, errChannel
}

// TODO zip (archive) blocks before uploading to storage
func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, error) {
results := make([]Block, len(blocks))
//todo move concurrency to the config
@@ -245,7 +261,13 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e
if err != nil {
return fmt.Errorf("error while reading object data: %w", err)
}
err = objectClient.PutObject(ctx, key, bytes.NewReader(data))

compressedData := v1.GzData(data)
if compressedData == nil {
return fmt.Errorf("error compressing data")
}

err = objectClient.PutObject(ctx, key, bytes.NewReader(compressedData))
if err != nil {
return fmt.Errorf("error updloading block file: %w", err)
}
17 changes: 10 additions & 7 deletions pkg/storage/stores/shipper/bloomshipper/client_test.go
@@ -13,6 +13,8 @@ import (
"testing"
"time"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"

aws_io "github.com/aws/smithy-go/io"
"github.com/google/uuid"
"github.com/prometheus/common/model"
@@ -236,11 +238,11 @@ func Test_BloomClient_GetBlocks(t *testing.T) {

firstBlockActualData, exists := blocks[firstBlockRef.BlockPath]
require.Truef(t, exists, "data for the first block must be present in the results: %+v", blocks)
require.Equal(t, firstBlockData, firstBlockActualData)
require.Equal(t, v1.UnGzData(firstBlockData), []byte(firstBlockActualData))

secondBlockActualData, exists := blocks[secondBlockRef.BlockPath]
require.True(t, exists, "data for the second block must be present in the results: %+v", blocks)
require.Equal(t, secondBlockData, secondBlockActualData)
require.Equal(t, v1.UnGzData(secondBlockData), []byte(secondBlockActualData))

require.Len(t, blocks, 2)
}
@@ -300,7 +302,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
require.FileExists(t, savedFilePath)
savedData, err := os.ReadFile(savedFilePath)
require.NoError(t, err)
require.Equal(t, blockForFirstFolderData, string(savedData))
require.Equal(t, v1.GzData([]byte(blockForFirstFolderData)), savedData)

secondResultBlock := results[1]
path = secondResultBlock.BlockPath
@@ -319,7 +321,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
require.FileExists(t, savedFilePath)
savedData, err = os.ReadFile(savedFilePath)
require.NoError(t, err)
require.Equal(t, blockForSecondFolderData, string(savedData))
require.Equal(t, v1.GzData([]byte(blockForSecondFolderData)), savedData)
}

func Test_BloomClient_DeleteBlocks(t *testing.T) {
@@ -364,13 +366,14 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) {
require.NoFileExists(t, block2Path)
}

func createBlockFile(t *testing.T, path string) string {
func createBlockFile(t *testing.T, path string) []byte {
err := os.MkdirAll(path[:strings.LastIndex(path, "/")], 0755)
require.NoError(t, err)
fileContent := uuid.NewString()
err = os.WriteFile(path, []byte(fileContent), 0700)
results := v1.GzData([]byte(fileContent))
err = os.WriteFile(path, results, 0700)
require.NoError(t, err)
return fileContent
return results
}

func Test_TablesByPeriod(t *testing.T) {