Skip to content

Commit

Permalink
feature: Implement labels & series endpoints for RF1 queriers
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive committed Aug 27, 2024
1 parent 115fef4 commit c99b85e
Show file tree
Hide file tree
Showing 4 changed files with 543 additions and 141 deletions.
6 changes: 3 additions & 3 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1855,9 +1855,6 @@ func (t *Loki) initMetastore() (services.Service, error) {
if !t.Cfg.IngesterRF1.Enabled {
return nil, nil
}
if t.Cfg.isTarget(All) {
t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%d", t.Cfg.Server.GRPCListenPort)
}
m, err := metastore.New(t.Cfg.Metastore, log.With(util_log.Logger, "component", "metastore"), prometheus.DefaultRegisterer, t.health)
if err != nil {
return nil, err
Expand All @@ -1872,6 +1869,9 @@ func (t *Loki) initMetastoreClient() (services.Service, error) {
if !t.Cfg.IngesterRF1.Enabled && !t.Cfg.QuerierRF1.Enabled {
return nil, nil
}
if t.Cfg.isTarget(All) {
t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%d", t.Cfg.Server.GRPCListenPort)
}
mc, err := metastoreclient.New(t.Cfg.MetastoreClient, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
Expand Down
140 changes: 3 additions & 137 deletions pkg/querier-rf1/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type Rf1Querier struct {
deleteGetter deleteGetter
logger log.Logger
patternQuerier PatterQuerier
walQuerier logql.Querier
walQuerier querier.Querier
}

type deleteGetter interface {
Expand Down Expand Up @@ -214,49 +214,16 @@ func (q *Rf1Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*lo
if err != nil {
return nil, err
}

if *req.Start, *req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, *req.Start, *req.End); err != nil {
return nil, err
}

var matchers []*labels.Matcher
if req.Query != "" {
matchers, err = syntax.ParseMatchers(req.Query, true)
if err != nil {
return nil, err
}
}

// Enforce the query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(ctx, userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

g, ctx := errgroup.WithContext(ctx)

var storeValues []string
g.Go(func() error {
var (
err error
from = model.TimeFromUnixNano(req.Start.UnixNano())
through = model.TimeFromUnixNano(req.End.UnixNano())
)

if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...)
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...)
}
return err
})

if err := g.Wait(); err != nil {
return nil, err
}

return &logproto.LabelResponse{
Values: storeValues,
}, nil
return q.walQuerier.Label(ctx, req)
}

// Check implements the grpc healthcheck
Expand Down Expand Up @@ -285,108 +252,7 @@ func (q *Rf1Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*
ctx, cancel := context.WithDeadlineCause(ctx, time.Now().Add(queryTimeout), errors.New("query timeout reached"))
defer cancel()

return q.awaitSeries(ctx, req)
}

// awaitSeries runs the store series lookup in a goroutine, waits for its
// single result (or for the context to be cancelled), and de-duplicates the
// returned series identifiers by hash before building the response.
func (q *Rf1Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
	// Buffer each channel for the single send it will receive so the
	// goroutine can never block, even if the caller returns early on
	// context cancellation.
	series := make(chan [][]logproto.SeriesIdentifier, 1)
	errs := make(chan error, 1)

	go func() {
		storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups(), req.Shards)
		if err != nil {
			errs <- err
			return
		}
		series <- [][]logproto.SeriesIdentifier{storeValues}
	}()

	// Exactly one result is ever produced, so receive exactly once.
	// (The previous version looped twice and blocked forever on the
	// second receive, hanging the request.)
	var sets [][]logproto.SeriesIdentifier
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case err := <-errs:
		return nil, err
	case s := <-series:
		sets = append(sets, s...)
	}

	response := &logproto.SeriesResponse{
		Series: make([]logproto.SeriesIdentifier, 0),
	}
	// De-duplicate by the identifier's hash; b is a reusable scratch
	// buffer for hashing.
	seen := make(map[uint64]struct{})
	b := make([]byte, 0, 1024)
	for _, set := range sets {
		for _, s := range set {
			key := s.Hash(b)
			if _, exists := seen[key]; !exists {
				seen[key] = struct{}{}
				response.Series = append(response.Series, s)
			}
		}
	}

	return response, nil
}

// seriesForMatchers fetches series from the store for each matcher set
// TODO: make efficient if/when the index supports labels so we don't have to read chunks
func (q *Rf1Querier) seriesForMatchers(
	ctx context.Context,
	from, through time.Time,
	groups []string,
	shards []string,
) ([]logproto.SeriesIdentifier, error) {
	// When no matcher groups were given, issue a single query with an
	// empty matcher, which matches every series.
	if len(groups) == 0 {
		return q.seriesForMatcher(ctx, from, through, "", shards)
	}

	// Otherwise collect and concatenate the series for each group,
	// failing fast on the first error.
	var results []logproto.SeriesIdentifier
	for _, group := range groups {
		ids, err := q.seriesForMatcher(ctx, from, through, group, shards)
		if err != nil {
			return nil, err
		}
		results = append(results, ids...)
	}
	return results, nil
}

// seriesForMatcher fetches series from the store for a given matcher
func (q *Rf1Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string, shards []string) ([]logproto.SeriesIdentifier, error) {
	// Only parse a non-empty matcher; an empty selector string is passed
	// through as-is (with a nil AST in the plan).
	var parsed syntax.Expr
	var err error
	if matcher != "" {
		parsed, err = syntax.ParseExpr(matcher)
		if err != nil {
			return nil, err
		}
	}

	// Limit is 1 because only series identity is needed here, not the
	// log lines themselves.
	ids, err := q.store.SelectSeries(ctx, logql.SelectLogParams{
		QueryRequest: &logproto.QueryRequest{
			Selector: matcher,
			Limit: 1,
			Start: from,
			End: through,
			Direction: logproto.FORWARD,
			Shards: shards,
			Plan: &plan.QueryPlan{
				AST: parsed,
			},
		},
	})
	if err != nil {
		return nil, err
	}
	return ids, nil
	// NOTE(review): the following return is unreachable and references an
	// undefined `req` — it appears to be residue from the diff this text
	// was extracted from (the new body of Series). Confirm against the
	// actual file and remove.
	return q.walQuerier.Series(ctx, req)
}

func (q *Rf1Querier) validateQueryRequest(ctx context.Context, req logql.QueryParams) (time.Time, time.Time, error) {
Expand Down
Loading

0 comments on commit c99b85e

Please sign in to comment.