Skip to content

Commit

Permalink
Merge branch 'main' into storage-factory-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Nov 30, 2024
2 parents 6d9e80b + e9fac05 commit d0319f1
Show file tree
Hide file tree
Showing 13 changed files with 662 additions and 90 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/ci-e2e-cassandra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@ jobs:
fail-fast: false
matrix:
jaeger-version: [v1, v2]
create-schema: [manual, auto]
version:
- distribution: cassandra
major: 4.x
schema: v004
- distribution: cassandra
major: 5.x
schema: v004
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }}
exclude:
# Exclude v1 because on-the-fly schema creation is available for v2 only
- jaeger-version: v1
create-schema: auto
name: ${{ matrix.version.distribution }}-${{ matrix.version.major }} ${{ matrix.jaeger-version }} schema=${{ matrix.create-schema }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1
Expand All @@ -45,9 +50,11 @@ jobs:
- name: Run cassandra integration tests
id: test-execution
run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.major }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }}
env:
SKIP_APPLY_SCHEMA: ${{ matrix.create-schema == 'auto' && true || false }}

- name: Upload coverage to codecov
uses: ./.github/actions/upload-codecov
with:
files: cover.out
flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}
flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}-${{ matrix.create-schema }}
4 changes: 3 additions & 1 deletion cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ extensions:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
create: "${env:CASSANDRA_CREATE_SCHEMA:-true}"
connection:
auth:
basic:
Expand All @@ -44,7 +45,8 @@ extensions:
another_storage:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
keyspace: "jaeger_v1_dc1_archive"
create: "${env:CASSANDRA_CREATE_SCHEMA:-true}"
connection:
auth:
basic:
Expand Down
86 changes: 63 additions & 23 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ package config

import (
"context"
"errors"
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/gocql/gocql"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/pkg/cassandra"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
)

// Configuration describes the configuration properties needed to connect to a Cassandra cluster.
Expand Down Expand Up @@ -58,6 +56,19 @@ type Schema struct {
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
// CreateSchema tells if the schema should be created during session initialization based on the configs provided
CreateSchema bool `mapstructure:"create" valid:"optional"`
// Datacenter is the name for network topology
Datacenter string `mapstructure:"datacenter" valid:"optional"`
// TraceTTL is Time To Live (TTL) for the trace data. Should at least be 1 second
TraceTTL time.Duration `mapstructure:"trace_ttl" valid:"optional"`
// DependenciesTTL is Time To Live (TTL) for dependencies data. Should at least be 1 second
DependenciesTTL time.Duration `mapstructure:"dependencies_ttl" valid:"optional"`
// ReplicationFactor is the replication factor for the db
ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"`
// CompactionWindow is the size of the window for TimeWindowCompactionStrategy.
// All SSTables within that window are grouped together into one SSTable.
CompactionWindow time.Duration `mapstructure:"compaction_window" valid:"optional"`
}

type Query struct {
Expand Down Expand Up @@ -86,7 +97,13 @@ type BasicAuthenticator struct {
func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
CreateSchema: false,
Keyspace: "jaeger_dc1",
Datacenter: "dc1",
TraceTTL: 2 * 24 * time.Hour,
DependenciesTTL: 2 * 24 * time.Hour,
ReplicationFactor: 1,
CompactionWindow: time.Minute,
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
Expand All @@ -106,6 +123,27 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace
}

if c.Schema.Datacenter == "" {
c.Schema.Datacenter = source.Schema.Datacenter
}

if c.Schema.TraceTTL == 0 {
c.Schema.TraceTTL = source.Schema.TraceTTL
}

if c.Schema.DependenciesTTL == 0 {
c.Schema.DependenciesTTL = source.Schema.DependenciesTTL
}

if c.Schema.ReplicationFactor == 0 {
c.Schema.ReplicationFactor = source.Schema.ReplicationFactor
}

if c.Schema.CompactionWindow == 0 {
c.Schema.CompactionWindow = source.Schema.CompactionWindow
}

if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
Expand All @@ -129,24 +167,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
}
}

// SessionBuilder creates new cassandra.Session
type SessionBuilder interface {
NewSession() (cassandra.Session, error)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
cluster, err := c.NewCluster()
if err != nil {
return nil, err
}
session, err := cluster.CreateSession()
if err != nil {
return nil, err
}
return gocqlw.WrapCQLSession(session), nil
}

// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
Expand Down Expand Up @@ -210,7 +230,27 @@ func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}

// isValidTTL reports whether duration is acceptable as a TTL value:
// it must be exactly zero or no shorter than one second.
func isValidTTL(duration time.Duration) bool {
	if duration == 0 {
		return true
	}
	return duration >= time.Second
}

// Validate checks the configuration in two stages. It first runs the
// struct-tag based validation through govalidator and returns its error, if
// any. It then enforces the schema limits: trace_ttl and dependencies_ttl
// must each be 0 or at least 1 second, and compaction_window must be at
// least 1 minute. It returns nil when every check passes.
func (c *Configuration) Validate() error {
	// Only bail out when struct validation actually failed; an unconditional
	// return here would make the schema checks below unreachable.
	if _, err := govalidator.ValidateStruct(c); err != nil {
		return err
	}

	if !isValidTTL(c.Schema.TraceTTL) {
		return errors.New("trace_ttl can either be 0 or greater than or equal to 1 second")
	}

	if !isValidTTL(c.Schema.DependenciesTTL) {
		return errors.New("dependencies_ttl can either be 0 or greater than or equal to 1 second")
	}

	if c.Schema.CompactionWindow < time.Minute {
		return errors.New("compaction_window should at least be 1 minute")
	}

	return nil
}
24 changes: 24 additions & 0 deletions pkg/cassandra/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package config

