Skip to content

Commit

Permalink
Send app status related info only if initialized (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgalsaleh authored Oct 13, 2023
1 parent 4e3d3d0 commit 172b9c2
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 31 deletions.
5 changes: 2 additions & 3 deletions pkg/apiserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func bootstrap(params APIServerParams) error {
channelName = verifiedLicense.Spec.ChannelName
}

storeOptions := store.InitInMemoryStoreOptions{
store.InitInMemory(store.InitInMemoryStoreOptions{
License: verifiedLicense,
LicenseFields: params.LicenseFields,
AppName: params.AppName,
Expand All @@ -105,8 +105,7 @@ func bootstrap(params APIServerParams) error {
Namespace: params.Namespace,
ReplicatedID: replicatedID,
AppID: appID,
}
store.InitInMemory(storeOptions)
})

isIntegrationModeEnabled, err := integration.IsEnabled(params.Context, clientset, store.GetStore().GetNamespace(), store.GetStore().GetLicense())
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/appstate/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ func (o *Operator) ApplyAppInformers(args types.AppInformersArgs) {
if len(informers) == 0 {
// no informers, set state to ready and return
defaultReadyStatus := types.AppStatus{
AppSlug: appSlug,
UpdatedAt: time.Now(),
State: types.StateReady,
Sequence: sequence,
AppSlug: appSlug,
ResourceStates: types.ResourceStates{},
UpdatedAt: time.Now(),
State: types.StateReady,
Sequence: sequence,
}
if err := o.setAppStatus(defaultReadyStatus); err != nil {
log.Printf("error updating app status: %v", err)
Expand All @@ -125,7 +126,7 @@ func (o *Operator) setAppStatus(newAppStatus types.AppStatus) error {
store.GetStore().SetAppStatus(newAppStatus)

if newAppStatus.State != currentAppStatus.State {
log.Printf("app state changed from %s to %s", currentAppStatus.State, newAppStatus.State)
log.Printf("app state changed from %q to %q", currentAppStatus.State, newAppStatus.State)
go func() {
clientset, err := k8sutil.GetClientset()
if err != nil {
Expand Down
10 changes: 4 additions & 6 deletions pkg/heartbeat/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@ func SendAppHeartbeat(clientset kubernetes.Interface, sdkStore store.Store) erro

heartbeatInfo := GetHeartbeatInfo(sdkStore)

marshalledRS, err := json.Marshal(heartbeatInfo.ResourceStates)
if err != nil {
return errors.Wrap(err, "failed to marshal resource states")
}
reqPayload := map[string]interface{}{
"resource_states": string(marshalledRS),
// build the request body
reqPayload := map[string]interface{}{}
if err := InjectHeartbeatInfoPayload(reqPayload, heartbeatInfo); err != nil {
return errors.Wrap(err, "failed to inject heartbeat info payload")
}
reqBody, err := json.Marshal(reqPayload)
if err != nil {
Expand Down
39 changes: 38 additions & 1 deletion pkg/heartbeat/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package heartbeat

import (
"context"
"encoding/json"
"net/http"
"os"
"strconv"
Expand All @@ -15,6 +16,38 @@ import (
"k8s.io/client-go/kubernetes"
)

func InjectHeartbeatInfoPayload(reqPayload map[string]interface{}, heartbeatInfo *types.HeartbeatInfo) error {
payload, err := GetHeartbeatInfoPayload(heartbeatInfo)
if err != nil {
return errors.Wrap(err, "failed to get heartbeat info payload")
}

for key, value := range payload {
reqPayload[key] = value
}

return nil
}

func GetHeartbeatInfoPayload(heartbeatInfo *types.HeartbeatInfo) (map[string]interface{}, error) {
payload := make(map[string]interface{})

if heartbeatInfo == nil {
return payload, nil
}

// only include app status related information if it's been initialized
if heartbeatInfo.AppStatus != "" {
marshalledRS, err := json.Marshal(heartbeatInfo.ResourceStates)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal resource states")
}
payload["resource_states"] = string(marshalledRS)
}

return payload, nil
}

func InjectHeartbeatInfoHeaders(req *http.Request, heartbeatInfo *types.HeartbeatInfo) {
headers := GetHeartbeatInfoHeaders(heartbeatInfo)

Expand All @@ -31,10 +64,14 @@ func GetHeartbeatInfoHeaders(heartbeatInfo *types.HeartbeatInfo) map[string]stri
}

headers["X-Replicated-K8sVersion"] = heartbeatInfo.K8sVersion
headers["X-Replicated-AppStatus"] = heartbeatInfo.AppStatus
headers["X-Replicated-ClusterID"] = heartbeatInfo.ClusterID
headers["X-Replicated-InstanceID"] = heartbeatInfo.InstanceID

// only include app status related information if it's been initialized
if heartbeatInfo.AppStatus != "" {
headers["X-Replicated-AppStatus"] = heartbeatInfo.AppStatus
}

if heartbeatInfo.ChannelID != "" {
headers["X-Replicated-DownstreamChannelID"] = heartbeatInfo.ChannelID
} else if heartbeatInfo.ChannelName != "" {
Expand Down
60 changes: 59 additions & 1 deletion pkg/heartbeat/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,64 @@ package heartbeat
import (
"testing"

appstatetypes "github.com/replicatedhq/replicated-sdk/pkg/appstate/types"
"github.com/replicatedhq/replicated-sdk/pkg/heartbeat/types"
"github.com/replicatedhq/replicated-sdk/pkg/k8sutil"
"github.com/replicatedhq/replicated-sdk/pkg/util"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes/fake"
)

func TestInjectHeartbeatInfoPayload(t *testing.T) {
heartbeatInfo := &types.HeartbeatInfo{
AppStatus: "ready",
ResourceStates: appstatetypes.ResourceStates{
{
Kind: "Deployment",
Name: "test-deployment",
Namespace: "test-namespace",
State: appstatetypes.StateDegraded,
},
{
Kind: "Service",
Name: "test-service",
Namespace: "test-namespace",
State: appstatetypes.StateUnavailable,
},
},
}

reqPayload := make(map[string]interface{})

err := InjectHeartbeatInfoPayload(reqPayload, heartbeatInfo)
assert.NoError(t, err)

expectedPayload := map[string]interface{}{
"resource_states": `[{"kind":"Deployment","name":"test-deployment","namespace":"test-namespace","state":"degraded"},{"kind":"Service","name":"test-service","namespace":"test-namespace","state":"unavailable"}]`,
}
assert.Equal(t, expectedPayload, reqPayload)

// test nil heartbeat info
reqPayload = make(map[string]interface{})

err = InjectHeartbeatInfoPayload(reqPayload, nil)
assert.NoError(t, err)

expectedPayload = map[string]interface{}{}
assert.Equal(t, expectedPayload, reqPayload)

// test empty app status
reqPayload = make(map[string]interface{})

err = InjectHeartbeatInfoPayload(reqPayload, &types.HeartbeatInfo{
AppStatus: "",
})
assert.NoError(t, err)

expectedPayload = map[string]interface{}{}
assert.Equal(t, expectedPayload, reqPayload)
}

func TestGetHeartbeatInfoHeaders(t *testing.T) {
heartbeatInfo := &types.HeartbeatInfo{
AppStatus: "ready",
Expand All @@ -32,11 +83,18 @@ func TestGetHeartbeatInfoHeaders(t *testing.T) {
"X-Replicated-DownstreamChannelSequence": "42",
"X-Replicated-K8sDistribution": "k3s",
}

assert.Equal(t, expectedHeaders, headers)

// nil heartbeat info
nilHeaders := GetHeartbeatInfoHeaders(nil)
assert.Empty(t, nilHeaders)

// empty app status
emptyAppStatusHeaders := GetHeartbeatInfoHeaders(&types.HeartbeatInfo{
AppStatus: "",
})
_, appStatusOk := emptyAppStatusHeaders["X-Replicated-AppStatus"]
assert.False(t, appStatusOk)
}

func TestCanReport(t *testing.T) {
Expand Down
8 changes: 0 additions & 8 deletions pkg/store/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,6 @@ func (s *InMemoryStore) GetNamespace() string {
}

func (s *InMemoryStore) GetAppStatus() appstatetypes.AppStatus {
if s.appStatus.State == "" {
// initialize with none state so that subsequent changes will trigger reporting
return appstatetypes.AppStatus{
AppSlug: s.appSlug,
Sequence: s.releaseSequence,
State: appstatetypes.StateMissing,
}
}
return s.appStatus
}

Expand Down
11 changes: 4 additions & 7 deletions pkg/upstream/replicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,12 @@ func GetUpdates(sdkStore store.Store, license *kotsv1beta1.License, currentCurso

url := fmt.Sprintf("%s://%s/release/%s/pending?%s", u.Scheme, hostname, license.Spec.AppSlug, urlValues.Encode())

// build the request body
heartbeatInfo := heartbeat.GetHeartbeatInfo(sdkStore)

marshalledRS, err := json.Marshal(heartbeatInfo.ResourceStates)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal resource states")
}
reqPayload := map[string]interface{}{
"resource_states": string(marshalledRS),
// build the request body
reqPayload := map[string]interface{}{}
if err := heartbeat.InjectHeartbeatInfoPayload(reqPayload, heartbeatInfo); err != nil {
return nil, errors.Wrap(err, "failed to inject heartbeat info payload")
}
reqBody, err := json.Marshal(reqPayload)
if err != nil {
Expand Down

0 comments on commit 172b9c2

Please sign in to comment.