Skip to content

Commit

Permalink
refactor/cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
swi-jared committed May 3, 2024
1 parent 6476fba commit b317d42
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 114 deletions.
134 changes: 24 additions & 110 deletions internal/oboe/oboe.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,17 @@ type Oboe interface {

func NewOboe() Oboe {
return &oboe{
cfg: &oboeSettingsCfg{
settings: make(map[settingKey]*settings),
},
settings: make(map[settingKey]*settings),
}
}

type oboe struct {
cfg *oboeSettingsCfg
sync.RWMutex
settings map[settingKey]*settings
}

var _ Oboe = &oboe{}

// Current settings configuration
type oboeSettingsCfg struct {
settings map[settingKey]*settings
lock sync.RWMutex
}

// FlushRateCounts collects the request counters values by categories.
func (o *oboe) FlushRateCounts() map[string]*metrics.RateCounts {
setting, ok := o.GetSetting()
Expand Down Expand Up @@ -213,69 +206,11 @@ func floatToStr(f float64) string {
return strconv.FormatFloat(f, 'f', -1, 64)
}

type TriggerTraceMode int

const (
// ModeTriggerTraceNotPresent means there is no X-Trace-Options header detected,
// or the X-Trace-Options header is present but trigger_trace flag is not. This
// indicates that it's a trace for regular sampling.
ModeTriggerTraceNotPresent TriggerTraceMode = iota

// ModeInvalidTriggerTrace means X-Trace-Options is detected but no valid trigger-trace
// flag found, or X-Trace-Options-Signature is present but the authentication is failed.
ModeInvalidTriggerTrace

// ModeRelaxedTriggerTrace means X-Trace-Options-Signature is present and valid.
// The trace will be sampled/limited by the relaxed token bucket.
ModeRelaxedTriggerTrace

// ModeStrictTriggerTrace means no X-Trace-Options-Signature is present. The trace
// will be limited by the strict token bucket.
ModeStrictTriggerTrace
)

// Trigger trace response messages
const (
ttOK = "ok"
ttRateExceeded = "rate-exceeded"
ttTracingDisabled = "tracing-disabled"
ttTriggerTracingDisabled = "trigger-tracing-disabled"
ttNotRequested = "not-requested"
ttIgnored = "ignored"
ttSettingsNotAvailable = "settings-not-available"
ttEmpty = ""
)

// Enabled indicates whether it's a trigger-trace request
func (tm TriggerTraceMode) Enabled() bool {
switch tm {
case ModeTriggerTraceNotPresent, ModeInvalidTriggerTrace:
return false
case ModeRelaxedTriggerTrace, ModeStrictTriggerTrace:
return true
default:
panic(fmt.Sprintf("Unhandled trigger trace mode: %x", tm))
}
}

// Requested indicates whether the user tries to issue a trigger-trace request
// (but may be rejected if the header is illegal)
func (tm TriggerTraceMode) Requested() bool {
switch tm {
case ModeTriggerTraceNotPresent:
return false
case ModeRelaxedTriggerTrace, ModeStrictTriggerTrace, ModeInvalidTriggerTrace:
return true
default:
panic(fmt.Sprintf("Unhandled trigger trace mode: %x", tm))
}
}

func (o *oboe) SampleRequest(continued bool, url string, triggerTrace TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision {
diceRolled := false
setting, ok := o.GetSetting()
if !ok {
return SampleDecision{false, 0, SAMPLE_SOURCE_NONE, false, ttSettingsNotAvailable, 0, 0, diceRolled}
return SampleDecision{false, 0, SAMPLE_SOURCE_NONE, false, TtSettingsNotAvailable, 0, 0, diceRolled}
}

retval := false
Expand All @@ -293,21 +228,21 @@ func (o *oboe) SampleRequest(continued bool, url string, triggerTrace TriggerTra

if triggerTrace.Requested() && !continued {
sampled := (triggerTrace != ModeInvalidTriggerTrace) && (flags.TriggerTraceEnabled())
rsp := ttOK
rsp := TtOK

ret := bucket.count(sampled, false, true)

if flags.TriggerTraceEnabled() && triggerTrace.Enabled() {
if !ret {
rsp = ttRateExceeded
rsp = TtRateExceeded
}
} else if triggerTrace == ModeInvalidTriggerTrace {
rsp = ""
} else {
if !flags.Enabled() {
rsp = ttTracingDisabled
rsp = TtTracingDisabled
} else {
rsp = ttTriggerTracingDisabled
rsp = TtTriggerTracingDisabled
}
}
ttCap, ttRate := setting.getTokenBucketSetting(triggerTrace)
Expand Down Expand Up @@ -344,9 +279,9 @@ func (o *oboe) SampleRequest(continued bool, url string, triggerTrace TriggerTra

retval = bucket.count(retval, continued, doRateLimiting)

rsp := ttNotRequested
rsp := TtNotRequested
if triggerTrace.Requested() {
rsp = ttIgnored
rsp = TtIgnored
}

var bucketCap, bucketRate float64
Expand Down Expand Up @@ -375,13 +310,6 @@ func bytesToFloat64(b []byte) (float64, error) {
return math.Float64frombits(binary.LittleEndian.Uint64(b)), nil
}

func bytesToInt32(b []byte) (int32, error) {
if len(b) != 4 {
return -1, fmt.Errorf("invalid length: %d", len(b))
}
return int32(binary.LittleEndian.Uint32(b)), nil
}

func parseFloat64(args map[string][]byte, key string, fb float64) float64 {
ret := fb
if c, ok := args[key]; ok {
Expand All @@ -396,20 +324,6 @@ func parseFloat64(args map[string][]byte, key string, fb float64) float64 {
return ret
}

func ParseInt32(args map[string][]byte, key string, fb int32) int32 {
ret := fb
if c, ok := args[key]; ok {
v, err := bytesToInt32(c)
if err == nil && v >= 0 {
ret = v
log.Debugf("parsed %s=%d", key, v)
} else {
log.Warningf("parse error: %s=%d err=%v fallback=%d", key, v, err, fb)
}
}
return ret
}

func adjustSampleRate(rate int64) int {
if rate < 0 {
log.Debugf("Invalid sample rate: %d", rate)
Expand Down Expand Up @@ -455,21 +369,21 @@ func (o *oboe) UpdateSetting(sType int32, layer string, flags []byte, value int6
layer: layer,
}

o.cfg.lock.Lock()
o.cfg.settings[key] = merged
o.cfg.lock.Unlock()
o.Lock()
o.settings[key] = merged
o.Unlock()
}

// CheckSettingsTimeout checks and deletes expired settings
func (o *oboe) CheckSettingsTimeout() {
o.cfg.checkSettingsTimeout()
o.checkSettingsTimeout()
}

func (sc *oboeSettingsCfg) checkSettingsTimeout() {
sc.lock.Lock()
defer sc.lock.Unlock()
func (o *oboe) checkSettingsTimeout() {
o.Lock()
defer o.Unlock()

ss := sc.settings
ss := o.settings
for k, s := range ss {
e := s.timestamp.Add(time.Duration(s.ttl) * time.Second)
if e.Before(time.Now()) {
Expand All @@ -479,31 +393,31 @@ func (sc *oboeSettingsCfg) checkSettingsTimeout() {
}

func (o *oboe) GetSetting() (*settings, bool) {
o.cfg.lock.RLock()
defer o.cfg.lock.RUnlock()
o.RLock()
defer o.RUnlock()

// for now only look up the default settings
key := settingKey{
sType: TYPE_DEFAULT,
layer: "",
}
if setting, ok := o.cfg.settings[key]; ok {
if setting, ok := o.settings[key]; ok {
return setting, true
}

return nil, false
}

func (o *oboe) RemoveSetting() {
o.cfg.lock.Lock()
defer o.cfg.lock.Unlock()
o.Lock()
defer o.Unlock()

key := settingKey{
sType: TYPE_DEFAULT,
layer: "",
}

delete(o.cfg.settings, key)
delete(o.settings, key)
}

func (o *oboe) HasDefaultSetting() bool {
Expand Down
74 changes: 74 additions & 0 deletions internal/oboe/trigger_trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// © 2023 SolarWinds Worldwide, LLC. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package oboe

import "fmt"

// Trigger trace response messages
const (
TtOK = "ok"
TtRateExceeded = "rate-exceeded"
TtTracingDisabled = "tracing-disabled"
TtTriggerTracingDisabled = "trigger-tracing-disabled"
TtNotRequested = "not-requested"
TtIgnored = "ignored"
TtSettingsNotAvailable = "settings-not-available"
)

type TriggerTraceMode int

const (
// ModeTriggerTraceNotPresent means there is no X-Trace-Options header detected,
// or the X-Trace-Options header is present but trigger_trace flag is not. This
// indicates that it's a trace for regular sampling.
ModeTriggerTraceNotPresent TriggerTraceMode = iota

// ModeInvalidTriggerTrace means X-Trace-Options is detected but no valid trigger-trace
// flag found, or X-Trace-Options-Signature is present but the authentication is failed.
ModeInvalidTriggerTrace

// ModeRelaxedTriggerTrace means X-Trace-Options-Signature is present and valid.
// The trace will be sampled/limited by the relaxed token bucket.
ModeRelaxedTriggerTrace

// ModeStrictTriggerTrace means no X-Trace-Options-Signature is present. The trace
// will be limited by the strict token bucket.
ModeStrictTriggerTrace
)

// Enabled indicates whether it's a trigger-trace request
func (tm TriggerTraceMode) Enabled() bool {
switch tm {
case ModeTriggerTraceNotPresent, ModeInvalidTriggerTrace:
return false
case ModeRelaxedTriggerTrace, ModeStrictTriggerTrace:
return true
default:
panic(fmt.Sprintf("Unhandled trigger trace mode: %x", tm))
}
}

// Requested indicates whether the user tries to issue a trigger-trace request
// (but may be rejected if the header is illegal)
func (tm TriggerTraceMode) Requested() bool {
switch tm {
case ModeTriggerTraceNotPresent:
return false
case ModeRelaxedTriggerTrace, ModeStrictTriggerTrace, ModeInvalidTriggerTrace:
return true
default:
panic(fmt.Sprintf("Unhandled trigger trace mode: %x", tm))
}
}
30 changes: 26 additions & 4 deletions internal/reporter/reporter_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/binary"
"fmt"
"github.com/google/uuid"
"github.com/solarwinds/apm-go/internal/config"
Expand Down Expand Up @@ -888,19 +889,19 @@ func (r *grpcReporter) updateSettings(settings *collector.SettingsResult) {
r.oboe.UpdateSetting(int32(s.Type), string(s.Layer), s.Flags, s.Value, s.Ttl, s.Arguments)

// update MetricsFlushInterval
mi := oboe.ParseInt32(s.Arguments, constants.KvMetricsFlushInterval, r.collectMetricInterval)
mi := ParseInt32(s.Arguments, constants.KvMetricsFlushInterval, r.collectMetricInterval)
atomic.StoreInt32(&r.collectMetricInterval, mi)

// update events flush interval
o := config.ReporterOpts()
ei := oboe.ParseInt32(s.Arguments, constants.KvEventsFlushInterval, int32(o.GetEventFlushInterval()))
ei := ParseInt32(s.Arguments, constants.KvEventsFlushInterval, int32(o.GetEventFlushInterval()))
o.SetEventFlushInterval(int64(ei))

// update MaxTransactions
mt := oboe.ParseInt32(s.Arguments, constants.KvMaxTransactions, r.registry.ApmMetricsCap())
mt := ParseInt32(s.Arguments, constants.KvMaxTransactions, r.registry.ApmMetricsCap())
r.registry.SetApmMetricsCap(mt)

maxCustomMetrics := oboe.ParseInt32(s.Arguments, constants.KvMaxCustomMetrics, r.registry.CustomMetricsCap())
maxCustomMetrics := ParseInt32(s.Arguments, constants.KvMaxCustomMetrics, r.registry.CustomMetricsCap())
r.registry.SetCustomMetricsCap(maxCustomMetrics)
}

Expand Down Expand Up @@ -1411,3 +1412,24 @@ func printRPCMsg(m Method) {
}
log.Debugf("%s", str)
}

func bytesToInt32(b []byte) (int32, error) {
if len(b) != 4 {
return -1, fmt.Errorf("invalid length: %d", len(b))
}
return int32(binary.LittleEndian.Uint32(b)), nil
}

func ParseInt32(args map[string][]byte, key string, fb int32) int32 {
ret := fb
if c, ok := args[key]; ok {
v, err := bytesToInt32(c)
if err == nil && v >= 0 {
ret = v
log.Debugf("parsed %s=%d", key, v)
} else {
log.Warningf("parse error: %s=%d err=%v fallback=%d", key, v, err, fb)
}
}
return ret
}

0 comments on commit b317d42

Please sign in to comment.