Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split store-gateway into index-gateway and chunks-gateway #2

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/cortexproject/cortex/pkg/alertmanager"
"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
"github.com/cortexproject/cortex/pkg/chunksgateway"
"github.com/cortexproject/cortex/pkg/compactor"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor"
Expand Down Expand Up @@ -367,6 +368,11 @@ func (a *API) RegisterStoreGateway(s *storegateway.StoreGateway) {
a.RegisterRoute("/store-gateway/ring", http.HandlerFunc(s.RingHandler), false, "GET", "POST")
}

// RegisterStoreGateway registers the ring UI page associated with the store-gateway.
func (a *API) RegisterChunksGateway(s *chunksgateway.ChunksGateway) {
storegatewaypb.RegisterChunksGatewayServer(a.server.GRPC, s)
}

// RegisterCompactor registers the ring UI page associated with the compactor.
func (a *API) RegisterCompactor(c *compactor.Compactor) {
a.indexPage.AddLink(SectionAdminEndpoints, "/compactor/ring", "Compactor Ring Status")
Expand Down
120 changes: 120 additions & 0 deletions pkg/chunksgateway/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package chunksgateway

import (
"context"
"time"

cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/weaveworks/common/logging"

"github.com/cortexproject/cortex/pkg/storage/bucket"
)

const (
syncReasonInitial = "initial"
syncReasonPeriodic = "periodic"
)

type ChunksGateway struct {
services.Service

logger log.Logger

gatewayCfg storegateway.Config
storageCfg cortex_tsdb.BlocksStorageConfig

stores *storegateway.BucketStores
}

func NewChunksGateway(gatewayCfg storegateway.Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ChunksGateway, error) {
bucketClient, err := createBucketClient(storageCfg, logger, reg)
if err != nil {
return nil, err
}

return newChunksGateway(gatewayCfg, storageCfg, bucketClient, limits, logLevel, logger, reg)
}

func newChunksGateway(gatewayCfg storegateway.Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ChunksGateway, error) {
var err error

g := &ChunksGateway{
gatewayCfg: gatewayCfg,
storageCfg: storageCfg,
logger: logger,
}

// Init sharding strategy.
shardingStrategy := storegateway.NewNoShardingStrategy()

g.stores, err = storegateway.NewBucketStores(storageCfg, shardingStrategy, bucketClient, false, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg))
if err != nil {
return nil, errors.Wrap(err, "create bucket stores")
}

g.Service = services.NewBasicService(g.starting, g.running, g.stopping)

return g, nil
}

func (g *ChunksGateway) starting(ctx context.Context) (err error) {
if err = g.stores.InitialSync(ctx); err != nil {
return errors.Wrap(err, "initial blocks synchronization")
}

return nil
}

func (g *ChunksGateway) running(ctx context.Context) error {
// Apply a jitter to the sync frequency in order to increase the probability
// of hitting the shared cache (if any).
syncTicker := time.NewTicker(util.DurationWithJitter(g.storageCfg.BucketStore.SyncInterval, 0.2))
defer syncTicker.Stop()

for {
select {
case <-syncTicker.C:
g.syncStores(ctx, syncReasonPeriodic)
case <-ctx.Done():
return nil
}
}
}

func (g *ChunksGateway) stopping(error) error {
return nil
}

func (g *ChunksGateway) syncStores(ctx context.Context, reason string) {
level.Info(g.logger).Log("msg", "synchronizing TSDB blocks for all users", "reason", reason)

if err := g.stores.SyncBlocks(ctx); err != nil {
level.Warn(g.logger).Log("msg", "failed to synchronize TSDB blocks", "reason", reason, "err", err)
} else {
level.Info(g.logger).Log("msg", "successfully synchronized TSDB blocks for all users", "reason", reason)
}
}

func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "store-gateway", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "create bucket client")
}

return bucketClient, nil
}

func (g *ChunksGateway) Chunks(srv storegatewaypb.ChunksGateway_ChunksServer) (err error) {
return g.stores.Chunks(srv)
}
18 changes: 10 additions & 8 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/chunksgateway"
"github.com/cortexproject/cortex/pkg/util/grpcclient"

