diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 74ae7a67d645b..3b6a89d4a115d 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -45,7 +45,7 @@ import ( var ( indexGatewayClient index.Client // singleton for each period - boltdbIndexClientsWithShipper = make(map[config.DayTime]index.Client) + boltdbIndexClientsWithShipper = make(map[config.DayTime]*shipper.IndexClient) supportedIndexTypes = []string{ config.BoltDBShipperType, @@ -100,7 +100,7 @@ func ResetBoltDBIndexClientsWithShipper() { client.Stop() } - boltdbIndexClientsWithShipper = make(map[config.DayTime]index.Client) + boltdbIndexClientsWithShipper = make(map[config.DayTime]*shipper.IndexClient) if indexGatewayClient != nil { indexGatewayClient.Stop() @@ -440,13 +440,13 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, if shardingStrategy != nil { filterFn = shardingStrategy.FilterTenants } - shipper, err := shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, filterFn, tableRange, registerer, logger) + indexClient, err := shipper.NewIndexClient(cfg.BoltDBShipperConfig, objectClient, limits, filterFn, tableRange, registerer, logger) if err != nil { return nil, err } - boltdbIndexClientsWithShipper[periodCfg.From] = shipper - return shipper, nil + boltdbIndexClientsWithShipper[periodCfg.From] = indexClient + return indexClient, nil case config.TSDBType: // TODO(chaudum): Move TSDB index client creation into this code path diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index ef41550d3e1de..aea57985dd3f1 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -48,7 +48,7 @@ type writer interface { Stop() } -type indexClient struct { +type IndexClient struct { cfg Config indexShipper indexshipper.IndexShipper writer writer @@ -59,10 +59,10 @@ type indexClient struct { stopOnce sync.Once } -// NewShipper creates a shipper for syncing local objects with a store -func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, - tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (series_index.Client, error) { - i := indexClient{ +// NewIndexClient creates a shipper for syncing local objects with a store +func NewIndexClient(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, + tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (*IndexClient, error) { + i := IndexClient{ cfg: cfg, metrics: newMetrics(registerer), logger: logger, @@ -78,7 +78,7 @@ func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads. return &i, nil } -func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.Limits, +func (i *IndexClient) init(storageClient client.ObjectClient, limits downloads.Limits, tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer) error { var err error i.indexShipper, err = indexshipper.NewIndexShipper(i.cfg.Config, storageClient, limits, tenantFilter, @@ -110,28 +110,28 @@ func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.L return nil } -func (i *indexClient) Stop() { +func (i *IndexClient) Stop() { i.stopOnce.Do(i.stop) } -func (i *indexClient) stop() { +func (i *IndexClient) stop() { if i.writer != nil { i.writer.Stop() } i.indexShipper.Stop() } -func (i *indexClient) NewWriteBatch() series_index.WriteBatch { +func (i *IndexClient) NewWriteBatch() series_index.WriteBatch { return local.NewWriteBatch() } -func (i *indexClient) BatchWrite(ctx context.Context, batch series_index.WriteBatch) error { +func (i *IndexClient) BatchWrite(ctx context.Context, batch series_index.WriteBatch) error { return instrument.CollectedRequest(ctx, "WRITE", instrument.NewHistogramCollector(i.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { return i.writer.BatchWrite(ctx, batch) }) } -func (i *indexClient) QueryPages(ctx context.Context, queries []series_index.Query, callback series_index.QueryPagesCallback) error { +func (i *IndexClient) QueryPages(ctx context.Context, queries []series_index.Query, callback series_index.QueryPagesCallback) error { return instrument.CollectedRequest(ctx, "Shipper.Query", instrument.NewHistogramCollector(i.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { return i.querier.QueryPages(ctx, queries, callback) })