diff --git a/cmd/anonymizer/app/query/query_test.go b/cmd/anonymizer/app/query/query_test.go index 86b7fd55ed8..ea6797e9e89 100644 --- a/cmd/anonymizer/app/query/query_test.go +++ b/cmd/anonymizer/app/query/query_test.go @@ -7,6 +7,7 @@ import ( "net" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -77,6 +78,7 @@ func newTestServer(t *testing.T) *testServer { assert.NoError(t, server.Serve(lis)) exited.Done() }() + time.Sleep(time.Second) t.Cleanup(func() { server.Stop() exited.Wait() // don't allow test to finish before server exits diff --git a/cmd/jaeger/internal/index_cleaner.go b/cmd/jaeger/internal/index_cleaner.go new file mode 100644 index 00000000000..f1c75e0feb2 --- /dev/null +++ b/cmd/jaeger/internal/index_cleaner.go @@ -0,0 +1,26 @@ +package internal + +import ( + "log" + + "github.com/jaegertracing/jaeger/pkg/es/config" + "github.com/spf13/viper" +) + +// StartCleaner runs the index cleaner if enabled and handles config loading +func StartCleaner(v *viper.Viper) { + var cleanerConfig config.Cleaner + if err := v.UnmarshalKey("cleaner", &cleanerConfig); err != nil { + log.Printf("Error loading cleaner configuration: %v", err) + return + } + + cfg := &config.Configuration{ + Cleaner: cleanerConfig, + } + + // If cleaner is enabled, run the cleaner + if cfg.Cleaner.Enabled { + go cfg.RunCleaner() + } +} diff --git a/cmd/jaeger/main.go b/cmd/jaeger/main.go index d63268845e7..a3baccb6356 100644 --- a/cmd/jaeger/main.go +++ b/cmd/jaeger/main.go @@ -24,6 +24,8 @@ func main() { command, ) + internal.StartCleaner(v) + if err := command.Execute(); err != nil { log.Fatal(err) } diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index d0ae9c6ad3d..1d4b21c351b 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -8,8 +8,11 @@ import ( "bufio" "context" "crypto/tls" + "encoding/json" "errors" "fmt" + "io" + "log" "net/http" "os" "path/filepath" @@ -37,6 +40,22 @@ const ( IndexPrefixSeparator = "-" ) +// Cleaner struct for configuring index cleaning +type Cleaner struct { + Enabled bool `mapstructure:"enabled"` + Frequency time.Duration `mapstructure:"frequency"` + IncludeArchive bool `mapstructure:"include_archive"` +} + +// Validation function for Cleaner struct +func (c *Cleaner) Validate() error { + if c.Frequency <= 0 { + return fmt.Errorf("frequency must be greater than 0") + } + + return nil +} + // IndexOptions describes the index format and rollover frequency type IndexOptions struct { // Priority contains the priority of index template (ESv8 only). @@ -136,6 +155,10 @@ type Configuration struct { Tags TagsAsFields `mapstructure:"tags_as_fields"` // Enabled, if set to true, enables the namespace for storage pointed to by this configuration. Enabled bool `mapstructure:"-"` + + // ---- ES-specific config ---- + // Cleaner holds the configuration for the Elasticsearch index Cleaner. + Cleaner Cleaner `mapstructure:"cleaner"` } // TagsAsFields holds configuration for tag schema. @@ -585,3 +608,198 @@ func (c *Configuration) Validate() error { _, err := govalidator.ValidateStruct(c) return err } + +// Run starts the cleaner that runs periodically based on the Frequency field. +func (cfg *Configuration) RunCleaner() { + if !cfg.Cleaner.Enabled { + log.Println("Cleaner is disabled, skipping...") + return + } + + ticker := time.NewTicker(cfg.Cleaner.Frequency) + defer ticker.Stop() + + // Use for range to iterate over ticker.C channel + for range ticker.C { + log.Println("Running index cleaner...") + err := cfg.cleanIndices() + if err != nil { + log.Printf("Error cleaning indices: %v", err) + } + } +} + +// cleanIndices connects to ElasticSearch and deletes the indices based on the configuration. +func (cfg *Configuration) cleanIndices() error { + log.Println("Cleaning Elasticsearch indices...") + + // Delete old indices (older than max span age) + err := cfg.deleteOldIndices() + if err != nil { + return fmt.Errorf("failed to delete old indices: %w", err) + } + + // If IncludeArchive is true, also delete archived indices + if cfg.Cleaner.IncludeArchive { + err = cfg.deleteArchivedIndices() + if err != nil { + return fmt.Errorf("failed to delete archived indices: %w", err) + } + } + + return nil +} + +// deleteOldIndices deletes indices older than the configured MaxSpanAge. +func (cfg *Configuration) deleteOldIndices() error { + esClient, err := esV8.NewClient( + esV8.Config{Addresses: cfg.Servers}, + ) + if err != nil { + return fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + + indexNames, err := fetchAllIndexNames(esClient) + if err != nil { + return fmt.Errorf("failed to fetch indices: %w", err) + } + + for _, indexName := range indexNames { + log.Printf("Checking index: %s", indexName) + creationDate, err := fetchIndexCreationDate(cfg.Servers[0], indexName) + if err != nil { + log.Printf("Failed to get creation date for index %s: %v", indexName, err) + continue + } + + if time.Since(creationDate) > cfg.MaxSpanAge { + log.Printf("Index %s is older than MaxSpanAge, deleting...", indexName) + if err := deleteIndex(esClient, indexName); err != nil { + log.Printf("Failed to delete index: %s, error: %v", indexName, err) + } else { + log.Printf("Successfully deleted index: %s", indexName) + } + } + } + + return nil +} + +// fetchAllIndexNames retrieves all index names from the Elasticsearch cluster. +func fetchAllIndexNames(esClient *esV8.Client) ([]string, error) { + res, err := esClient.Indices.GetAlias() + if err != nil { + return nil, err + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + var indicesMap map[string]interface{} + if err := json.Unmarshal(body, &indicesMap); err != nil { + return nil, err + } + + var indexNames []string + for indexName := range indicesMap { + indexNames = append(indexNames, indexName) + } + + return indexNames, nil +} + +// fetchIndexCreationDate fetches the creation date of the given index. +func fetchIndexCreationDate(serverURL, indexName string) (time.Time, error) { + url := fmt.Sprintf("%s/%s/_settings", serverURL, indexName) + resp, err := http.Get(url) + if err != nil { + return time.Time{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return time.Time{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return time.Time{}, err + } + + var settingsMap map[string]interface{} + if err := json.Unmarshal(body, &settingsMap); err != nil { + return time.Time{}, err + } + + creationDateStr, ok := settingsMap[indexName].(map[string]interface{})["settings"].(map[string]interface{})["index.creation_date"].(string) + if !ok { + return time.Time{}, fmt.Errorf("creation date not found for index %s", indexName) + } + + creationDateMillis, err := strconv.ParseInt(creationDateStr, 10, 64) + if err != nil { + return time.Time{}, err + } + + return time.Unix(0, creationDateMillis*int64(time.Millisecond)), nil +} + +// deleteIndex deletes the given index. +func deleteIndex(esClient *esV8.Client, indexName string) error { + _, err := esClient.Indices.Delete([]string{indexName}) + return err +} + +// deleteArchivedIndices simulates deletion of archived indices if includeArchive is true. +func (cfg *Configuration) deleteArchivedIndices() error { + if !cfg.Cleaner.IncludeArchive { + log.Printf("Skipping archived indices deletion because IncludeArchive is false.") + return nil + } + + log.Println("Deleting archived indices...") + + // Use the servers from the configuration + esClient, err := esV8.NewClient(esV8.Config{ + Addresses: cfg.Servers, // Dynamically use configured servers + }) + if err != nil { + return fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + + // Fetch all indices from Elasticsearch + res, err := esClient.Indices.GetAlias() + if err != nil { + return fmt.Errorf("failed to fetch indices: %w", err) + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + var indicesMap map[string]interface{} + err = json.Unmarshal(body, &indicesMap) + if err != nil { + return fmt.Errorf("failed to unmarshal response body: %w", err) + } + + for indexName := range indicesMap { + if len(indexName) > 8 && indexName[:8] == "archive_" { + log.Printf("Found archived index: %s", indexName) + + _, err = esClient.Indices.Delete([]string{indexName}) + if err != nil { + log.Printf("Failed to delete archived index: %s, error %v", indexName, err) + } else { + log.Printf("Successfully deleted archived index: %s", indexName) + } + } + } + + return nil +} diff --git a/pkg/es/config/config_test.go b/pkg/es/config/config_test.go index c295d37b70c..1d3ed952d57 100644 --- a/pkg/es/config/config_test.go +++ b/pkg/es/config/config_test.go @@ -799,3 +799,33 @@ func TestApplyForIndexPrefix(t *testing.T) { func TestMain(m *testing.M) { testutils.VerifyGoLeaks(m) } + +func TestCleanerValidate(t *testing.T) { + tests := []struct { + name string + cleaner *Cleaner + wantErr bool + }{ + { + name: "Valid Cleaner", + cleaner: &Cleaner{Enabled: true, Frequency: time.Minute, IncludeArchive: true}, + wantErr: false, + }, + { + name: "Invalid Frequency", + cleaner: &Cleaner{Enabled: true, Frequency: 0, IncludeArchive: true}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.cleaner.Validate() + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +}