fix: handle double submission issue
When a SparkApplication CRD is created, on ofas we receive the SparkApplication event with state `NewState` twice, because we mutate the CRD.

This fix stores each submission appID in a cache with a TTL; whatever the result of the first submission, the second event is skipped.
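
In essence, the fix keeps a short-lived, TTL-based record of submission attempts keyed by the application's namespace/name and consults it before running spark-submit. Below is a minimal standalone sketch of that pattern (the submitOnce helper, key, and TTL value are illustrative, not the operator's actual code; it uses the same jellydator/ttlcache v3 calls the fix relies on):

package main

import (
	"fmt"
	"time"

	"github.com/jellydator/ttlcache/v3"
)

// submitOnce runs submit for a given key at most once per TTL window,
// regardless of whether the submission succeeds or fails.
func submitOnce(cache *ttlcache.Cache[string, any], key string, submit func() error) {
	if cache.Has(key) {
		fmt.Printf("submission attempt already done for %q, skipping\n", key)
		return
	}
	err := submit()
	// Record the attempt whatever the outcome, so a duplicate NewState
	// event arriving within the TTL does not trigger a second spark-submit.
	cache.Set(key, nil, ttlcache.DefaultTTL)
	if err != nil {
		fmt.Printf("submission failed for %q: %v\n", key, err)
	}
}

func main() {
	cache := ttlcache.New[string, any](ttlcache.WithTTL[string, any](5 * time.Minute))
	go cache.Start() // evict expired entries in the background

	submit := func() error { return nil }
	submitOnce(cache, "default/foo", submit) // first event: submits
	submitOnce(cache, "default/foo", submit) // duplicate event: skipped
}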
originou committed Jul 29, 2024
1 parent 848e664 commit 7bdf2d0
Showing 6 changed files with 300 additions and 6 deletions.
5 changes: 3 additions & 2 deletions go.mod
@@ -9,15 +9,16 @@ require (
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/google/go-cloud v0.1.1
github.com/google/uuid v1.1.2
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/olekukonko/tablewriter v0.0.4
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_model v0.2.0
github.com/robfig/cron v1.2.0
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.8.2
golang.org/x/net v0.0.0-20220722155237-a158d28d115b
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sync v0.1.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
k8s.io/api v0.25.3
k8s.io/apiextensions-apiserver v0.25.3
12 changes: 9 additions & 3 deletions go.sum
@@ -515,6 +515,8 @@ github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ishidawataru/sctp v0.0.0-20190723014705-7c296d48a2b5/go.mod h1:DM4VvS+hD/kDi1U1QsX2fnZowwBhqD0Dk3bRPKF/Oc8=
github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
github.com/jimstudt/http-authentication v0.0.0-20140401203705-3eca13d6893a/go.mod h1:wK6yTYYcgjHE1Z1QtXACPDjcFJyBskHEdagmnq3vsP8=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
@@ -777,8 +779,9 @@ github.com/storageos/go-api v0.0.0-20180912212459-343b3eff91fc/go.mod h1:ZrLn+e0
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@@ -787,8 +790,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/thecodeteam/goscaleio v0.1.0/go.mod h1:68sdkZAsK8bvEwBlbQnlLS+xU+hvLYM/iQ8KXej1AwM=
@@ -866,6 +870,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
@@ -1026,8 +1031,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180329131831-378d26f46672/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
14 changes: 13 additions & 1 deletion pkg/controller/sparkapplication/controller.go
@@ -81,6 +81,7 @@ type Controller struct {
enableUIService bool
disableExecutorReporting bool
executorsProcessingLimit int
submissionCache *SubmissionCache
}

// NewController creates a new Controller.
@@ -139,6 +140,7 @@ func newSparkApplicationController(
enableUIService: enableUIService,
disableExecutorReporting: disableExecutorReporting,
executorsProcessingLimit: executorsProcessingLimit,
submissionCache: NewSubmissionCache(5 * time.Minute),
}

if metricsConfig != nil {
@@ -566,7 +568,17 @@ func (c *Controller) syncSparkApplication(key string) error {
appCopy.Status.AppState.State = v1beta2.FailedState
appCopy.Status.AppState.ErrorMessage = err.Error()
} else {
appCopy = c.submitSparkApplication(appCopy)
// related to ofas, we receive the CRD event with NewState twice,
// so check whether a submission attempt was already made
// and skip the submission if it was
if !c.submissionCache.Exist(key) {
appCopy = c.submitSparkApplication(appCopy)
// whatever the result of the submission, we don't want to retry it
c.submissionCache.Set(key)
} else {
glog.V(2).Infof("submission attempt already done for: %q, skip it", key)
return nil
}
}
case v1beta2.SucceedingState:
if !shouldRetry(appCopy) {
224 changes: 224 additions & 0 deletions pkg/controller/sparkapplication/controller_another_test.go
@@ -0,0 +1,224 @@
package sparkapplication

import (
"context"
"fmt"
"os"
"os/exec"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
kubeclientfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"

"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
crdclientfake "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/fake"
crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/util"
)

// kept separate from the original `controller_test.go` to simplify future rebases

// newAnotherFakeController is a copy of the helper from the original controller_test,
// except the UIService is not enabled (behavior of the ofas spark-operator)
func newAnotherFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Controller, *record.FakeRecorder) {
crdclientfake.AddToScheme(scheme.Scheme)
crdClient := crdclientfake.NewSimpleClientset()
kubeClient := kubeclientfake.NewSimpleClientset()
util.IngressCapabilities = map[string]bool{"networking.k8s.io/v1": true}
informerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0*time.Second)
recorder := record.NewFakeRecorder(3)

kubeClient.CoreV1().Nodes().Create(context.TODO(), &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Status: apiv1.NodeStatus{
Addresses: []apiv1.NodeAddress{
{
Type: apiv1.NodeExternalIP,
Address: "12.34.56.78",
},
},
},
}, metav1.CreateOptions{})

podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)
controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
&util.MetricConfig{}, "", "", nil, false, false, util.RatelimitConfig{}, 5)

informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
if app != nil {
informer.GetIndexer().Add(app)
}

podInformer := podInformerFactory.Core().V1().Pods().Informer()
for _, pod := range pods {
if pod != nil {
podInformer.GetIndexer().Add(pod)
}
}
return controller, recorder
}

func TestSyncSparkApplication_When_Submission_Successes(t *testing.T) {

/*
Test the normal case where the submission succeeds.
We receive the NewState event twice for each CRD:
- the first time, the controller submits the application
- the second time, the controller should skip the submission
Check that the submission is done only once and the final state is SubmittedState.
*/

originalSparkHome := os.Getenv(sparkHomeEnvVar)
originalKubernetesServiceHost := os.Getenv(kubernetesServiceHostEnvVar)
originalKubernetesServicePort := os.Getenv(kubernetesServicePortEnvVar)
os.Setenv(sparkHomeEnvVar, "/spark")
os.Setenv(kubernetesServiceHostEnvVar, "localhost")
os.Setenv(kubernetesServicePortEnvVar, "443")
defer func() {
os.Setenv(sparkHomeEnvVar, originalSparkHome)
os.Setenv(kubernetesServiceHostEnvVar, originalKubernetesServiceHost)
os.Setenv(kubernetesServicePortEnvVar, originalKubernetesServicePort)
}()

restartPolicyNever := v1beta2.RestartPolicy{
Type: v1beta2.Never,
}

// Create a new SparkApplication with NewState
app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: v1beta2.SparkApplicationSpec{
RestartPolicy: restartPolicyNever,
},
Status: v1beta2.SparkApplicationStatus{
AppState: v1beta2.ApplicationState{
State: v1beta2.NewState,
},
},
}

ctrl, _ := newAnotherFakeController(app)
_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}

// Mock the execCommand to return a success
execCommand = func(command string, args ...string) *exec.Cmd {
cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
cs = append(cs, args...)
cmd := exec.Command(os.Args[0], cs...)
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
return cmd
}

// simulate the first NewState
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
assert.Nil(t, err)
assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))

// simulate the second NewState (should skip the submission)
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
assert.Nil(t, err)
// check the state is still submitted
assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
// check the submit count does not change
assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
}

func TestSyncSparkApplication_When_Submission_Fails(t *testing.T) {
/*
Test the case where the submission fails the first time.
We receive the NewState event twice for each CRD:
- the first time, the controller submits the application and the submission fails
- the second time, the controller should skip the submission
Check that the submission is attempted only once and the final state is FailedSubmissionState.
*/
originalSparkHome := os.Getenv(sparkHomeEnvVar)
originalKubernetesServiceHost := os.Getenv(kubernetesServiceHostEnvVar)
originalKubernetesServicePort := os.Getenv(kubernetesServicePortEnvVar)
os.Setenv(sparkHomeEnvVar, "/spark")
os.Setenv(kubernetesServiceHostEnvVar, "localhost")
os.Setenv(kubernetesServicePortEnvVar, "443")
defer func() {
os.Setenv(sparkHomeEnvVar, originalSparkHome)
os.Setenv(kubernetesServiceHostEnvVar, originalKubernetesServiceHost)
os.Setenv(kubernetesServicePortEnvVar, originalKubernetesServicePort)
}()

restartPolicyNever := v1beta2.RestartPolicy{
Type: v1beta2.Never,
}

// Create a new SparkApplication with NewState
app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: v1beta2.SparkApplicationSpec{
RestartPolicy: restartPolicyNever,
},
Status: v1beta2.SparkApplicationStatus{
AppState: v1beta2.ApplicationState{
State: v1beta2.NewState,
},
},
}

ctrl, _ := newAnotherFakeController(app)
_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}

// Mock the execCommand to return a failure
execCommand = func(command string, args ...string) *exec.Cmd {
cmd := exec.Command("/bin/should-fail")
return cmd
}

err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
assert.Nil(t, err)
assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))

// simulate the second NewState (should skip the submission)

// This time, mock the command to succeed, but we expect the command not to be executed
execCommand = func(command string, args ...string) *exec.Cmd {
cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
cs = append(cs, args...)
cmd := exec.Command(os.Args[0], cs...)
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
return cmd
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
assert.Nil(t, err)
// check the CR state is still failedSubmission
assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
// check the submit count does not change
assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))
}
34 changes: 34 additions & 0 deletions pkg/controller/sparkapplication/submission_cache.go
@@ -0,0 +1,34 @@
package sparkapplication

/*
A simple cache implementation that stores keys with a time-to-live (TTL) value.
*/

import (
"github.com/jellydator/ttlcache/v3"
"time"
)

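// SubmissionCache records SparkApplication keys for which a submission attempt has already been made.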
type SubmissionCache struct {
cache *ttlcache.Cache[string, any] // value is not used
}

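// NewSubmissionCache returns a cache whose entries expire after ttl; a background goroutine started here evicts expired entries.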
func NewSubmissionCache(ttl time.Duration) *SubmissionCache {
cache := ttlcache.New[string, any](
ttlcache.WithTTL[string, any](ttl),
)

c := &SubmissionCache{
cache: cache,
}
go cache.Start() // start the cache cleanup goroutine
return c
}

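// Set records key with the cache's default TTL.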
func (c *SubmissionCache) Set(key string) {
c.cache.Set(key, nil, ttlcache.DefaultTTL)
}

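// Exist reports whether key is present and has not expired.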
func (c *SubmissionCache) Exist(key string) bool {
return c.cache.Has(key)
}
17 changes: 17 additions & 0 deletions pkg/controller/sparkapplication/submission_cache_test.go
@@ -0,0 +1,17 @@
package sparkapplication

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSubmissionCacheExist(t *testing.T) {
cache := NewSubmissionCache(1 * time.Second)
assert.False(t, cache.Exist("key1"))
cache.Set("key1")
assert.True(t, cache.Exist("key1"))
time.Sleep(2 * time.Second)
assert.False(t, cache.Exist("key1"))
}
