diff --git a/.github/workflows/ci-e2e-cassandra.yml b/.github/workflows/ci-e2e-cassandra.yml index 72a930156c9..3ca6709e012 100644 --- a/.github/workflows/ci-e2e-cassandra.yml +++ b/.github/workflows/ci-e2e-cassandra.yml @@ -22,6 +22,7 @@ jobs: fail-fast: false matrix: jaeger-version: [v1, v2] + create-schema: [manual, auto] version: - distribution: cassandra major: 4.x @@ -29,7 +30,11 @@ jobs: - distribution: cassandra major: 5.x schema: v004 - name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }} + exclude: + # Exclude v1 as create schema on fly is available for v2 only + - jaeger-version: v1 + create-schema: auto + name: ${{ matrix.version.distribution }}-${{ matrix.version.major }} ${{ matrix.jaeger-version }} schema=${{ matrix.create-schema }} steps: - name: Harden Runner uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1 @@ -45,9 +50,11 @@ jobs: - name: Run cassandra integration tests id: test-execution run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.major }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }} + env: + SKIP_APPLY_SCHEMA: ${{ matrix.create-schema == 'auto' && true || false }} - name: Upload coverage to codecov uses: ./.github/actions/upload-codecov with: files: cover.out - flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }} + flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}-${{ matrix.create-schema }} diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 0b7550535da..4b55737f624 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -34,6 +34,7 @@ extensions: cassandra: schema: keyspace: "jaeger_v1_dc1" + create: "${env:CASSANDRA_CREATE_SCHEMA:-true}" connection: auth: basic: @@ -44,7 +45,8 @@ extensions: another_storage: cassandra: schema: - keyspace: "jaeger_v1_dc1" + keyspace: "jaeger_v1_dc1_archive" + create: "${env:CASSANDRA_CREATE_SCHEMA:-true}" connection: auth: basic: diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 6bab9c75da4..ea4e839a519 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -6,15 +6,13 @@ package config import ( "context" + "errors" "fmt" "time" "github.com/asaskevich/govalidator" "github.com/gocql/gocql" "go.opentelemetry.io/collector/config/configtls" - - "github.com/jaegertracing/jaeger/pkg/cassandra" - gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql" ) // Configuration describes the configuration properties needed to connect to a Cassandra cluster. @@ -58,6 +56,19 @@ type Schema struct { // while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB, // that do not support SnappyCompression. DisableCompression bool `mapstructure:"disable_compression"` + // CreateSchema tells if the schema ahould be created during session initialization based on the configs provided + CreateSchema bool `mapstructure:"create" valid:"optional"` + // Datacenter is the name for network topology + Datacenter string `mapstructure:"datacenter" valid:"optional"` + // TraceTTL is Time To Live (TTL) for the trace data. Should at least be 1 second + TraceTTL time.Duration `mapstructure:"trace_ttl" valid:"optional"` + // DependenciesTTL is Time To Live (TTL) for dependencies data. Should at least be 1 second + DependenciesTTL time.Duration `mapstructure:"dependencies_ttl" valid:"optional"` + // Replication factor for the db + ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"` + // CompactionWindow is the size of the window for TimeWindowCompactionStrategy. + // All SSTables within that window are grouped together into one SSTable. + CompactionWindow time.Duration `mapstructure:"compaction_window" valid:"optional"` } type Query struct { @@ -86,7 +97,13 @@ type BasicAuthenticator struct { func DefaultConfiguration() Configuration { return Configuration{ Schema: Schema{ - Keyspace: "jaeger_v1_test", + CreateSchema: false, + Keyspace: "jaeger_dc1", + Datacenter: "dc1", + TraceTTL: 2 * 24 * time.Hour, + DependenciesTTL: 2 * 24 * time.Hour, + ReplicationFactor: 1, + CompactionWindow: time.Minute, }, Connection: Connection{ Servers: []string{"127.0.0.1"}, @@ -106,6 +123,27 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.Schema.Keyspace == "" { c.Schema.Keyspace = source.Schema.Keyspace } + + if c.Schema.Datacenter == "" { + c.Schema.Datacenter = source.Schema.Datacenter + } + + if c.Schema.TraceTTL == 0 { + c.Schema.TraceTTL = source.Schema.TraceTTL + } + + if c.Schema.DependenciesTTL == 0 { + c.Schema.DependenciesTTL = source.Schema.DependenciesTTL + } + + if c.Schema.ReplicationFactor == 0 { + c.Schema.ReplicationFactor = source.Schema.ReplicationFactor + } + + if c.Schema.CompactionWindow == 0 { + c.Schema.CompactionWindow = source.Schema.CompactionWindow + } + if c.Connection.ConnectionsPerHost == 0 { c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost } @@ -129,24 +167,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { } } -// SessionBuilder creates new cassandra.Session -type SessionBuilder interface { - NewSession() (cassandra.Session, error) -} - -// NewSession creates a new Cassandra session -func (c *Configuration) NewSession() (cassandra.Session, error) { - cluster, err := c.NewCluster() - if err != nil { - return nil, err - } - session, err := cluster.CreateSession() - if err != nil { - return nil, err - } - return gocqlw.WrapCQLSession(session), nil -} - // NewCluster creates a new gocql cluster from the configuration func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { cluster := gocql.NewCluster(c.Connection.Servers...) @@ -210,7 +230,27 @@ func (c *Configuration) String() string { return fmt.Sprintf("%+v", *c) } +func isValidTTL(duration time.Duration) bool { + return duration == 0 || duration >= time.Second +} + func (c *Configuration) Validate() error { _, err := govalidator.ValidateStruct(c) - return err + if err != nil { + return err + } + + if !isValidTTL(c.Schema.TraceTTL) { + return errors.New("trace_ttl can either be 0 or greater than or equal to 1 second") + } + + if !isValidTTL(c.Schema.DependenciesTTL) { + return errors.New("dependencies_ttl can either be 0 or greater than or equal to 1 second") + } + + if c.Schema.CompactionWindow < time.Minute { + return errors.New("compaction_window should at least be 1 minute") + } + + return nil } diff --git a/pkg/cassandra/config/config_test.go b/pkg/cassandra/config/config_test.go index 8d532f65c60..f1de4f6d55b 100644 --- a/pkg/cassandra/config/config_test.go +++ b/pkg/cassandra/config/config_test.go @@ -5,6 +5,7 @@ package config import ( "testing" + "time" "github.com/gocql/gocql" "github.com/stretchr/testify/assert" @@ -43,6 +44,9 @@ func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) { Connection: Connection{ Servers: []string{"localhost:9200"}, }, + Schema: Schema{ + CompactionWindow: time.Minute, + }, } err := cfg.Validate() @@ -94,3 +98,23 @@ func TestToString(t *testing.T) { s := cfg.String() assert.Contains(t, s, "Keyspace:test") } + +func TestConfigSchemaValidation(t *testing.T) { + cfg := DefaultConfiguration() + err := cfg.Validate() + require.NoError(t, err) + + cfg.Schema.TraceTTL = time.Millisecond + err = cfg.Validate() + require.Error(t, err) + + cfg.Schema.TraceTTL = time.Second + cfg.Schema.CompactionWindow = time.Minute - 1 + err = cfg.Validate() + require.Error(t, err) + + cfg.Schema.CompactionWindow = time.Minute + cfg.Schema.DependenciesTTL = time.Second - 1 + err = cfg.Validate() + require.Error(t, err) +} diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 4654a2a2c82..49c7a2a9c55 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -17,6 +17,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/cassandra" "github.com/jaegertracing/jaeger/pkg/cassandra/config" + gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql" "github.com/jaegertracing/jaeger/pkg/distributedlock" "github.com/jaegertracing/jaeger/pkg/hostname" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -24,6 +25,7 @@ import ( cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra" cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore" cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema" cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore" "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage" @@ -55,17 +57,22 @@ type Factory struct { logger *zap.Logger tracer trace.TracerProvider - primaryConfig config.SessionBuilder + primaryConfig config.Configuration + archiveConfig *config.Configuration + primarySession cassandra.Session - archiveConfig config.SessionBuilder archiveSession cassandra.Session + + // tests can override this + sessionBuilderFn func(*config.Configuration) (cassandra.Session, error) } // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{ - tracer: otel.GetTracerProvider(), - Options: NewOptions(primaryStorageConfig, archiveStorageConfig), + tracer: otel.GetTracerProvider(), + Options: NewOptions(primaryStorageConfig, archiveStorageConfig), + sessionBuilderFn: NewSession, } } @@ -126,9 +133,7 @@ func (f *Factory) configureFromOptions(o *Options) { o.others = make(map[string]*NamespaceConfig) } f.primaryConfig = o.GetPrimary() - if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { - f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error - } + f.archiveConfig = f.Options.Get(archiveStorageConfig) } // Initialize implements storage.Factory @@ -137,14 +142,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil}) f.logger = logger - primarySession, err := f.primaryConfig.NewSession() + primarySession, err := f.sessionBuilderFn(&f.primaryConfig) if err != nil { return err } f.primarySession = primarySession if f.archiveConfig != nil { - archiveSession, err := f.archiveConfig.NewSession() + archiveSession, err := f.sessionBuilderFn(f.archiveConfig) if err != nil { return err } @@ -155,6 +160,48 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) return nil } +// createSession creates session from a configuration +func createSession(c *config.Configuration) (cassandra.Session, error) { + cluster, err := c.NewCluster() + if err != nil { + return nil, err + } + + session, err := cluster.CreateSession() + if err != nil { + return nil, err + } + + return gocqlw.WrapCQLSession(session), nil +} + +// newSessionPrerequisites creates tables and types before creating a session +func newSessionPrerequisites(c *config.Configuration) error { + if !c.Schema.CreateSchema { + return nil + } + + cfg := *c // clone because we need to connect without specifying a keyspace + cfg.Schema.Keyspace = "" + + session, err := createSession(&cfg) + if err != nil { + return err + } + + sc := schema.NewSchemaCreator(session, c.Schema) + return sc.CreateSchemaIfNotPresent() +} + +// NewSession creates a new Cassandra session +func NewSession(c *config.Configuration) (cassandra.Session, error) { + if err := newSessionPrerequisites(c); err != nil { + return nil, err + } + + return createSession(c) +} + // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 9a2fcc486cf..d8800089a75 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -12,43 +12,48 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/cassandra" - cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" + "github.com/jaegertracing/jaeger/pkg/cassandra/config" "github.com/jaegertracing/jaeger/pkg/cassandra/mocks" - "github.com/jaegertracing/jaeger/pkg/config" + viperize "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" ) type mockSessionBuilder struct { - session *mocks.Session - err error + index int + sessions []*mocks.Session + errors []error } -func newMockSessionBuilder(session *mocks.Session, err error) *mockSessionBuilder { - return &mockSessionBuilder{ - session: session, - err: err, - } +func (m *mockSessionBuilder) add(session *mocks.Session, err error) *mockSessionBuilder { + m.sessions = append(m.sessions, session) + m.errors = append(m.errors, err) + return m } -func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) { - return m.session, m.err +func (m *mockSessionBuilder) build(*config.Configuration) (cassandra.Session, error) { + if m.index >= len(m.sessions) { + return nil, errors.New("no more sessions") + } + session := m.sessions[m.index] + err := m.errors[m.index] + m.index++ + return session, err } func TestCassandraFactory(t *testing.T) { logger, logBuf := testutils.NewLogger() f := NewFactory() - v, command := config.Viperize(f.AddFlags) + v, command := viperize.Viperize(f.AddFlags) command.ParseFlags([]string{"--cassandra-archive.enabled=true"}) f.InitFromViper(v, zap.NewNop()) - // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, - // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") + f.sessionBuilderFn = new(mockSessionBuilder).add(nil, errors.New("made-up primary error")).build + require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up primary error") var ( session = &mocks.Session{} @@ -57,11 +62,13 @@ func TestCassandraFactory(t *testing.T) { session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) session.On("Close").Return() query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") + f.sessionBuilderFn = new(mockSessionBuilder). + add(session, nil). + add(nil, errors.New("made-up archive error")).build + require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up archive error") f.archiveConfig = nil + f.sessionBuilderFn = new(mockSessionBuilder).add(session, nil).build require.NoError(t, f.Initialize(metrics.NullFactory, logger)) assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") @@ -80,7 +87,8 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateArchiveSpanWriter() require.EqualError(t, err, "archive storage not configured") - f.archiveConfig = newMockSessionBuilder(session, nil) + f.archiveConfig = &config.Configuration{} + f.sessionBuilderFn = new(mockSessionBuilder).add(session, nil).add(session, nil).build require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err = f.CreateArchiveSpanReader() @@ -99,9 +107,8 @@ func TestCassandraFactory(t *testing.T) { } func TestExclusiveWhitelistBlacklist(t *testing.T) { - logger, logBuf := testutils.NewLogger() f := NewFactory() - v, command := config.Viperize(f.AddFlags) + v, command := viperize.Viperize(f.AddFlags) command.ParseFlags([]string{ "--cassandra-archive.enabled=true", "--cassandra.index.tag-whitelist=a,b,c", @@ -109,29 +116,19 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { }) f.InitFromViper(v, zap.NewNop()) - // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, - // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - var ( session = &mocks.Session{} query = &mocks.Query{} ) session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - - f.archiveConfig = nil - require.NoError(t, f.Initialize(metrics.NullFactory, logger)) - assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") + f.sessionBuilderFn = new(mockSessionBuilder).add(session, nil).build _, err := f.CreateSpanWriter() require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") - f.archiveConfig = &mockSessionBuilder{} + f.archiveConfig = &config.Configuration{} + f.sessionBuilderFn = new(mockSessionBuilder).add(session, nil).add(session, nil).build require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err = f.CreateArchiveSpanWriter() @@ -140,7 +137,7 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { func TestWriterOptions(t *testing.T) { opts := NewOptions("cassandra") - v, command := config.Viperize(opts.AddFlags) + v, command := viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{"--cassandra.index.tag-whitelist=a,b,c"}) opts.InitFromViper(v) @@ -148,7 +145,7 @@ func TestWriterOptions(t *testing.T) { assert.Len(t, options, 1) opts = NewOptions("cassandra") - v, command = config.Viperize(opts.AddFlags) + v, command = viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{"--cassandra.index.tag-blacklist=a,b,c"}) opts.InitFromViper(v) @@ -156,7 +153,7 @@ func TestWriterOptions(t *testing.T) { assert.Len(t, options, 1) opts = NewOptions("cassandra") - v, command = config.Viperize(opts.AddFlags) + v, command = viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{"--cassandra.index.tags=false"}) opts.InitFromViper(v) @@ -164,7 +161,7 @@ func TestWriterOptions(t *testing.T) { assert.Len(t, options, 1) opts = NewOptions("cassandra") - v, command = config.Viperize(opts.AddFlags) + v, command = viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{"--cassandra.index.tags=false", "--cassandra.index.tag-blacklist=a,b,c"}) opts.InitFromViper(v) @@ -172,7 +169,7 @@ func TestWriterOptions(t *testing.T) { assert.Len(t, options, 1) opts = NewOptions("cassandra") - v, command = config.Viperize(opts.AddFlags) + v, command = viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{""}) opts.InitFromViper(v) @@ -194,11 +191,7 @@ func TestNewFactoryWithConfig(t *testing.T) { t.Run("valid configuration", func(t *testing.T) { opts := &Options{ Primary: NamespaceConfig{ - Configuration: cassandraCfg.Configuration{ - Connection: cassandraCfg.Connection{ - Servers: []string{"localhost:9200"}, - }, - }, + Configuration: config.DefaultConfiguration(), }, } f := NewFactory() @@ -216,11 +209,7 @@ func TestNewFactoryWithConfig(t *testing.T) { expErr := errors.New("made-up error") opts := &Options{ Primary: NamespaceConfig{ - Configuration: cassandraCfg.Configuration{ - Connection: cassandraCfg.Connection{ - Servers: []string{"localhost:9200"}, - }, - }, + Configuration: config.DefaultConfiguration(), }, } f := NewFactory() @@ -257,3 +246,33 @@ func TestFactory_Purge(t *testing.T) { session.AssertCalled(t, "Query", mock.AnythingOfType("string"), mock.Anything) query.AssertCalled(t, "Exec") } + +func TestNewSessionErrors(t *testing.T) { + t.Run("NewCluster error", func(t *testing.T) { + cfg := &config.Configuration{ + Connection: config.Connection{ + TLS: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: "foobar", + }, + }, + }, + } + _, err := NewSession(cfg) + require.ErrorContains(t, err, "failed to load TLS config") + }) + t.Run("CreateSession error", func(t *testing.T) { + cfg := &config.Configuration{} + _, err := NewSession(cfg) + require.ErrorContains(t, err, "no hosts provided") + }) + t.Run("CreateSession error with schema", func(t *testing.T) { + cfg := &config.Configuration{ + Schema: config.Schema{ + CreateSchema: true, + }, + } + _, err := NewSession(cfg) + require.ErrorContains(t, err, "no hosts provided") + }) +} diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index e266436ba29..505d99c1ef3 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -256,8 +256,8 @@ func (cfg *NamespaceConfig) initFromViper(v *viper.Viper) { } // GetPrimary returns primary configuration. -func (opt *Options) GetPrimary() *config.Configuration { - return &opt.Primary.Configuration +func (opt *Options) GetPrimary() config.Configuration { + return opt.Primary.Configuration } // Get returns auxiliary named configuration. diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index d2190f5511e..d6c2b1f3fa1 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -14,6 +14,7 @@ import ( cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra" cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -36,7 +37,7 @@ func main() { Timeout: time.Millisecond * 750, }, } - cqlSession, err := cConfig.NewSession() + cqlSession, err := cassandra.NewSession(cConfig) if err != nil { logger.Fatal("Cannot create Cassandra session", zap.Error(err)) } diff --git a/plugin/storage/cassandra/schema/package_test.go b/plugin/storage/cassandra/schema/package_test.go new file mode 100644 index 00000000000..5c3adb21543 --- /dev/null +++ b/plugin/storage/cassandra/schema/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package schema + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/plugin/storage/cassandra/schema/schema.go b/plugin/storage/cassandra/schema/schema.go new file mode 100644 index 00000000000..25aa172355f --- /dev/null +++ b/plugin/storage/cassandra/schema/schema.go @@ -0,0 +1,154 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package schema + +import ( + "bytes" + "embed" + "errors" + "fmt" + "text/template" + "time" + + "github.com/jaegertracing/jaeger/pkg/cassandra" + "github.com/jaegertracing/jaeger/pkg/cassandra/config" +) + +//go:embed v004-go-tmpl.cql.tmpl +var schemaFile embed.FS + +type templateParams struct { + // Keyspace in which tables and types will be created for storage + Keyspace string + // Replication is the replication strategy used. Ex: "{'class': 'NetworkTopologyStrategy', 'replication_factor': '1' }" + Replication string + // CompactionWindowInMinutes is constructed from CompactionWindow for using in template + CompactionWindowInMinutes int64 + // TraceTTLInSeconds is constructed from TraceTTL for using in template + TraceTTLInSeconds int64 + // DependenciesTTLInSeconds is constructed from DependenciesTTL for using in template + DependenciesTTLInSeconds int64 +} + +type Creator struct { + session cassandra.Session + schema config.Schema +} + +// NewSchemaCreator returns a new SchemaCreator +func NewSchemaCreator(session cassandra.Session, schema config.Schema) *Creator { + return &Creator{ + session: session, + schema: schema, + } +} + +func (sc *Creator) constructTemplateParams() templateParams { + return templateParams{ + Keyspace: sc.schema.Keyspace, + Replication: fmt.Sprintf("{'class': 'NetworkTopologyStrategy', 'replication_factor': '%v' }", sc.schema.ReplicationFactor), + CompactionWindowInMinutes: int64(sc.schema.CompactionWindow / time.Minute), + TraceTTLInSeconds: int64(sc.schema.TraceTTL / time.Second), + DependenciesTTLInSeconds: int64(sc.schema.DependenciesTTL / time.Second), + } +} + +func (*Creator) getQueryFileAsBytes(fileName string, params templateParams) ([]byte, error) { + tmpl, err := template.ParseFS(schemaFile, fileName) + if err != nil { + return nil, err + } + + var result bytes.Buffer + err = tmpl.Execute(&result, params) + if err != nil { + return nil, err + } + + return result.Bytes(), nil +} + +func (*Creator) getQueriesFromBytes(queryFile []byte) ([]string, error) { + lines := bytes.Split(queryFile, []byte("\n")) + + var extractedLines [][]byte + + for _, line := range lines { + // Remove any comments, if at the end of the line + commentIndex := bytes.Index(line, []byte(`--`)) + if commentIndex != -1 { + // remove everything after comment + line = line[0:commentIndex] + } + + trimmedLine := bytes.TrimSpace(line) + + if len(trimmedLine) == 0 { + continue + } + + extractedLines = append(extractedLines, trimmedLine) + } + + var queries []string + + // Construct individual queries strings + var queryString string + for _, line := range extractedLines { + queryString += string(line) + "\n" + if bytes.HasSuffix(line, []byte(";")) { + queries = append(queries, queryString) + queryString = "" + } + } + + if len(queryString) > 0 { + return nil, errors.New(`query exists in template without ";"`) + } + + return queries, nil +} + +func (sc *Creator) getCassandraQueriesFromQueryStrings(queries []string) []cassandra.Query { + var casQueries []cassandra.Query + + for _, query := range queries { + casQueries = append(casQueries, sc.session.Query(query)) + } + + return casQueries +} + +func (sc *Creator) contructSchemaQueries() ([]cassandra.Query, error) { + params := sc.constructTemplateParams() + + queryFile, err := sc.getQueryFileAsBytes(`v004-go-tmpl.cql.tmpl`, params) + if err != nil { + return nil, err + } + + queryStrings, err := sc.getQueriesFromBytes(queryFile) + if err != nil { + return nil, err + } + + casQueries := sc.getCassandraQueriesFromQueryStrings(queryStrings) + + return casQueries, nil +} + +func (sc *Creator) CreateSchemaIfNotPresent() error { + casQueries, err := sc.contructSchemaQueries() + if err != nil { + return err + } + + for _, query := range casQueries { + if err := query.Exec(); err != nil { + return err + } + } + + return nil +} diff --git a/plugin/storage/cassandra/schema/schema_test.go b/plugin/storage/cassandra/schema/schema_test.go new file mode 100644 index 00000000000..39e69b89916 --- /dev/null +++ b/plugin/storage/cassandra/schema/schema_test.go @@ -0,0 +1,59 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package schema + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQueryGenerationFromBytes(t *testing.T) { + queriesAsString := ` +query1 -- comment (this should be removed) +query1-continue +query1-finished; -- + + +query2; +query-3 query-3-continue query-3-finished; +` + expGeneratedQueries := []string{ + `query1 +query1-continue +query1-finished; +`, + `query2; +`, + `query-3 query-3-continue query-3-finished; +`, + } + + sc := Creator{} + queriesAsBytes := []byte(queriesAsString) + queries, err := sc.getQueriesFromBytes(queriesAsBytes) + require.NoError(t, err) + + require.Equal(t, len(expGeneratedQueries), len(queries)) + + for i := range len(expGeneratedQueries) { + require.Equal(t, expGeneratedQueries[i], queries[i]) + } +} + +func TestInvalidQueryTemplate(t *testing.T) { + queriesAsString := ` + query1 -- comment (this should be removed) + query1-continue + query1-finished; -- + + + query2; + query-3 query-3-continue query-3-finished -- missing semicolon + ` + sc := Creator{} + queriesAsBytes := []byte(queriesAsString) + _, err := sc.getQueriesFromBytes(queriesAsBytes) + require.Error(t, err) +} diff --git a/plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl b/plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl new file mode 100644 index 00000000000..41ea7494274 --- /dev/null +++ b/plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl @@ -0,0 +1,200 @@ +CREATE KEYSPACE IF NOT EXISTS {{.Keyspace}} WITH replication = {{.Replication}}; + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.log ( + ts bigint, -- microseconds since epoch + fields frozen>> +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.process ( + service_name text, + tags frozen>> +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". +-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, -- microseconds since epoch + duration bigint, -- microseconds + tags list>, + logs list>, + refs list>, + process frozen, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '{{.CompactionWindowInMinutes}}', + 'compaction_window_unit': 'MINUTES', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.operation_names_v2 ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_name_index ( + service_name text, + bucket int, + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.duration_index ( + service_name text, -- service name + operation_name text, -- operation name, or blank for queries without span name + bucket timestamp, -- time bucket, - the start_time of the given span rounded to an hour + duration bigint, -- span duration, in microseconds + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, -- microseconds since epoch + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.dependency ( + parent text, + child text, + call_count bigint, + source text +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.DependenciesTTLInSeconds}}; + +-- adaptive sampling tables +-- ./plugin/storage/cassandra/samplingstore/storage.go +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.operation_throughput ( + bucket int, + ts timeuuid, + throughput text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.sampling_probabilities ( + bucket int, + ts timeuuid, + hostname text, + probabilities text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +-- distributed lock +-- ./plugin/pkg/distributedlock/cassandra/lock.go +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.leases ( + name text, + owner text, + PRIMARY KEY (name) +); \ No newline at end of file diff --git a/scripts/cassandra-integration-test.sh b/scripts/cassandra-integration-test.sh index 9f5e295bc3c..3789017d419 100755 --- a/scripts/cassandra-integration-test.sh +++ b/scripts/cassandra-integration-test.sh @@ -11,6 +11,9 @@ success="false" timeout=600 end_time=$((SECONDS + timeout)) +SKIP_APPLY_SCHEMA=${SKIP_APPLY_SCHEMA:-"false"} +export CASSANDRA_CREATE_SCHEMA=${SKIP_APPLY_SCHEMA} + usage() { echo $"Usage: $0 " exit 1 @@ -98,8 +101,10 @@ run_integration_test() { healthcheck_cassandra "${major_version}" - apply_schema "$schema_version" "$primaryKeyspace" - apply_schema "$schema_version" "$archiveKeyspace" + if [ "${SKIP_APPLY_SCHEMA}" = "false" ]; then + apply_schema "$schema_version" "$primaryKeyspace" + apply_schema "$schema_version" "$archiveKeyspace" + fi if [ "${jaegerVersion}" = "v1" ]; then STORAGE=cassandra make storage-integration-test