"github.com/cortexproject/cortex/pkg/alertmanager"
Expand Down Expand Up @@ -308,14 +309,15 @@ type Cortex struct {
QuerierEngine v1.QueryEngine
QueryFrontendTripperware tripperware.Tripperware

Ruler *ruler.Ruler
RulerStorage rulestore.RuleStore
ConfigAPI *configAPI.API
ConfigDB db.DB
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.Compactor
StoreGateway *storegateway.StoreGateway
MemberlistKV *memberlist.KVInitService
Ruler *ruler.Ruler
RulerStorage rulestore.RuleStore
ConfigAPI *configAPI.API
ConfigDB db.DB
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.Compactor
StoreGateway *storegateway.StoreGateway
ChunksGateway *chunksgateway.ChunksGateway
MemberlistKV *memberlist.KVInitService

// Queryables that the querier should use to query the long
// term storage. It depends on the storage engine used.
Expand Down
20 changes: 19 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager"
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
"github.com/cortexproject/cortex/pkg/api"
"github.com/cortexproject/cortex/pkg/chunksgateway"
"github.com/cortexproject/cortex/pkg/compactor"
configAPI "github.com/cortexproject/cortex/pkg/configs/api"
"github.com/cortexproject/cortex/pkg/configs/db"
Expand Down Expand Up @@ -81,6 +82,7 @@ const (
AlertManager string = "alertmanager"
Compactor string = "compactor"
StoreGateway string = "store-gateway"
ChunksGateway string = "chunks-gateway"
MemberlistKV string = "memberlist-kv"
TenantDeletion string = "tenant-deletion"
Purger string = "purger"
Expand Down Expand Up @@ -681,6 +683,20 @@ func (t *Cortex) initStoreGateway() (serv services.Service, err error) {
return t.StoreGateway, nil
}

func (t *Cortex) initChunksGateway() (serv services.Service, err error) {
t.Cfg.StoreGateway.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort

t.ChunksGateway, err = chunksgateway.NewChunksGateway(t.Cfg.StoreGateway, t.Cfg.BlocksStorage, t.Overrides, t.Cfg.Server.LogLevel, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}

// Expose HTTP endpoints.
t.API.RegisterChunksGateway(t.ChunksGateway)

return t.ChunksGateway, nil
}

func (t *Cortex) initMemberlistKV() (services.Service, error) {
reg := prometheus.DefaultRegisterer
t.Cfg.MemberlistKV.MetricsRegisterer = reg
Expand Down Expand Up @@ -759,6 +775,7 @@ func (t *Cortex) setupModuleManager() error {
mm.RegisterModule(AlertManager, t.initAlertManager)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(StoreGateway, t.initStoreGateway)
mm.RegisterModule(ChunksGateway, t.initChunksGateway)
mm.RegisterModule(TenantDeletion, t.initTenantDeletionAPI, modules.UserInvisibleModule)
mm.RegisterModule(Purger, nil)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
Expand Down Expand Up @@ -790,10 +807,11 @@ func (t *Cortex) setupModuleManager() error {
AlertManager: {API, MemberlistKV, Overrides},
Compactor: {API, MemberlistKV, Overrides},
StoreGateway: {API, Overrides, MemberlistKV},
ChunksGateway: {API, Overrides, MemberlistKV},
TenantDeletion: {API, Overrides, DeleteRequestsStore},
Purger: {TenantDeletion},
TenantFederation: {Queryable},
All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler},
All: {QueryFrontend, Querier, Ingester, Distributor, Purger, ChunksGateway, StoreGateway, Ruler},
}
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
deps[Ruler] = []string{Overrides, DeleteRequestsStore, RulerStorage}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync"
"time"

"github.com/cortexproject/cortex/pkg/storegateway/storepb"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
Expand Down
24 changes: 12 additions & 12 deletions pkg/querier/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,34 @@ import (
"math"
"sort"

"github.com/cortexproject/cortex/pkg/storegateway/typespb"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/cortexproject/cortex/pkg/querier/iterators"
"github.com/cortexproject/cortex/pkg/querier/series"
)

func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMatcher {
var converted []storepb.LabelMatcher
func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []typespb.LabelMatcher {
var converted []typespb.LabelMatcher
for _, m := range matchers {
var t storepb.LabelMatcher_Type
var t typespb.LabelMatcher_Type
switch m.Type {
case labels.MatchEqual:
t = storepb.LabelMatcher_EQ
t = typespb.EQ
case labels.MatchNotEqual:
t = storepb.LabelMatcher_NEQ
t = typespb.NEQ
case labels.MatchRegexp:
t = storepb.LabelMatcher_RE
t = typespb.RE
case labels.MatchNotRegexp:
t = storepb.LabelMatcher_NRE
t = typespb.NRE
}

converted = append(converted, storepb.LabelMatcher{
converted = append(converted, typespb.LabelMatcher{
Type: t,
Name: m.Name,
Value: m.Value,
Expand All @@ -42,7 +42,7 @@ func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMa

// Implementation of storage.SeriesSet, based on individual responses from store client.
type blockQuerierSeriesSet struct {
series []*storepb.Series
series []*typespb.Series
warnings annotations.Annotations

// next response to process
Expand Down Expand Up @@ -88,7 +88,7 @@ func (bqss *blockQuerierSeriesSet) Warnings() annotations.Annotations {
}

// newBlockQuerierSeries makes a new blockQuerierSeries. Input labels must be already sorted by name.
func newBlockQuerierSeries(lbls []labels.Label, chunks []storepb.AggrChunk) *blockQuerierSeries {
func newBlockQuerierSeries(lbls []labels.Label, chunks []typespb.AggrChunk) *blockQuerierSeries {
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].MinTime < chunks[j].MinTime
})
Expand All @@ -98,7 +98,7 @@ func newBlockQuerierSeries(lbls []labels.Label, chunks []storepb.AggrChunk) *blo

type blockQuerierSeries struct {
labels labels.Labels
chunks []storepb.AggrChunk
chunks []typespb.AggrChunk
}

func (bqs *blockQuerierSeries) Labels() labels.Labels {
Expand Down
Loading
Loading