Skip to content

Commit

Permalink
fix: handle double submission issue
Browse files Browse the repository at this point in the history
When a sparkapp CRD is created, on ofas we receive two sparkapp events with state `NewState`, because we mutate the CRD.

This fix stores each submitted appID in a cache with a TTL; whatever the result of the first submission, the second one is skipped.
  • Loading branch information
originou committed Aug 19, 2024
1 parent 91653f6 commit 5a60166
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 1 deletion.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.2.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/ishidawataru/sctp v0.0.0-20190723014705-7c296d48a2b5/go.mod h1:DM4VvS+hD/kDi1U1QsX2fnZowwBhqD0Dk3bRPKF/Oc8=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
github.com/jimstudt/http-authentication v0.0.0-20140401203705-3eca13d6893a/go.mod h1:wK6yTYYcgjHE1Z1QtXACPDjcFJyBskHEdagmnq3vsP8=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand Down
14 changes: 13 additions & 1 deletion pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Controller struct {
enableUIService bool
disableExecutorReporting bool
executorsProcessingLimit int
submissionCache *SubmissionCache
}

// NewController creates a new Controller.
Expand Down Expand Up @@ -139,6 +140,7 @@ func newSparkApplicationController(
enableUIService: enableUIService,
disableExecutorReporting: disableExecutorReporting,
executorsProcessingLimit: executorsProcessingLimit,
submissionCache: NewSubmissionCache(5 * time.Minute),
}

if metricsConfig != nil {
Expand Down Expand Up @@ -566,7 +568,17 @@ func (c *Controller) syncSparkApplication(key string) error {
appCopy.Status.AppState.State = v1beta2.FailedState
appCopy.Status.AppState.ErrorMessage = err.Error()
} else {
appCopy = c.submitSparkApplication(appCopy)
// related to ofas, we received 2 times the CRD events with NewState state
// we need to check if the submission was already done
// and skip the submission if it was already done
if !c.submissionCache.Exist(key) {
appCopy = c.submitSparkApplication(appCopy)
// whatever the result of the submission, we don't want to retry the submission
c.submissionCache.Set(key)
} else {
glog.V(2).Infof("submission attempt already done for: %q, skip it", key)
return nil
}
}
case v1beta2.SucceedingState:
if !shouldRetry(appCopy) {
Expand Down
224 changes: 224 additions & 0 deletions pkg/controller/sparkapplication/controller_another_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package sparkapplication

import (
"context"
"fmt"
"os"
"os/exec"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
kubeclientfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"

"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
crdclientfake "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/fake"
crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/util"
)

// These tests are kept separate from the original `controller_test.go` to simplify future rebases.

// newAnotherFakeController builds a Controller wired to fake CRD and Kubernetes
// clientsets for use in tests. It mirrors the helper from the original
// controller_test, except that the UIService is not enabled (behavior of the
// ofas spark-operator).
//
// When app is non-nil it is added to the SparkApplication informer's indexer;
// every non-nil pod in pods is added to the pod informer's indexer. The
// returned FakeRecorder has a buffer of 3 events.
func newAnotherFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Controller, *record.FakeRecorder) {
	const noResync = 0 * time.Second

	crdclientfake.AddToScheme(scheme.Scheme)
	fakeCRDClient := crdclientfake.NewSimpleClientset()
	fakeKubeClient := kubeclientfake.NewSimpleClientset()
	util.IngressCapabilities = map[string]bool{"networking.k8s.io/v1": true}
	appInformerFactory := crdinformers.NewSharedInformerFactory(fakeCRDClient, noResync)
	fakeRecorder := record.NewFakeRecorder(3)

	// Register a single node exposing an external IP in the fake cluster.
	node := &apiv1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: "node1",
		},
		Status: apiv1.NodeStatus{
			Addresses: []apiv1.NodeAddress{
				{
					Type:    apiv1.NodeExternalIP,
					Address: "12.34.56.78",
				},
			},
		},
	}
	fakeKubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})

	coreInformerFactory := informers.NewSharedInformerFactory(fakeKubeClient, noResync)
	ctrl := newSparkApplicationController(fakeCRDClient, fakeKubeClient, appInformerFactory, coreInformerFactory, fakeRecorder,
		&util.MetricConfig{}, "", "", nil, false, false, util.RatelimitConfig{}, 5)

	// The informers are requested unconditionally, as in the original helper,
	// even when there is nothing to seed them with.
	appInformer := appInformerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
	if app != nil {
		appInformer.GetIndexer().Add(app)
	}

	podInformer := coreInformerFactory.Core().V1().Pods().Informer()
	for i := range pods {
		if pods[i] == nil {
			continue
		}
		podInformer.GetIndexer().Add(pods[i])
	}

	return ctrl, fakeRecorder
}

