Skip to content

Commit

Permalink
Add configurable index data layout and rollover frequency
Browse files Browse the repository at this point in the history
Signed-off-by: Jared Tan <[email protected]>
  • Loading branch information
JaredTan95 committed Sep 2, 2024
1 parent 7df6975 commit 47c0a63
Show file tree
Hide file tree
Showing 26 changed files with 655 additions and 534 deletions.
16 changes: 5 additions & 11 deletions cmd/es-rollover/app/init/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,11 @@ type Action struct {

func (c Action) getMapping(version uint, templateName string) (string, error) {
mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
PrioritySpanTemplate: int64(c.Config.PrioritySpanTemplate),
PriorityServiceTemplate: int64(c.Config.PriorityServiceTemplate),
PriorityDependenciesTemplate: int64(c.Config.PriorityDependenciesTemplate),
PrioritySamplingTemplate: int64(c.Config.PrioritySamplingTemplate),
Shards: int64(c.Config.Shards),
Replicas: int64(c.Config.Replicas),
IndexPrefix: c.Config.IndexPrefix,
UseILM: c.Config.UseILM,
ILMPolicyName: c.Config.ILMPolicyName,
EsVersion: version,
TemplateBuilder: es.TextTemplateBuilder{},
Indices: c.Config.Indices,
UseILM: c.Config.UseILM,
ILMPolicyName: c.Config.ILMPolicyName,
EsVersion: version,
}
return mappingBuilder.GetMapping(templateName)
}
Expand Down
28 changes: 16 additions & 12 deletions cmd/es-rollover/app/init/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/es-rollover/app"
cfg "github.com/jaegertracing/jaeger/pkg/es/config"
)

