Skip to content

Commit

Permalink
Rename NewShipper to NewIndexClient (grafana#10700)
Browse files Browse the repository at this point in the history
The constructor function name is misleading as it suggests that the returned object is a [shipper](https://github.com/grafana/loki/blob/main/pkg/storage/stores/shipper/index/table_manager.go#L48), rather than an index client.

Also, change the return type to be a struct pointer, rather than an interface.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Sep 27, 2023
1 parent f4ab1e3 commit 7684226
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
10 changes: 5 additions & 5 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
var (
indexGatewayClient index.Client
// singleton for each period
boltdbIndexClientsWithShipper = make(map[config.DayTime]index.Client)
boltdbIndexClientsWithShipper = make(map[config.DayTime]*shipper.IndexClient)

supportedIndexTypes = []string{
config.BoltDBShipperType,
Expand Down Expand Up @@ -100,7 +100,7 @@ func ResetBoltDBIndexClientsWithShipper() {
client.Stop()
}

boltdbIndexClientsWithShipper = make(map[config.DayTime]index.Client)
boltdbIndexClientsWithShipper = make(map[config.DayTime]*shipper.IndexClient)

if indexGatewayClient != nil {
indexGatewayClient.Stop()
Expand Down Expand Up @@ -440,13 +440,13 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange,
if shardingStrategy != nil {
filterFn = shardingStrategy.FilterTenants
}
shipper, err := shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, filterFn, tableRange, registerer, logger)
indexClient, err := shipper.NewIndexClient(cfg.BoltDBShipperConfig, objectClient, limits, filterFn, tableRange, registerer, logger)
if err != nil {
return nil, err
}

boltdbIndexClientsWithShipper[periodCfg.From] = shipper
return shipper, nil
boltdbIndexClientsWithShipper[periodCfg.From] = indexClient
return indexClient, nil

case config.TSDBType:
// TODO(chaudum): Move TSDB index client creation into this code path
Expand Down
22 changes: 11 additions & 11 deletions pkg/storage/stores/shipper/shipper_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type writer interface {
Stop()
}

type indexClient struct {
type IndexClient struct {
cfg Config
indexShipper indexshipper.IndexShipper
writer writer
Expand All @@ -59,10 +59,10 @@ type indexClient struct {
stopOnce sync.Once
}

// NewShipper creates a shipper for syncing local objects with a store
func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits,
tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (series_index.Client, error) {
i := indexClient{
// NewIndexClient creates a shipper for syncing local objects with a store
func NewIndexClient(cfg Config, storageClient client.ObjectClient, limits downloads.Limits,
tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (*IndexClient, error) {
i := IndexClient{
cfg: cfg,
metrics: newMetrics(registerer),
logger: logger,
Expand All @@ -78,7 +78,7 @@ func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.
return &i, nil
}

func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.Limits,
func (i *IndexClient) init(storageClient client.ObjectClient, limits downloads.Limits,
tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer) error {
var err error
i.indexShipper, err = indexshipper.NewIndexShipper(i.cfg.Config, storageClient, limits, tenantFilter,
Expand Down Expand Up @@ -110,28 +110,28 @@ func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.L
return nil
}

func (i *indexClient) Stop() {
func (i *IndexClient) Stop() {
i.stopOnce.Do(i.stop)
}

func (i *indexClient) stop() {
func (i *IndexClient) stop() {
if i.writer != nil {
i.writer.Stop()
}
i.indexShipper.Stop()
}

func (i *indexClient) NewWriteBatch() series_index.WriteBatch {
func (i *IndexClient) NewWriteBatch() series_index.WriteBatch {
return local.NewWriteBatch()
}

func (i *indexClient) BatchWrite(ctx context.Context, batch series_index.WriteBatch) error {
func (i *IndexClient) BatchWrite(ctx context.Context, batch series_index.WriteBatch) error {
return instrument.CollectedRequest(ctx, "WRITE", instrument.NewHistogramCollector(i.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error {
return i.writer.BatchWrite(ctx, batch)
})
}

func (i *indexClient) QueryPages(ctx context.Context, queries []series_index.Query, callback series_index.QueryPagesCallback) error {
func (i *IndexClient) QueryPages(ctx context.Context, queries []series_index.Query, callback series_index.QueryPagesCallback) error {
return instrument.CollectedRequest(ctx, "Shipper.Query", instrument.NewHistogramCollector(i.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error {
return i.querier.QueryPages(ctx, queries, callback)
})
Expand Down

0 comments on commit 7684226

Please sign in to comment.