// TestSyncSparkApplication_When_Submission_Successes covers the nominal case
// where the submission succeeds and the NewState event is received twice for
// the same CRD:
//   - on the first sync the controller submits the application;
//   - on the second sync the controller must skip the submission.
//
// The submission must happen exactly once and the resulting state must be
// SubmittedState.
func TestSyncSparkApplication_When_Submission_Successes(t *testing.T) {
	// t.Setenv puts back the previous value when the test ends, and unsets the
	// variable if it was not set before (a manual os.Setenv restore would leave
	// it set to the empty string instead).
	t.Setenv(sparkHomeEnvVar, "/spark")
	t.Setenv(kubernetesServiceHostEnvVar, "localhost")
	t.Setenv(kubernetesServicePortEnvVar, "443")

	// execCommand is package-level state shared with the other tests; restore
	// it so the mock installed below does not leak out of this test.
	originalExecCommand := execCommand
	defer func() { execCommand = originalExecCommand }()

	restartPolicyNever := v1beta2.RestartPolicy{
		Type: v1beta2.Never,
	}

	// Create a new SparkApplication with NewState
	app := &v1beta2.SparkApplication{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "default",
		},
		Spec: v1beta2.SparkApplicationSpec{
			RestartPolicy: restartPolicyNever,
		},
		Status: v1beta2.SparkApplicationStatus{
			AppState: v1beta2.ApplicationState{
				State: v1beta2.NewState,
			},
		},
	}

	ctrl, _ := newAnotherFakeController(app)
	_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
	if err != nil {
		t.Fatal(err)
	}

	// Mock the execCommand to return a success
	execCommand = func(command string, args ...string) *exec.Cmd {
		cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
		cs = append(cs, args...)
		cmd := exec.Command(os.Args[0], cs...)
		cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
		return cmd
	}

	key := fmt.Sprintf("%s/%s", app.Namespace, app.Name)

	// simulate the first NewState
	err = ctrl.syncSparkApplication(key)
	assert.NoError(t, err)
	updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
	if err != nil {
		// Stop here: updatedApp is dereferenced right below.
		t.Fatal(err)
	}
	assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))

	// simulate the second NewState (should skip the submission)
	err = ctrl.syncSparkApplication(key)
	assert.NoError(t, err)
	updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatal(err)
	}
	// check the state is still submitted
	assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
	// check the submit count does not change
	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
}

