Skip to content

Commit

Permalink
(chore) Bloomshipper: Separate store and client (#11865)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

This PR removes the `StoreAndClient` interface that was accepted by the
`BloomShipper`. Since the `BloomStore` had to not only implement the
`Store` interface, but also the `Client` interface, it caused
re-implementation of the same methods in different ways.

Now the shipper solely relies on the `Store` interface.

See individual commit messages for more context.

Tests have been rewritten from scratch and placed in their own
respective test files for store and client.

---------

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Feb 5, 2024
1 parent 2e3fa3b commit 73edf7a
Show file tree
Hide file tree
Showing 8 changed files with 586 additions and 605 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ require (
github.com/DmitriyVTitov/size v1.5.0
github.com/IBM/go-sdk-core/v5 v5.13.1
github.com/IBM/ibm-cos-sdk-go v1.10.0
github.com/aws/smithy-go v1.11.1
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc
github.com/d4l3k/messagediff v1.2.1
github.com/efficientgo/core v1.0.0-rc.2
Expand Down Expand Up @@ -183,6 +182,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.1 // indirect
github.com/aws/smithy-go v1.11.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Compactor struct {
limits Limits

// temporary workaround until store has implemented read/write shipper interface
store bloomshipper.StoreAndClient
store bloomshipper.Store

sharding ShardingStrategy

Expand All @@ -48,7 +48,7 @@ type Compactor struct {

func New(
cfg Config,
store bloomshipper.StoreAndClient,
store bloomshipper.Store,
sharding ShardingStrategy,
limits Limits,
logger log.Logger,
Expand Down
51 changes: 41 additions & 10 deletions pkg/storage/chunk/client/testutils/inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,36 @@ const (

// MockStorage is a fake in-memory StorageClient.
type MockStorage struct {
*InMemoryObjectClient

mtx sync.RWMutex
tables map[string]*mockTable
objects map[string][]byte
schemaCfg config.SchemaConfig

numIndexWrites int
numChunkWrites int
mode MockStorageMode
}

// compiler check
var _ client.ObjectClient = &InMemoryObjectClient{}

type InMemoryObjectClient struct {
objects map[string][]byte
mtx sync.RWMutex
mode MockStorageMode
}

func NewInMemoryObjectClient() *InMemoryObjectClient {
return &InMemoryObjectClient{
objects: make(map[string][]byte),
}
}

func (m *InMemoryObjectClient) Internals() map[string][]byte {
return m.objects
}

type mockTable struct {
items map[string][]mockItem
write, read int64
Expand All @@ -64,6 +84,7 @@ func ResetMockStorage() {
func NewMockStorage() *MockStorage {
if singleton == nil {
singleton = &MockStorage{
InMemoryObjectClient: NewInMemoryObjectClient(),
schemaCfg: config.SchemaConfig{
Configs: []config.PeriodConfig{
{
Expand All @@ -73,8 +94,7 @@ func NewMockStorage() *MockStorage {
},
},
},
tables: map[string]*mockTable{},
objects: map[string][]byte{},
tables: map[string]*mockTable{},
}
}
return singleton
Expand Down Expand Up @@ -109,6 +129,7 @@ func (*MockStorage) Stop() {

func (m *MockStorage) SetMode(mode MockStorageMode) {
m.mode = mode
m.InMemoryObjectClient.mode = mode
}

// ListTables implements StorageClient.
Expand Down Expand Up @@ -370,7 +391,8 @@ func (m *MockStorage) query(ctx context.Context, query index.Query, callback fun
return nil
}

func (m *MockStorage) ObjectExists(_ context.Context, objectKey string) (bool, error) {
// ObjectExists implments client.ObjectClient
func (m *InMemoryObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()

Expand All @@ -386,7 +408,8 @@ func (m *MockStorage) ObjectExists(_ context.Context, objectKey string) (bool, e
return true, nil
}

func (m *MockStorage) GetObject(_ context.Context, objectKey string) (io.ReadCloser, int64, error) {
// GetObject implements client.ObjectClient.
func (m *InMemoryObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, int64, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()

Expand All @@ -402,7 +425,8 @@ func (m *MockStorage) GetObject(_ context.Context, objectKey string) (io.ReadClo
return io.NopCloser(bytes.NewReader(buf)), int64(len(buf)), nil
}

func (m *MockStorage) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error {
// PutObject implements client.ObjectClient.
func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error {
buf, err := io.ReadAll(object)
if err != nil {
return err
Expand All @@ -419,17 +443,20 @@ func (m *MockStorage) PutObject(_ context.Context, objectKey string, object io.R
return nil
}

func (m *MockStorage) IsObjectNotFoundErr(err error) bool {
// IsObjectNotFoundErr implements client.ObjectClient.
func (m *InMemoryObjectClient) IsObjectNotFoundErr(err error) bool {
return errors.Is(err, errStorageObjectNotFound)
}

func (m *MockStorage) IsChunkNotFoundErr(err error) bool {
return m.IsObjectNotFoundErr(err)
}

func (m *MockStorage) IsRetryableErr(error) bool { return false }
// IsRetryableErr implements client.ObjectClient.
func (m *InMemoryObjectClient) IsRetryableErr(error) bool { return false }

func (m *MockStorage) DeleteObject(_ context.Context, objectKey string) error {
// DeleteObject implements client.ObjectClient.
func (m *InMemoryObjectClient) DeleteObject(_ context.Context, objectKey string) error {
m.mtx.Lock()
defer m.mtx.Unlock()

Expand All @@ -446,7 +473,7 @@ func (m *MockStorage) DeleteObject(_ context.Context, objectKey string) error {
}

// List implements chunk.ObjectClient.
func (m *MockStorage) List(_ context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
func (m *InMemoryObjectClient) List(_ context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()

Expand Down Expand Up @@ -494,6 +521,10 @@ func (m *MockStorage) List(_ context.Context, prefix, delimiter string) ([]clien
return storageObjects, commonPrefixes, nil
}

// Stop implements client.ObjectClient
func (*InMemoryObjectClient) Stop() {
}

type mockWriteBatch struct {
inserts []struct {
tableName, hashValue string
Expand Down
Loading

0 comments on commit 73edf7a

Please sign in to comment.