import (
"testing"
"time"

"github.com/gocql/gocql"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -43,6 +44,9 @@ func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) {
Connection: Connection{
Servers: []string{"localhost:9200"},
},
Schema: Schema{
CompactionWindow: time.Minute,
},
}

err := cfg.Validate()
Expand Down Expand Up @@ -94,3 +98,23 @@ func TestToString(t *testing.T) {
s := cfg.String()
assert.Contains(t, s, "Keyspace:test")
}

// TestConfigSchemaValidation verifies that the default configuration is
// accepted and that a too-short trace TTL, compaction window, or dependencies
// TTL each cause Validate to fail. The same cfg value is mutated step by
// step, restoring the previously broken field before breaking the next one.
func TestConfigSchemaValidation(t *testing.T) {
	cfg := DefaultConfiguration()
	require.NoError(t, cfg.Validate())

	// Trace TTL below one second.
	cfg.Schema.TraceTTL = time.Millisecond
	require.Error(t, cfg.Validate())

	// Trace TTL repaired; compaction window just under one minute.
	cfg.Schema.TraceTTL = time.Second
	cfg.Schema.CompactionWindow = time.Minute - 1
	require.Error(t, cfg.Validate())

	// Compaction window repaired; dependencies TTL just under one second.
	cfg.Schema.CompactionWindow = time.Minute
	cfg.Schema.DependenciesTTL = time.Second - 1
	require.Error(t, cfg.Validate())
}
65 changes: 56 additions & 9 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (

"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/hostname"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -57,17 +59,22 @@ type Factory struct {
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.SessionBuilder
primaryConfig config.Configuration
archiveConfig *config.Configuration

primarySession cassandra.Session
archiveConfig config.SessionBuilder
archiveSession cassandra.Session

// tests can override this
sessionBuilderFn func(*config.Configuration) (cassandra.Session, error)
}

// NewFactory creates a new Factory.
//
// The factory uses the global OpenTelemetry tracer provider, options for the
// primary and archive storage namespaces, and NewSession as the session
// builder (tests can override sessionBuilderFn). Each field is set exactly
// once: repeating a key in a composite literal does not compile.
func NewFactory() *Factory {
	return &Factory{
		tracer:           otel.GetTracerProvider(),
		Options:          NewOptions(primaryStorageConfig, archiveStorageConfig),
		sessionBuilderFn: NewSession,
	}
}

Expand Down Expand Up @@ -128,9 +135,7 @@ func (f *Factory) configureFromOptions(o *Options) {
o.others = make(map[string]*NamespaceConfig)
}
f.primaryConfig = o.GetPrimary()
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
}
f.archiveConfig = f.Options.Get(archiveStorageConfig)
}

// Initialize implements storage.Factory
Expand All @@ -152,14 +157,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
)
f.logger = logger

primarySession, err := f.primaryConfig.NewSession()
primarySession, err := f.sessionBuilderFn(&f.primaryConfig)
if err != nil {
return err
}
f.primarySession = primarySession

if f.archiveConfig != nil {
archiveSession, err := f.archiveConfig.NewSession()
archiveSession, err := f.sessionBuilderFn(f.archiveConfig)
if err != nil {
return err
}
Expand All @@ -170,6 +175,48 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return nil
}

// createSession builds a gocql cluster config from c, opens a session on it,
// and returns that session wrapped as a cassandra.Session. Any error from
// either step is returned unchanged.
func createSession(c *config.Configuration) (cassandra.Session, error) {
	clusterCfg, err := c.NewCluster()
	if err != nil {
		return nil, err
	}

	rawSession, err := clusterCfg.CreateSession()
	if err != nil {
		return nil, err
	}

	wrapped := gocqlw.WrapCQLSession(rawSession)
	return wrapped, nil
}

// newSessionPrerequisites creates tables and types before creating a session.
//
// It is a no-op unless c.Schema.CreateSchema is set. Otherwise it opens a
// temporary session that is not bound to any keyspace, runs the schema
// creator with c.Schema over it, and closes the temporary session before
// returning. It returns any error from opening the session or from creating
// the schema.
func newSessionPrerequisites(c *config.Configuration) error {
	if !c.Schema.CreateSchema {
		return nil
	}

	cfg := *c // clone because we need to connect without specifying a keyspace
	cfg.Schema.Keyspace = ""

	session, err := createSession(&cfg)
	if err != nil {
		return err
	}
	// The bootstrap session is used only for schema creation; close it so
	// its connections are not leaked once the real session is created.
	defer session.Close()

	sc := schema.NewSchemaCreator(session, c.Schema)
	return sc.CreateSchemaIfNotPresent()
}

// NewSession creates a new Cassandra session for configuration c.
// It first runs newSessionPrerequisites and, if that succeeds, opens the
// session via createSession. A prerequisite failure is returned as-is with
// a nil session.
func NewSession(c *config.Configuration) (cassandra.Session, error) {
	err := newSessionPrerequisites(c)
	if err != nil {
		return nil, err
	}

	return createSession(c)
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
sr, err := cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
Expand Down
Loading

0 comments on commit d0319f1

Please sign in to comment.