Skip to content

Commit

Permalink
WIP + mv SManager back else circular import
Browse files Browse the repository at this point in the history
  • Loading branch information
tammy-baylis-swi committed May 31, 2024
1 parent 7def873 commit f236826
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 113 deletions.
79 changes: 0 additions & 79 deletions internal/oboe/file_watcher.go

This file was deleted.

103 changes: 103 additions & 0 deletions internal/oboe/settings_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"

"github.com/solarwinds/apm-go/internal/utils"
collector "github.com/solarwinds/apm-proto/go/collectorpb"
)

type settingLambdaFromFile struct {
Expand Down Expand Up @@ -50,6 +51,12 @@ type settingLambdaNormalized struct {
value int64
ttl int64
args map[string][]byte
// Type int32
// Layer string
// Flags []byte
// Value int64
// Ttl int64
// Arguments map[string][]byte
}

// newSettingLambdaNormalized accepts settings in json-unmarshalled format
Expand Down Expand Up @@ -107,3 +114,99 @@ func newSettingLambdaFromFile() (*settingLambdaNormalized, error) {

return newSettingLambdaNormalized(&settingLambda), nil
}

// ================
// TODO Move/Repurpose everything below

// Similar to collector type OboeSetting
type OboeSettingLambda struct {
state string
sizeCache string
unknownFields string

Type int64
Flags []byte
Timestamp int64
Value int64
Layer []byte
Arguments map[string][]byte
Ttl int64
}

// newOboeSetting returns an OboeSetting struct
// without protoimpl fields set
func newOboeSetting(fromFile *settingLambdaFromFile) *OboeSettingLambda {
flags := []byte(fromFile.Flags)

var unusedToken = "TOKEN"
args := utils.ArgsToMap(
fromFile.Arguments.BucketCapacity,
fromFile.Arguments.BucketRate,
fromFile.Arguments.TriggerRelaxedBucketCapacity,
fromFile.Arguments.TriggerRelaxedBucketRate,
fromFile.Arguments.TriggerStrictBucketCapacity,
fromFile.Arguments.TriggerStrictBucketRate,
fromFile.Arguments.MetricsFlushInterval,
-1,
[]byte(unusedToken),
)

oset := &OboeSettingLambda{
"foo",
"foo",
"foo",
1, // always DEFAULT_SAMPLE_RATE
flags,
int64(fromFile.Timestamp),
int64(fromFile.Value),
[]byte(""), // not set since type is always DEFAULT_SAMPLE_RATE
args,
int64(fromFile.Ttl),
}

return oset
}

// newOboeSettingLambdaFromFile unmarshals sampling settings from a JSON file at a
// specific path in a specific format then returns OboeSetting for calling
// oboe UpdateSetting like grpcReporter does, else returns error.
func NewOboeSettingLambdaFromFile() (*OboeSettingLambda, error) {
settingFile, err := os.Open("/tmp/solarwinds-apm-settings.json")
if err != nil {
return nil, err
}
settingBytes, err := io.ReadAll(settingFile)
if err != nil {
return nil, err
}
// Settings file should be an array with a single settings object
var settingLambdas []settingLambdaFromFile
if err := json.Unmarshal(settingBytes, &settingLambdas); err != nil {
return nil, err
}
if len(settingLambdas) != 1 {
return nil, errors.New("settings file is incorrectly formatted")
}

settingLambda := settingLambdas[0]

return newOboeSetting(&settingLambda), nil
}

// TODO comment
func NewOboeSettingFromReporter(settings *collector.SettingsResult) *OboeSettingLambda {
// TODO have a check
s := settings.GetSettings()[0]
return &OboeSettingLambda{
"foo",
"foo",
"foo",
int64(s.Type),
s.Flags,
s.Timestamp,
s.Value,
s.Layer,
s.Arguments,
s.Ttl,
}
}
22 changes: 13 additions & 9 deletions internal/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/solarwinds/apm-go/internal/state"
"github.com/solarwinds/apm-go/internal/swotel/semconv"
"github.com/solarwinds/apm-go/internal/utils"
collector "github.com/solarwinds/apm-proto/go/collectorpb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
Expand All @@ -53,6 +54,8 @@ type Reporter interface {
// GetServiceName retrieves the current service name, preferring an otel `service.name` from resource attributes,
// falling back to the service name in the service key
GetServiceName() string

GetSettings() (*collector.SettingsResult, error)
}

var (
Expand All @@ -62,15 +65,16 @@ var (
// a noop reporter
type nullReporter struct{}

func newNullReporter() *nullReporter { return &nullReporter{} }
func (r *nullReporter) ReportEvent(Event) error { return nil }
func (r *nullReporter) ReportStatus(Event) error { return nil }
func (r *nullReporter) Shutdown(context.Context) error { return nil }
func (r *nullReporter) ShutdownNow() {}
func (r *nullReporter) Closed() bool { return true }
func (r *nullReporter) WaitForReady(context.Context) bool { return true }
func (r *nullReporter) SetServiceKey(string) error { return nil }
func (r *nullReporter) GetServiceName() string { return "" }
func newNullReporter() *nullReporter { return &nullReporter{} }
func (r *nullReporter) ReportEvent(Event) error { return nil }
func (r *nullReporter) ReportStatus(Event) error { return nil }
func (r *nullReporter) Shutdown(context.Context) error { return nil }
func (r *nullReporter) ShutdownNow() {}
func (r *nullReporter) Closed() bool { return true }
func (r *nullReporter) WaitForReady(context.Context) bool { return true }
func (r *nullReporter) SetServiceKey(string) error { return nil }
func (r *nullReporter) GetServiceName() string { return "" }
func (r *nullReporter) GetSettings() (*collector.SettingsResult, error) { return nil, nil }

func Start(rsrc *resource.Resource, registry interface{}, o oboe.Oboe) (Reporter, error) {
log.SetLevelFromStr(config.DebugLevel())
Expand Down
4 changes: 2 additions & 2 deletions internal/reporter/reporter_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ func (r *grpcReporter) getAndUpdateSettings(ready chan bool) {
// notify caller that this routine has terminated (defered to end of routine)
defer func() { ready <- true }()

remoteSettings, err := r.getSettings()
remoteSettings, err := r.GetSettings()
if err == nil {
r.updateSettings(remoteSettings)
} else if errors.Is(err, errInvalidServiceKey) {
Expand All @@ -839,7 +839,7 @@ func (r *grpcReporter) getAndUpdateSettings(ready chan bool) {
}

// retrieves settings from collector and returns them
func (r *grpcReporter) getSettings() (*collector.SettingsResult, error) {
func (r *grpcReporter) GetSettings() (*collector.SettingsResult, error) {
method := newGetSettingsMethod(r.serviceKey.Load())
if err := r.conn.InvokeRPC(r.done, method); err == nil {
logger := log.Info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package oboe
package settings

import (
"fmt"
Expand All @@ -34,11 +34,11 @@ type settingsManager struct {
settingsTimeoutCheckInterval int // check interval for timed out settings in seconds
o oboe.Oboe // instance of Oboe to directly UpdateSetting
// TODO make optional
r reporter.Reporter // instance of gRPCReporter for remote settings
// TODO optional fileBasedWatcher
r reporter.Reporter // instance of gRPCReporter for remote settings
isLambda bool // is this running in lambda
}

func NewSettingsManager(o *oboe.Oboe, r *reporter.Reporter) (*settingsManager, error) {
func NewSettingsManager(o *oboe.Oboe, r *reporter.Reporter, isLambda bool) (*settingsManager, error) {
// TODO make reporter optional
if o == nil || r == nil {
return nil, fmt.Errorf("oboe nor reporter must not be nil")
Expand All @@ -48,6 +48,7 @@ func NewSettingsManager(o *oboe.Oboe, r *reporter.Reporter) (*settingsManager, e
settingsTimeoutCheckInterval: grpcSettingsTimeoutCheckIntervalDefault,
o: *o,
r: *r,
isLambda: isLambda,
}, nil
}

Expand Down Expand Up @@ -106,28 +107,34 @@ func (sm *settingsManager) getAndUpdateSettings(ready chan bool) {
// notify caller that this routine has terminated (defered to end of routine)
defer func() { ready <- true }()

// TODO
// (A) Excise grpcReporter's call to oboe.UpdateSetting but keep LegacyRegistry updates.
// Also get grpcReporter to return collector settings to this Manager.
// or
// (B) Add new GetSettings interface to let grpcReporter keep all original behaviour

// TODO
// Or get settings from file instead of remote
settings, err := sm.getOboeSettings()
if err == nil {
sm.updateOboeSettings(settings)
} else {
log.Errorf("SettingsManager could not getAndUpdateSettings: %s", err)
}
}

// TODO
// updateOboeSettings with remote/file settings
// instead of foo string
sm.updateOboeSettings("foo")
// retrieves settings, normalizes, and returns them
func (sm *settingsManager) getOboeSettings() (*oboe.OboeSettingLambda, error) {
if sm.isLambda {
// Get oboe-style settings from file
ns, err := oboe.NewOboeSettingLambdaFromFile()
return ns, err
} else {
// Note: getting remote settings makes another InvokeRPC call, in addition
// to grpcReporter's invoke to update legacy registry for metrics
s, err := sm.r.GetSettings()
// Map apm-proto settings to oboe-style settings
ns := oboe.NewOboeSettingFromReporter(s)
return ns, err
}
}

// updates the existing settings with the newly received
// settings new settings
func (sm *settingsManager) updateOboeSettings(foo string) {
// TODO
// sm.o.UpdateSetting with remote/file settings
// instead of single foo string
log.Info("updateOboeSettings with ", foo)
// settings, oboe-style settings
func (sm *settingsManager) updateOboeSettings(s *oboe.OboeSettingLambda) {
sm.o.UpdateSetting(int32(s.Type), string(s.Layer), s.Flags, s.Value, s.Ttl, s.Arguments)
}

// delete settings that have timed out according to their TTL
Expand Down
7 changes: 6 additions & 1 deletion swo/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ func Start(resourceAttrs ...attribute.KeyValue) (func(), error) {
if err != nil {
return func() {}, err
}
sMan, err := settings.NewSettingsManager(&o, &_reporter)
// TODO settings if lambda vs not
sMan, err := settings.NewSettingsManager(
&o,
&_reporter,
true,
)
if err != nil {
return func() {}, err
}
Expand Down

0 comments on commit f236826

Please sign in to comment.