Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: Allow setting a constant prefix for all created keys #10096

Merged
merged 5 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
##### Enhancements

* [11003](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Add the `metrics-namespace` flag to change the namespace of metrics currently using cortex as namespace.
* [10096](https://github.com/grafana/loki/pull/10096) **aschleck**: Storage: Allow setting a constant prefix for all created keys
* [11038](https://github.com/grafana/loki/pull/11038) **kavirajk**: Remove already deprecated `store.max-look-back-period`.
* [10906](https://github.com/grafana/loki/pull/10906) **kavirajk**: Support Loki ruler to notify WAL writes to remote storage.
* [10613](https://github.com/grafana/loki/pull/10613) **ngc4579**: Helm: allow GrafanaAgent tolerations
Expand Down
5 changes: 5 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2100,6 +2100,11 @@ congestion_control:
# CLI flag: -store.congestion-control.hedge.strategy
[strategy: <string> | default = ""]

# Experimental. Sets a constant prefix for all keys inserted into object
# storage. Example: loki/
# CLI flag: -store.object-prefix
[object_prefix: <string> | default = ""]

# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is: store.index-cache-read
[index_queries_cache_config: <cache_config>]
Expand Down
8 changes: 7 additions & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie

if c.cfg.RetentionEnabled {
var (
raw client.ObjectClient
encoder client.KeyEncoder
name = fmt.Sprintf("%s_%s", period.ObjectType, period.From.String())
retentionWorkDir = filepath.Join(c.cfg.WorkingDirectory, "retention", name)
Expand All @@ -313,7 +314,12 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie
// remove markers from the store dir after copying them to period specific dirs.
legacyMarkerDirs[period.ObjectType] = struct{}{}

if _, ok := objectClient.(*local.FSObjectClient); ok {
if casted, ok := objectClient.(client.PrefixedObjectClient); ok {
raw = casted.GetDownstream()
} else {
raw = objectClient
}
if _, ok := raw.(*local.FSObjectClient); ok {
encoder = client.FSEncoder
}
chunkClient := client.NewClient(objectClient, encoder, schemaConfig)
Expand Down
69 changes: 69 additions & 0 deletions pkg/storage/chunk/client/prefixed_object_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package client

import (
"context"
"io"
"strings"
)

type PrefixedObjectClient struct {
downstreamClient ObjectClient
prefix string
}

func NewPrefixedObjectClient(downstreamClient ObjectClient, prefix string) ObjectClient {
return PrefixedObjectClient{downstreamClient: downstreamClient, prefix: prefix}
}

func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object)
}

func (p PrefixedObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
return p.downstreamClient.ObjectExists(ctx, p.prefix+objectKey)
}

func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
}

func (p PrefixedObjectClient) List(ctx context.Context, prefix, delimiter string) ([]StorageObject, []StorageCommonPrefix, error) {
objects, commonPrefixes, err := p.downstreamClient.List(ctx, p.prefix+prefix, delimiter)
if err != nil {
return nil, nil, err
}

for i := range objects {
objects[i].Key = strings.TrimPrefix(objects[i].Key, p.prefix)
}

for i := range commonPrefixes {
commonPrefixes[i] = StorageCommonPrefix(strings.TrimPrefix(string(commonPrefixes[i]), p.prefix))
}

return objects, commonPrefixes, nil
}

func (p PrefixedObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
return p.downstreamClient.DeleteObject(ctx, p.prefix+objectKey)
}

func (p PrefixedObjectClient) IsObjectNotFoundErr(err error) bool {
return p.downstreamClient.IsObjectNotFoundErr(err)
}

func (p PrefixedObjectClient) IsRetryableErr(err error) bool {
return p.downstreamClient.IsRetryableErr(err)
}

func (p PrefixedObjectClient) Stop() {
p.downstreamClient.Stop()
}

func (p PrefixedObjectClient) GetDownstream() ObjectClient {
return p.downstreamClient
}

func (p PrefixedObjectClient) GetPrefix() string {
return p.prefix
}
19 changes: 18 additions & 1 deletion pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ type Config struct {
COSConfig ibmcloud.COSConfig `yaml:"cos"`
IndexCacheValidity time.Duration `yaml:"index_cache_validity"`
CongestionControl congestion.Config `yaml:"congestion_control,omitempty"`
ObjectPrefix string `yaml:"object_prefix" doc:"description=Experimental. Sets a constant prefix for all keys inserted into object storage. Example: loki/"`

IndexQueriesCacheConfig cache.Config `yaml:"index_queries_cache_config"`
DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"`
Expand Down Expand Up @@ -362,6 +363,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "", f)
f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.")
f.StringVar(&cfg.ObjectPrefix, "store.object-prefix", "", "The prefix to all keys inserted in object storage. Example: loki-instances/west/")
f.BoolVar(&cfg.DisableBroadIndexQueries, "store.disable-broad-index-queries", false, "Disable broad index queries which results in reduced cache usage and faster query performance at the expense of somewhat higher QPS on the index store.")
f.IntVar(&cfg.MaxParallelGetChunk, "store.max-parallel-get-chunk", 150, "Maximum number of parallel chunk reads.")
cfg.BoltDBShipperConfig.RegisterFlags(f)
Expand Down Expand Up @@ -634,8 +636,23 @@ func (c *ClientMetrics) Unregister() {
c.AzureMetrics.Unregister()
}

// NewObjectClient makes a new StorageClient of the desired types.
// NewObjectClient makes a new StorageClient with the prefix in the front.
func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) {
actual, err := internalNewObjectClient(name, cfg, clientMetrics)
if err != nil {
return nil, err
}

if cfg.ObjectPrefix == "" {
return actual, nil
} else {
prefix := strings.Trim(cfg.ObjectPrefix, "/") + "/"
slim-bean marked this conversation as resolved.
Show resolved Hide resolved
return client.NewPrefixedObjectClient(actual, prefix), nil
}
}

// internalNewObjectClient makes the underlying StorageClient of the desired types.
func internalNewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) {
var (
namedStore string
storeType = name
Expand Down
53 changes: 53 additions & 0 deletions pkg/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/cassandra"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
Expand Down Expand Up @@ -227,6 +228,58 @@ func TestNamedStores_populateStoreType(t *testing.T) {
})
}

func TestNewObjectClient_prefixing(t *testing.T) {
t.Run("no prefix", func(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)

objectClient, err := NewObjectClient("inmemory", cfg, cm)
require.NoError(t, err)

_, ok := objectClient.(client.PrefixedObjectClient)
assert.False(t, ok)
})

t.Run("prefix with trailing /", func(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)
cfg.ObjectPrefix = "my/prefix/"

objectClient, err := NewObjectClient("inmemory", cfg, cm)
require.NoError(t, err)

prefixed, ok := objectClient.(client.PrefixedObjectClient)
assert.True(t, ok)
assert.Equal(t, "my/prefix/", prefixed.GetPrefix())
})

t.Run("prefix without trailing /", func(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)
cfg.ObjectPrefix = "my/prefix"

objectClient, err := NewObjectClient("inmemory", cfg, cm)
require.NoError(t, err)

prefixed, ok := objectClient.(client.PrefixedObjectClient)
assert.True(t, ok)
assert.Equal(t, "my/prefix/", prefixed.GetPrefix())
})

t.Run("prefix with starting and trailing /", func(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)
cfg.ObjectPrefix = "/my/prefix/"

objectClient, err := NewObjectClient("inmemory", cfg, cm)
require.NoError(t, err)

prefixed, ok := objectClient.(client.PrefixedObjectClient)
assert.True(t, ok)
assert.Equal(t, "my/prefix/", prefixed.GetPrefix())
})
}