const (
Expand All @@ -23,12 +24,7 @@ const (
// Config holds configuration for index cleaner binary.
type Config struct {
app.Config
Shards int
Replicas int
PrioritySpanTemplate int
PriorityServiceTemplate int
PriorityDependenciesTemplate int
PrioritySamplingTemplate int
cfg.Indices
}

// AddFlags adds flags for TLS to the FlagSet.
Expand All @@ -43,10 +39,18 @@ func (*Config) AddFlags(flags *flag.FlagSet) {

// InitFromViper initializes config from viper.Viper.
func (c *Config) InitFromViper(v *viper.Viper) {
c.Shards = v.GetInt(shards)
c.Replicas = v.GetInt(replicas)
c.PrioritySpanTemplate = v.GetInt(prioritySpanTemplate)
c.PriorityServiceTemplate = v.GetInt(priorityServiceTemplate)
c.PriorityDependenciesTemplate = v.GetInt(priorityDependenciesTemplate)
c.PrioritySamplingTemplate = v.GetInt(prioritySamplingTemplate)
c.Indices.Spans.Shards = v.GetInt(shards)
c.Indices.Services.Shards = v.GetInt(shards)
c.Indices.Dependencies.Shards = v.GetInt(shards)
c.Indices.Sampling.Shards = v.GetInt(shards)

c.Indices.Spans.Replicas = v.GetInt(replicas)
c.Indices.Services.Replicas = v.GetInt(replicas)
c.Indices.Dependencies.Replicas = v.GetInt(replicas)
c.Indices.Sampling.Replicas = v.GetInt(replicas)

c.Indices.Spans.Priority = v.GetInt(prioritySpanTemplate)
c.Indices.Services.Priority = v.GetInt(priorityServiceTemplate)
c.Indices.Dependencies.Priority = v.GetInt(priorityDependenciesTemplate)
c.Indices.Sampling.Priority = v.GetInt(prioritySamplingTemplate)
}
12 changes: 6 additions & 6 deletions cmd/es-rollover/app/init/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func TestBindFlags(t *testing.T) {
require.NoError(t, err)

c.InitFromViper(v)
assert.Equal(t, 8, c.Shards)
assert.Equal(t, 16, c.Replicas)
assert.Equal(t, 300, c.PrioritySpanTemplate)
assert.Equal(t, 301, c.PriorityServiceTemplate)
assert.Equal(t, 302, c.PriorityDependenciesTemplate)
assert.Equal(t, 303, c.PrioritySamplingTemplate)
assert.Equal(t, 8, c.Indices.Spans.Shards)
assert.Equal(t, 16, c.Indices.Spans.Replicas)
assert.Equal(t, 300, c.Indices.Spans.Priority)
assert.Equal(t, 301, c.Indices.Services.Priority)
assert.Equal(t, 302, c.Indices.Dependencies.Priority)
assert.Equal(t, 303, c.Indices.Sampling.Priority)
}
8 changes: 4 additions & 4 deletions cmd/esmapping-generator/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
type Options struct {
Mapping string
EsVersion uint
Shards int64
Replicas int64
Shards int
Replicas int
IndexPrefix string
UseILM string // using string as util is being used in python and using bool leads to type issues.
ILMPolicyName string
Expand Down Expand Up @@ -40,12 +40,12 @@ func (o *Options) AddFlags(command *cobra.Command) {
esVersionFlag,
7,
"The major Elasticsearch version")
command.Flags().Int64Var(
command.Flags().IntVar(
&o.Shards,
shardsFlag,
5,
"The number of shards per index in Elasticsearch")
command.Flags().Int64Var(
command.Flags().IntVar(
&o.Replicas,
replicasFlag,
1,
Expand Down
9 changes: 5 additions & 4 deletions cmd/esmapping-generator/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ func TestOptionsWithDefaultFlags(t *testing.T) {

assert.Equal(t, "", o.Mapping)
assert.Equal(t, uint(7), o.EsVersion)
assert.Equal(t, int64(5), o.Shards)
assert.Equal(t, int64(1), o.Replicas)
assert.Equal(t, 5, o.Shards)
assert.Equal(t, 1, o.Replicas)

assert.Equal(t, "", o.IndexPrefix)
assert.Equal(t, "false", o.UseILM)
assert.Equal(t, "jaeger-ilm-policy", o.ILMPolicyName)
Expand All @@ -44,8 +45,8 @@ func TestOptionsWithFlags(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "jaeger-span", o.Mapping)
assert.Equal(t, uint(7), o.EsVersion)
assert.Equal(t, int64(5), o.Shards)
assert.Equal(t, int64(1), o.Replicas)
assert.Equal(t, 5, o.Shards)
assert.Equal(t, 1, o.Replicas)
assert.Equal(t, "test", o.IndexPrefix)
assert.Equal(t, "true", o.UseILM)
assert.Equal(t, "jaeger-test-policy", o.ILMPolicyName)
Expand Down
21 changes: 15 additions & 6 deletions cmd/esmapping-generator/app/renderer/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/esmapping-generator/app"
"github.com/jaegertracing/jaeger/pkg/es"
cfg "github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/plugin/storage/es/mappings"
)

Expand All @@ -25,14 +26,22 @@ func GetMappingAsString(builder es.TemplateBuilder, opt *app.Options) (string, e
return "", err
}

indexOpts := cfg.IndexOptions{
Priority: 0,
Shards: opt.Shards,
Replicas: opt.Shards,
}
mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: builder,
Shards: opt.Shards,
Replicas: opt.Replicas,
EsVersion: opt.EsVersion,
IndexPrefix: opt.IndexPrefix,
UseILM: enableILM,
ILMPolicyName: opt.ILMPolicyName,
Indices: cfg.Indices{
Spans: indexOpts,
Services: indexOpts,
Dependencies: indexOpts,
Sampling: indexOpts,
},
EsVersion: opt.EsVersion,
UseILM: enableILM,
ILMPolicyName: opt.ILMPolicyName,
}
return mappingBuilder.GetMapping(opt.Mapping)
}
Expand Down
21 changes: 21 additions & 0 deletions cmd/jaeger/config-elasticsearch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ extensions:
some_storage:
elasticsearch:
index_prefix: "jaeger-main"
indices:
spans:
date_layout: "2006-01-02"
rollover_frequency: "day"
shards: 5
replicas: 1
services:
date_layout: "2006-01-02"
rollover_frequency: "day"
shards: 5
replicas: 1
dependencies:
date_layout: "2006-01-02"
rollover_frequency: "day"
shards: 5
replicas: 1
sampling:
date_layout: "2006-01-02"
rollover_frequency: "day"
shards: 5
replicas: 1
another_storage:
elasticsearch:
index_prefix: "jaeger-archive"
Expand Down
152 changes: 80 additions & 72 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,48 +33,55 @@ import (
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

// IndexOptions describes the index format and rollover frequency
type IndexOptions struct {
Prefix string `mapstructure:"prefix"`
Priority int `mapstructure:"priority"`
DateLayout string `mapstructure:"date_layout"`
Shards int `mapstructure:"shards"`
Replicas int `mapstructure:"replicas"`
RolloverFrequency string `mapstructure:"rollover_frequency"` // "hour" or "day"
}

// Indices describes different configuration options for each index type
type Indices struct {
Spans IndexOptions `mapstructure:"spans"`
Services IndexOptions `mapstructure:"services"`
Dependencies IndexOptions `mapstructure:"dependencies"`
Sampling IndexOptions `mapstructure:"sampling"`
}

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls" valid:"required,url"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
PasswordFilePath string `mapstructure:"password_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `mapstructure:"num_shards"`
NumReplicas int64 `mapstructure:"num_replicas"`
PrioritySpanTemplate int64 `mapstructure:"priority_span_template"`
PriorityServiceTemplate int64 `mapstructure:"priority_service_template"`
PriorityDependenciesTemplate int64 `mapstructure:"priority_dependencies_template"`
Timeout time.Duration `mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
IndexDateLayoutSpans string `mapstructure:"-"`
IndexDateLayoutServices string `mapstructure:"-"`
IndexDateLayoutSampling string `mapstructure:"-"`
IndexDateLayoutDependencies string `mapstructure:"-"`
IndexRolloverFrequencySpans string `mapstructure:"-"`
IndexRolloverFrequencyServices string `mapstructure:"-"`
IndexRolloverFrequencySampling string `mapstructure:"-"`
ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"`
AdaptiveSamplingLookback time.Duration `mapstructure:"-"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
SendGetBodyAs string `mapstructure:"send_get_body_as"`
Servers []string `mapstructure:"server_urls" valid:"required,url"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
PasswordFilePath string `mapstructure:"password_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads
Timeout time.Duration `mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
Indices Indices `mapstructure:"indices"`
ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"`
AdaptiveSamplingLookback time.Duration `mapstructure:"-"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
SendGetBodyAs string `mapstructure:"send_get_body_as"`
}

// TagsAsFields holds configuration for tag schema.
Expand Down Expand Up @@ -207,6 +214,32 @@ func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, err
return esV8.NewClient(options)
}

func setDefaultIndexOptions(cfg, source *IndexOptions) {
if cfg.Shards == 0 {
cfg.Shards = source.Shards
}

if cfg.Replicas == 0 {
cfg.Replicas = source.Replicas
}

if cfg.Priority == 0 {
cfg.Priority = source.Priority
}

if cfg.DateLayout == "" {
cfg.DateLayout = source.DateLayout
}

if cfg.RolloverFrequency == "" {
cfg.RolloverFrequency = source.RolloverFrequency
}

if cfg.Prefix == "" {
cfg.Prefix = source.Prefix
}
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
func (c *Configuration) ApplyDefaults(source *Configuration) {
if len(c.RemoteReadClusters) == 0 {
Expand All @@ -227,21 +260,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.AdaptiveSamplingLookback == 0 {
c.AdaptiveSamplingLookback = source.AdaptiveSamplingLookback
}
if c.NumShards == 0 {
c.NumShards = source.NumShards
}
if c.NumReplicas == 0 {
c.NumReplicas = source.NumReplicas
}
if c.PrioritySpanTemplate == 0 {
c.PrioritySpanTemplate = source.PrioritySpanTemplate
}
if c.PriorityServiceTemplate == 0 {
c.PriorityServiceTemplate = source.PriorityServiceTemplate
}
if c.PrioritySpanTemplate == 0 {
c.PriorityDependenciesTemplate = source.PriorityDependenciesTemplate
}

setDefaultIndexOptions(&c.Indices.Spans, &source.Indices.Spans)
setDefaultIndexOptions(&c.Indices.Services, &source.Indices.Services)
setDefaultIndexOptions(&c.Indices.Dependencies, &source.Indices.Dependencies)

if c.BulkSize == 0 {
c.BulkSize = source.BulkSize
}
Expand Down Expand Up @@ -280,23 +303,8 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
}
}

// GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration {
return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySpans)
}

// GetIndexRolloverFrequencyServicesDuration returns jaeger-service index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duration {
return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencyServices)
}

// GetIndexRolloverFrequencySamplingDuration returns jaeger-sampling index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencySamplingDuration() time.Duration {
return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySampling)
}

// GetIndexRolloverFrequencyDuration returns the index rollover frequency duration for the given frequency string
func getIndexRolloverFrequencyDuration(frequency string) time.Duration {
// RolloverFrequencyAsNegativeDuration returns the index rollover frequency duration for the given frequency string
func RolloverFrequencyAsNegativeDuration(frequency string) time.Duration {
if frequency == "hour" {
return -1 * time.Hour
}
Expand Down
Loading

0 comments on commit 47c0a63

Please sign in to comment.