Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement labels & series endpoints for RF1 queriers #13974

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1855,9 +1855,6 @@ func (t *Loki) initMetastore() (services.Service, error) {
if !t.Cfg.IngesterRF1.Enabled {
return nil, nil
}
if t.Cfg.isTarget(All) {
t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%d", t.Cfg.Server.GRPCListenPort)
}
m, err := metastore.New(t.Cfg.Metastore, log.With(util_log.Logger, "component", "metastore"), prometheus.DefaultRegisterer, t.health)
if err != nil {
return nil, err
Expand All @@ -1872,6 +1869,9 @@ func (t *Loki) initMetastoreClient() (services.Service, error) {
if !t.Cfg.IngesterRF1.Enabled && !t.Cfg.QuerierRF1.Enabled {
return nil, nil
}
if t.Cfg.isTarget(All) {
t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%d", t.Cfg.Server.GRPCListenPort)
}
mc, err := metastoreclient.New(t.Cfg.MetastoreClient, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
Expand Down
140 changes: 3 additions & 137 deletions pkg/querier-rf1/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type Rf1Querier struct {
deleteGetter deleteGetter
logger log.Logger
patternQuerier PatterQuerier
walQuerier logql.Querier
walQuerier querier.Querier
}

type deleteGetter interface {
Expand Down Expand Up @@ -214,49 +214,16 @@ func (q *Rf1Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*lo
if err != nil {
return nil, err
}

if *req.Start, *req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, *req.Start, *req.End); err != nil {
return nil, err
}

var matchers []*labels.Matcher
if req.Query != "" {
matchers, err = syntax.ParseMatchers(req.Query, true)
if err != nil {
return nil, err
}
}

// Enforce the query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(ctx, userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

g, ctx := errgroup.WithContext(ctx)

var storeValues []string
g.Go(func() error {
var (
err error
from = model.TimeFromUnixNano(req.Start.UnixNano())
through = model.TimeFromUnixNano(req.End.UnixNano())
)

if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...)
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...)
}
return err
})

if err := g.Wait(); err != nil {
return nil, err
}

return &logproto.LabelResponse{
Values: storeValues,
}, nil
return q.walQuerier.Label(ctx, req)
}

// Check implements the grpc healthcheck
Expand Down Expand Up @@ -285,108 +252,7 @@ func (q *Rf1Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*
ctx, cancel := context.WithDeadlineCause(ctx, time.Now().Add(queryTimeout), errors.New("query timeout reached"))
defer cancel()

return q.awaitSeries(ctx, req)
}

func (q *Rf1Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
// buffer the channels to the # of calls they're expecting su
series := make(chan [][]logproto.SeriesIdentifier, 1)
errs := make(chan error, 1)

go func() {
storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups(), req.Shards)
if err != nil {
errs <- err
return
}
series <- [][]logproto.SeriesIdentifier{storeValues}
}()

var sets [][]logproto.SeriesIdentifier
for i := 0; i < 2; i++ {
select {
case err := <-errs:
return nil, err
case s := <-series:
sets = append(sets, s...)
}
}

response := &logproto.SeriesResponse{
Series: make([]logproto.SeriesIdentifier, 0),
}
seen := make(map[uint64]struct{})
b := make([]byte, 0, 1024)
for _, set := range sets {
for _, s := range set {
key := s.Hash(b)
if _, exists := seen[key]; !exists {
seen[key] = struct{}{}
response.Series = append(response.Series, s)
}
}
}

return response, nil
}

// seriesForMatchers fetches series from the store for each matcher set
// TODO: make efficient if/when the index supports labels so we don't have to read chunks
func (q *Rf1Querier) seriesForMatchers(
ctx context.Context,
from, through time.Time,
groups []string,
shards []string,
) ([]logproto.SeriesIdentifier, error) {
var results []logproto.SeriesIdentifier
// If no matchers were specified for the series query,
// we send a query with an empty matcher which will match every series.
if len(groups) == 0 {
var err error
results, err = q.seriesForMatcher(ctx, from, through, "", shards)
if err != nil {
return nil, err
}
} else {
for _, group := range groups {
ids, err := q.seriesForMatcher(ctx, from, through, group, shards)
if err != nil {
return nil, err
}
results = append(results, ids...)
}
}
return results, nil
}

// seriesForMatcher fetches series from the store for a given matcher
func (q *Rf1Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string, shards []string) ([]logproto.SeriesIdentifier, error) {
var parsed syntax.Expr
var err error
if matcher != "" {
parsed, err = syntax.ParseExpr(matcher)
if err != nil {
return nil, err
}
}

ids, err := q.store.SelectSeries(ctx, logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: matcher,
Limit: 1,
Start: from,
End: through,
Direction: logproto.FORWARD,
Shards: shards,
Plan: &plan.QueryPlan{
AST: parsed,
},
},
})
if err != nil {
return nil, err
}
return ids, nil
return q.walQuerier.Series(ctx, req)
}

func (q *Rf1Querier) validateQueryRequest(ctx context.Context, req logql.QueryParams) (time.Time, time.Time, error) {
Expand Down
Loading
Loading