// DefaultSchemaConfig creates a simple schema config for testing
func DefaultSchemaConfig(store, schema string, from model.Time) config.SchemaConfig {
s := config.SchemaConfig{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ func (m *mockObjectClient) List(ctx context.Context, prefix, delimiter string) (
}

func TestCachedObjectClient_List(t *testing.T) {
objectKeys := func(items []client.StorageObject) []string {
keys := make([]string, 0, len(items))
for _, item := range items {
keys = append(keys, item.Key)
}
return keys
}

t.Run("refresh table name cache if requested table is not in cache", func(t *testing.T) {
ctx := context.Background()

Expand All @@ -81,14 +89,6 @@ func TestCachedObjectClient_List(t *testing.T) {
// replace mock object client with one that returns more tables
cachedObjectClient.ObjectClient = newMockObjectClient(t, newObjectsInStorage)

objectKeys := func(items []client.StorageObject) []string {
keys := make([]string, 0, len(items))
for _, item := range items {
keys = append(keys, item.Key)
}
return keys
}

// list contents of a table that is in table name cache
objects, _, err = cachedObjectClient.listTable(ctx, "table1")
require.Nil(t, err)
Expand All @@ -110,6 +110,24 @@ func TestCachedObjectClient_List(t *testing.T) {
objectsFromListCall, _, _ = cachedObjectClient.List(ctx, "table3/", "/", false)
require.Equal(t, objectsFromListCall, objects)
})

t.Run("supports prefixed clients", func(t *testing.T) {
ctx := context.Background()

prefix := "my/amazing/prefix/"
objectsInStorage := []string{
prefix + "table1/db.gz",
prefix + "table2/db.gz",
prefix + "table2/db2.gz",
}
objectClient := newMockObjectClient(t, objectsInStorage)
prefixedClient := client.NewPrefixedObjectClient(objectClient, prefix)
cachedObjectClient := newCachedObjectClient(prefixedClient)

objects, _, err := cachedObjectClient.List(ctx, "table2/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"table2/db.gz", "table2/db2.gz"}, objectKeys(objects))
})
}

func TestCachedObjectClient(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type IndexFile struct {
}

func NewIndexStorageClient(origObjectClient client.ObjectClient, storagePrefix string) Client {
objectClient := newCachedObjectClient(newPrefixedObjectClient(origObjectClient, storagePrefix))
objectClient := newCachedObjectClient(client.NewPrefixedObjectClient(origObjectClient, storagePrefix))
return &indexStorageClient{objectClient: objectClient}
}

Expand Down

This file was deleted.

Loading