Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[storage] Remove dependency on archive flag in ES reader #6490

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (f *Factory) getArchiveClient() es.Client {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.logger, f.tracer)
sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, f.logger, f.tracer, "", f.primaryConfig.UseReadWriteAliases)
if err != nil {
return nil, err
}
Expand All @@ -191,7 +191,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, f.metricsFactory, f.logger, "", f.primaryConfig.UseReadWriteAliases)
}

// CreateDependencyReader implements storage.Factory
Expand All @@ -204,7 +204,11 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.logger, f.tracer)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
readAlias := "archive"
if f.archiveConfig.UseReadWriteAliases {
readAlias += "-read"
}
sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, readAlias, true)
if err != nil {
return nil, err
}
Expand All @@ -223,22 +227,27 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
writeAlias := "archive"
if f.archiveConfig.UseReadWriteAliases {
writeAlias += "-write"
}
archiveMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, archiveMetricsFactory, f.logger)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, archiveMetricsFactory, f.logger, writeAlias, true)
}

func createSpanReader(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
logger *zap.Logger,
tp trace.TracerProvider,
readAlias string,
useReadWriteAliases bool,
) (spanstore.Reader, error) {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, errors.New("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
Expand All @@ -251,8 +260,8 @@ func createSpanReader(
SpanIndex: cfg.Indices.Spans,
ServiceIndex: cfg.Indices.Services,
TagDotReplacement: cfg.Tags.DotReplacement,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Archive: archive,
UseReadWriteAliases: useReadWriteAliases,
ReadAliasSuffix: readAlias,
RemoteReadClusters: cfg.RemoteReadClusters,
Logger: logger,
Tracer: tp.Tracer("esSpanStore.SpanReader"),
Expand All @@ -262,9 +271,10 @@ func createSpanReader(
func createSpanWriter(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
writeAlias string,
useReadWriteAliases bool,
) (spanstore.Writer, error) {
var tags []string
var err error
Expand All @@ -284,8 +294,8 @@ func createSpanWriter(
AllTagsAsFields: cfg.Tags.AllAsFields,
TagKeysAsFields: tags,
TagDotReplacement: cfg.Tags.DotReplacement,
Archive: archive,
UseReadWriteAliases: cfg.UseReadWriteAliases,
UseReadWriteAliases: useReadWriteAliases,
WriteAlias: writeAlias,
Logger: logger,
MetricsFactory: mFactory,
ServiceCacheTTL: cfg.ServiceCacheTTL,
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestArchiveDisabled(t *testing.T) {
func TestArchiveEnabled(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{}
f.archiveConfig = &escfg.Configuration{Enabled: true}
f.archiveConfig = &escfg.Configuration{Enabled: true, UseReadWriteAliases: true}
f.newClientFn = (&mockClientBuilder{}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
Expand Down
5 changes: 0 additions & 5 deletions plugin/storage/es/spanstore/index_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,3 @@ func indexWithDate(indexPrefix, indexDateLayout string, date time.Time) string {
spanDate := date.UTC().Format(indexDateLayout)
return indexPrefix + spanDate
}

// returns archive index name
func archiveIndex(indexPrefix, archiveSuffix string) string {
return indexPrefix + archiveSuffix
}
64 changes: 28 additions & 36 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ import (
)

const (
spanIndexBaseName = "jaeger-span-"
serviceIndexBaseName = "jaeger-service-"
archiveIndexSuffix = "archive"
archiveReadIndexSuffix = archiveIndexSuffix + "-read"
archiveWriteIndexSuffix = archiveIndexSuffix + "-write"
traceIDAggregation = "traceIDs"
indexPrefixSeparator = "-"
spanIndexBaseName = "jaeger-span-"
serviceIndexBaseName = "jaeger-service-"
traceIDAggregation = "traceIDs"
indexPrefixSeparator = "-"

traceIDField = "traceID"
durationField = "duration"
Expand All @@ -51,7 +48,7 @@ const (

defaultNumTraces = 100

rolloverMaxSpanAge = time.Hour * 24 * 365 * 50
dawnOfTimeSpanAge = time.Hour * 24 * 365 * 50
)

var (
Expand Down Expand Up @@ -109,7 +106,7 @@ type SpanReaderParams struct {
SpanIndex cfg.IndexOptions
ServiceIndex cfg.IndexOptions
TagDotReplacement string
Archive bool
ReadAliasSuffix string
UseReadWriteAliases bool
RemoteReadClusters []string
Logger *zap.Logger
Expand All @@ -119,10 +116,16 @@ type SpanReaderParams struct {
// NewSpanReader returns a new SpanReader with a metrics.
func NewSpanReader(p SpanReaderParams) *SpanReader {
maxSpanAge := p.MaxSpanAge
readAlias := ""
// Setting the maxSpanAge to a large duration will ensure all spans in the "read" alias are accessible by queries (query window = [now - maxSpanAge, now]).
// When read/write aliases are enabled, which are required for index rollovers, only the "read" alias is queried and therefore should not affect performance.
if p.UseReadWriteAliases {
maxSpanAge = rolloverMaxSpanAge
maxSpanAge = dawnOfTimeSpanAge
if p.ReadAliasSuffix != "" {
readAlias = p.ReadAliasSuffix
} else {
readAlias = "read"
}
}

return &SpanReader{
Expand All @@ -136,9 +139,12 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
timeRangeIndices: getLoggingTimeRangeIndexFn(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: getLoggingTimeRangeIndexFn is a decorator here. Below we use another decorator addRemoteReadClusters, which is also always applied. I recommend moving addRemoteReadClusters to be attached here, instead of inside getTimeRangeIndexFn, to make getTimeRangeIndexFn easier to read.

p.Logger,
getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters),
addRemoteReadClusters(
getTimeRangeIndexFn(p.UseReadWriteAliases, readAlias),
p.RemoteReadClusters,
),
),
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
sourceFn: getSourceFn(p.MaxDocCount),
maxDocCount: p.MaxDocCount,
useReadWriteAliases: p.UseReadWriteAliases,
logger: p.Logger,
Expand All @@ -161,24 +167,13 @@ func getLoggingTimeRangeIndexFn(logger *zap.Logger, fn timeRangeIndexFn) timeRan
}
}

func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn {
if archive {
var archiveSuffix string
if useReadWriteAliases {
archiveSuffix = archiveReadIndexSuffix
} else {
archiveSuffix = archiveIndexSuffix
}
return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {
return []string{archiveIndex(indexPrefix, archiveSuffix)}
}, remoteReadClusters)
}
func getTimeRangeIndexFn(useReadWriteAliases bool, readAlias string) timeRangeIndexFn {
if useReadWriteAliases {
return addRemoteReadClusters(func(indexPrefix string, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {
return []string{indexPrefix + "read"}
}, remoteReadClusters)
return func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {
return []string{indexPrefix + readAlias}
}
}
return addRemoteReadClusters(timeRangeIndices, remoteReadClusters)
return timeRangeIndices
}

// Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices.
Expand All @@ -201,16 +196,13 @@ func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) tim
}
}

func getSourceFn(archive bool, maxDocCount int) sourceFn {
func getSourceFn(maxDocCount int) sourceFn {
return func(query elastic.Query, nextTime uint64) *elastic.SearchSource {
s := elastic.NewSearchSource().
return elastic.NewSearchSource().
Query(query).
Size(maxDocCount)
if !archive {
s.Sort("startTime", true).
SearchAfter(nextTime)
}
return s
Size(maxDocCount).
Sort("startTime", true).
SearchAfter(nextTime)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
76 changes: 22 additions & 54 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func withArchiveSpanReader(t *testing.T, readAlias bool, fn func(r *spanReaderTe
Tracer: tracer.Tracer("test"),
MaxSpanAge: 0,
TagDotReplacement: "@",
Archive: true,
ReadAliasSuffix: "archive",
UseReadWriteAliases: readAlias,
}),
}
Expand Down Expand Up @@ -199,7 +199,6 @@ func TestSpanReaderIndices(t *testing.T) {
}{
{
params: SpanReaderParams{
Archive: false,
SpanIndex: spanIndexOpts,
ServiceIndex: serviceIndexOpts,
},
Expand All @@ -213,7 +212,12 @@ func TestSpanReaderIndices(t *testing.T) {
},
{
params: SpanReaderParams{
Archive: false,
ReadAliasSuffix: "archive", // ignored because ReadWriteAliases is false
},
indices: []string{spanIndexBaseName, serviceIndexBaseName},
},
{
params: SpanReaderParams{
SpanIndex: spanIndexOpts,
ServiceIndex: serviceIndexOpts,
IndexPrefix: "foo:",
Expand All @@ -228,27 +232,21 @@ func TestSpanReaderIndices(t *testing.T) {
},
{
params: SpanReaderParams{
Archive: true,
},
indices: []string{spanIndexBaseName + archiveIndexSuffix, serviceIndexBaseName + archiveIndexSuffix},
},
{
params: SpanReaderParams{
SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true,
ReadAliasSuffix: "archive",
UseReadWriteAliases: true,
},
indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveIndexSuffix, "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + archiveIndexSuffix},
indices: []string{spanIndexBaseName + "archive", serviceIndexBaseName + "archive"},
},
{
params: SpanReaderParams{
SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true,
SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", UseReadWriteAliases: true, ReadAliasSuffix: "archive",
},
indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveReadIndexSuffix, "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + archiveReadIndexSuffix},
indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive", "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + "archive"},
},
{
params: SpanReaderParams{
SpanIndex: spanIndexOpts,
ServiceIndex: serviceIndexOpts,
Archive: false,
RemoteReadClusters: []string{"cluster_one", "cluster_two"},
},
indices: []string{
Expand All @@ -262,20 +260,20 @@ func TestSpanReaderIndices(t *testing.T) {
},
{
params: SpanReaderParams{
Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
UseReadWriteAliases: true, ReadAliasSuffix: "archive", RemoteReadClusters: []string{"cluster_one", "cluster_two"},
},
indices: []string{
spanIndexBaseName + archiveIndexSuffix,
"cluster_one:" + spanIndexBaseName + archiveIndexSuffix,
"cluster_two:" + spanIndexBaseName + archiveIndexSuffix,
serviceIndexBaseName + archiveIndexSuffix,
"cluster_one:" + serviceIndexBaseName + archiveIndexSuffix,
"cluster_two:" + serviceIndexBaseName + archiveIndexSuffix,
spanIndexBaseName + "archive",
"cluster_one:" + spanIndexBaseName + "archive",
"cluster_two:" + spanIndexBaseName + "archive",
serviceIndexBaseName + "archive",
"cluster_one:" + serviceIndexBaseName + "archive",
"cluster_two:" + serviceIndexBaseName + "archive",
},
},
{
params: SpanReaderParams{
Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
},
indices: []string{
spanIndexBaseName + "read",
Expand All @@ -286,19 +284,6 @@ func TestSpanReaderIndices(t *testing.T) {
"cluster_two:" + serviceIndexBaseName + "read",
},
},
{
params: SpanReaderParams{
Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
},
indices: []string{
spanIndexBaseName + archiveReadIndexSuffix,
"cluster_one:" + spanIndexBaseName + archiveReadIndexSuffix,
"cluster_two:" + spanIndexBaseName + archiveReadIndexSuffix,
serviceIndexBaseName + archiveReadIndexSuffix,
"cluster_one:" + serviceIndexBaseName + archiveReadIndexSuffix,
"cluster_two:" + serviceIndexBaseName + archiveReadIndexSuffix,
},
},
}
for _, testCase := range testCases {
testCase.params.Client = clientFn
Expand Down Expand Up @@ -1267,30 +1252,13 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) {
}

func TestSpanReader_ArchiveTraces(t *testing.T) {
withArchiveSpanReader(t, false, func(r *spanReaderTest) {
mockSearchService(r).
Return(&elastic.SearchResult{}, nil)
mockArchiveMultiSearchService(r, "jaeger-span-archive").
Return(&elastic.MultiSearchResult{
Responses: []*elastic.SearchResult{},
}, nil)
query := spanstore.GetTraceParameters{}
trace, err := r.reader.GetTrace(context.Background(), query)
require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded")
require.Nil(t, trace)
require.EqualError(t, err, "trace not found")
})
}

func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this test? It seems it was counting on different index names, so useful to keep as backwards compatibility guard.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro Its because we don't concatenate the two inside the reader anymore. That logic is handled by the factory. If use_aliases is set and the read alias suffix is set, then we just use that suffix instead of concatenating the two.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we model that by altering withArchiveSpanReader constructor? Specifically to test the condition where the indices become ***archive vs ***archive-read (which I understand happens depending on use_aliases being set or not). You don't have any significant changes to the tests for the factory, but that's where the distinction is being captured.

Copy link
Collaborator Author

@mahadzaryab1 mahadzaryab1 Jan 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro That would just be a difference of changing ArchiveIndexSuffix to be archive vs archive-read because the span reader just respects what is passed in today. This test would need to go into the factory but there isn't a nice way to do that right now because the reader is a concrete type so we can't use a mock to see what is passed into the createSpanReader function. We could make some changes to make the create functions parameters so they can be injected for testing.

withArchiveSpanReader(t, true, func(r *spanReaderTest) {
mockSearchService(r).
Return(&elastic.SearchResult{}, nil)
mockArchiveMultiSearchService(r, "jaeger-span-archive-read").
mockArchiveMultiSearchService(r, "jaeger-span-archive").
Return(&elastic.MultiSearchResult{
Responses: []*elastic.SearchResult{},
}, nil)

query := spanstore.GetTraceParameters{}
trace, err := r.reader.GetTrace(context.Background(), query)
require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded")
Expand Down Expand Up @@ -1342,7 +1310,7 @@ func TestBuildTraceByIDQuery(t *testing.T) {
}

func TestTerminateAfterNotSet(t *testing.T) {
srcFn := getSourceFn(false, 99)
srcFn := getSourceFn(99)
searchSource := srcFn(elastic.NewMatchAllQuery(), 1)
sp, err := searchSource.Source()
require.NoError(t, err)
Expand Down
Loading
Loading