// TestSyncSparkApplication_When_Submission_Fails covers the case where the
// first submission fails and the NewState event is received twice for the same
// CRD:
//   - on the first sync the controller runs the submission, which fails;
//   - on the second sync the controller must skip the submission, even though
//     the command would now succeed.
//
// The submission must be attempted exactly once and the resulting state must
// be FailedSubmissionState.
func TestSyncSparkApplication_When_Submission_Fails(t *testing.T) {
	// t.Setenv puts back the previous value when the test ends, and unsets the
	// variable if it was not set before (a manual os.Setenv restore would leave
	// it set to the empty string instead).
	t.Setenv(sparkHomeEnvVar, "/spark")
	t.Setenv(kubernetesServiceHostEnvVar, "localhost")
	t.Setenv(kubernetesServicePortEnvVar, "443")

	// execCommand is package-level state shared with the other tests; restore
	// it so the mocks installed below do not leak out of this test.
	originalExecCommand := execCommand
	defer func() { execCommand = originalExecCommand }()

	restartPolicyNever := v1beta2.RestartPolicy{
		Type: v1beta2.Never,
	}

	// Create a new SparkApplication with NewState
	app := &v1beta2.SparkApplication{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "default",
		},
		Spec: v1beta2.SparkApplicationSpec{
			RestartPolicy: restartPolicyNever,
		},
		Status: v1beta2.SparkApplicationStatus{
			AppState: v1beta2.ApplicationState{
				State: v1beta2.NewState,
			},
		},
	}

	ctrl, _ := newAnotherFakeController(app)
	_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
	if err != nil {
		t.Fatal(err)
	}

	// Mock the execCommand to return a failure
	execCommand = func(command string, args ...string) *exec.Cmd {
		cmd := exec.Command("/bin/should-fail")
		return cmd
	}

	key := fmt.Sprintf("%s/%s", app.Namespace, app.Name)

	// simulate the first NewState (the submission fails)
	err = ctrl.syncSparkApplication(key)
	assert.NoError(t, err)
	updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
	if err != nil {
		// Stop here: updatedApp is dereferenced right below.
		t.Fatal(err)
	}
	assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
	assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))

	// simulate the second NewState (should skip the submission)

	// This time the command is mocked to succeed, but it is expected not to be
	// executed at all.
	execCommand = func(command string, args ...string) *exec.Cmd {
		cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
		cs = append(cs, args...)
		cmd := exec.Command(os.Args[0], cs...)
		cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
		return cmd
	}
	err = ctrl.syncSparkApplication(key)
	assert.NoError(t, err)
	updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatal(err)
	}
	// check the CR state is still failedSubmission
	assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
	// check the submit count does not change
	assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))
}
34 changes: 34 additions & 0 deletions pkg/controller/sparkapplication/submission_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sparkapplication

/*
A simple cache implementation that stores keys with a time-to-live (TTL) value.
*/

import (
"github.com/jellydator/ttlcache/v3"
"time"
)

// SubmissionCache records string keys in an underlying ttlcache.Cache so that
// callers can later ask whether a key has been recorded. Only the keys carry
// information; the stored values are always nil.
type SubmissionCache struct {
	cache *ttlcache.Cache[string, any] // backing TTL cache; the value is not used
}

// NewSubmissionCache returns a SubmissionCache whose entries use ttl as their
// default time-to-live.
//
// It also launches the underlying cache's Start loop in a new goroutine so that
// expired entries get cleaned up. Nothing in this type stops that goroutine, so
// it lives for as long as the process does.
func NewSubmissionCache(ttl time.Duration) *SubmissionCache {
	sc := &SubmissionCache{
		cache: ttlcache.New[string, any](ttlcache.WithTTL[string, any](ttl)),
	}

	// start the cache cleanup goroutine
	go sc.cache.Start()

	return sc
}

// Set records key in the cache with the cache's default TTL. The stored value
// is a nil placeholder because only the presence of the key matters.
func (c *SubmissionCache) Set(key string) {
	var placeholder any // nil: the value is never read back
	_ = c.cache.Set(key, placeholder, ttlcache.DefaultTTL)
}

// Exist reports whether key is currently present in the underlying cache.
func (c *SubmissionCache) Exist(key string) bool {
	found := c.cache.Has(key)
	return found
}
17 changes: 17 additions & 0 deletions pkg/controller/sparkapplication/submission_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sparkapplication

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestSubmissionCacheExist checks that a key is not reported before it is set,
// is reported right after being set, and stops being reported once its TTL has
// elapsed.
func TestSubmissionCacheExist(t *testing.T) {
	const ttl = 1 * time.Second

	cache := NewSubmissionCache(ttl)
	assert.False(t, cache.Exist("key1"))

	cache.Set("key1")
	assert.True(t, cache.Exist("key1"))

	// Poll for expiry instead of sleeping for a fixed 2s: the test finishes as
	// soon as the entry is gone, and still fails if the entry outlives twice
	// its TTL.
	assert.Eventually(t, func() bool {
		return !cache.Exist("key1")
	}, 2*ttl, 20*time.Millisecond)
}

0 comments on commit 5a60166

Please sign in to comment.