fix: handle double submission issue
When a sparkapp CRD is created, on ofas we receive the sparkapp event with state `NewState` twice, because we mutate the CRD.

This fix stores each submission's app key (namespace/name) in a cache with a TTL; whatever the result of the submission, we skip the second event.
originou committed Jun 4, 2024
1 parent b0f3ed0 commit 4a47bed
Showing 4 changed files with 302 additions and 1 deletion.
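
The dedup pattern this commit introduces, reduced to a minimal sketch (names taken from the controller diff below; `key` is the controller's namespace/name sync key):

    if !c.submissionCache.Exist(key) {
        appCopy = c.submitSparkApplication(appCopy)
        c.submissionCache.Set(key) // remember the attempt, success or failure alike
    } else {
        return nil // second NewState event for the same app: skip resubmission
    }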
14 changes: 13 additions & 1 deletion pkg/controller/sparkapplication/controller.go
@@ -81,6 +81,7 @@ type Controller struct {
enableUIService bool
disableExecutorReporting bool
executorsProcessingLimit int
submissionCache *SubmissionCache
}

// NewController creates a new Controller.
@@ -139,6 +140,7 @@ func newSparkApplicationController(
enableUIService: enableUIService,
disableExecutorReporting: disableExecutorReporting,
executorsProcessingLimit: executorsProcessingLimit,
submissionCache: NewSubmissionCache(5 * time.Minute),
}

if metricsConfig != nil {
@@ -566,7 +568,17 @@ func (c *Controller) syncSparkApplication(key string) error {
appCopy.Status.AppState.State = v1beta2.FailedState
appCopy.Status.AppState.ErrorMessage = err.Error()
} else {
appCopy = c.submitSparkApplication(appCopy)
// Related to ofas: we receive the CRD event with NewState twice,
// so check whether a submission attempt was already made
// and skip this one if it was.
if !c.submissionCache.Exist(key) {
appCopy = c.submitSparkApplication(appCopy)
// Whatever the result of the submission, we don't want to retry it.
c.submissionCache.Set(key)
} else {
glog.V(2).Infof("submission attempt already made for %q, skipping it", key)
return nil
}
}
case v1beta2.SucceedingState:
if !shouldRetry(appCopy) {
206 changes: 206 additions & 0 deletions pkg/controller/sparkapplication/controller_ofas_test.go
@@ -0,0 +1,206 @@
package sparkapplication

import (
"context"
"fmt"
"os"
"os/exec"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
kubeclientfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"

"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
crdclientfake "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/fake"
crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/util"
)

// newFakeOfasController is a copy of the function from the original controller_test,
// except that it does not enable the UIService (behavior from the ofas spark-operator)
func newFakeOfasController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Controller, *record.FakeRecorder) {
crdclientfake.AddToScheme(scheme.Scheme)
crdClient := crdclientfake.NewSimpleClientset()
kubeClient := kubeclientfake.NewSimpleClientset()
util.IngressCapabilities = map[string]bool{"networking.k8s.io/v1": true}
informerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0*time.Second)
recorder := record.NewFakeRecorder(3)

kubeClient.CoreV1().Nodes().Create(context.TODO(), &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Status: apiv1.NodeStatus{
Addresses: []apiv1.NodeAddress{
{
Type: apiv1.NodeExternalIP,
Address: "12.34.56.78",
},
},
},
}, metav1.CreateOptions{})

podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)
controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
&util.MetricConfig{}, "", "", nil, false, false, util.RatelimitConfig{}, 5)

informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
if app != nil {
informer.GetIndexer().Add(app)
}

podInformer := podInformerFactory.Core().V1().Pods().Informer()
for _, pod := range pods {
if pod != nil {
podInformer.GetIndexer().Add(pod)
}
}
return controller, recorder
}

func TestSyncSparkApplication_When_Submission_Successes(t *testing.T) {

/*
Test the normal case where the submission succeeds.
We receive the NewState event twice for each CRD:
- first time, the controller submits the application
- second time, the controller should skip the submission
Check that the submission is done only once and that the resulting state is SubmittedState.
*/

os.Setenv(sparkHomeEnvVar, "/spark")
os.Setenv(kubernetesServiceHostEnvVar, "localhost")
os.Setenv(kubernetesServicePortEnvVar, "443")

restartPolicyNever := v1beta2.RestartPolicy{
Type: v1beta2.Never,
}

// Create a new SparkApplication with NewState
app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: v1beta2.SparkApplicationSpec{
RestartPolicy: restartPolicyNever,
},
Status: v1beta2.SparkApplicationStatus{
AppState: v1beta2.ApplicationState{
State: v1beta2.NewState,
},
},
}

ctrl, _ := newFakeOfasController(app)
_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}

// Mock execCommand so the submission succeeds
execCommand = func(command string, args ...string) *exec.Cmd {
cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
cs = append(cs, args...)
cmd := exec.Command(os.Args[0], cs...)
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
return cmd
}

// simulate the first NewState
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
assert.Nil(t, err)
assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))

// simulate the second NewState (should skip the submission)
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
assert.Nil(t, err)
// check the state is still submitted
assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
// check the submit count does not change
assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
}

func TestSyncSparkApplication_When_Submission_Fails(t *testing.T) {
/*
Test the case where the submission fails the first time.
We receive the NewState event twice for each CRD:
- first time, the controller submits the application and the submission fails
- second time, the controller should skip the submission
Check that the submission is attempted only once and that the resulting state is FailedSubmissionState.
*/
os.Setenv(sparkHomeEnvVar, "/spark")
os.Setenv(kubernetesServiceHostEnvVar, "localhost")
os.Setenv(kubernetesServicePortEnvVar, "443")

restartPolicyNever := v1beta2.RestartPolicy{
Type: v1beta2.Never,
}

// Create a new SparkApplication with NewState
app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: v1beta2.SparkApplicationSpec{
RestartPolicy: restartPolicyNever,
},
Status: v1beta2.SparkApplicationStatus{
AppState: v1beta2.ApplicationState{
State: v1beta2.NewState,
},
},
}

ctrl, _ := newFakeOfasController(app)
_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}

// Mock execCommand so the submission fails
execCommand = func(command string, args ...string) *exec.Cmd {
cmd := exec.Command("/bin/should-fail")
return cmd
}

err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
assert.Nil(t, err)
assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))

// simulate the second NewState (should skip the submission)

// This time, mock the command to succeed, but we expect it not to be executed
execCommand = func(command string, args ...string) *exec.Cmd {
cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
cs = append(cs, args...)
cmd := exec.Command(os.Args[0], cs...)
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
return cmd
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
assert.Nil(t, err)
// check the CR state is still failedSubmission
assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
// check the submit count does not change
assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))
}
66 changes: 66 additions & 0 deletions pkg/controller/sparkapplication/submission_cache.go
@@ -0,0 +1,66 @@
package sparkapplication

/*
A simple cache implementation that stores keys with a time-to-live (TTL) value.
*/

import (
"sync"
"time"
)

type item struct {
expiration int64
}

type SubmissionCache struct {
ttl time.Duration
items map[string]*item
mu sync.RWMutex
}

func NewSubmissionCache(ttl time.Duration) *SubmissionCache {
c := &SubmissionCache{
ttl: ttl,
items: make(map[string]*item),
}

go c.startEviction()

return c
}

func (c *SubmissionCache) Set(key string) {
c.mu.Lock()
defer c.mu.Unlock()

c.items[key] = &item{
expiration: time.Now().Add(c.ttl).UnixNano(),
}
}

func (c *SubmissionCache) Exist(key string) bool {
c.mu.RLock()
defer c.mu.RUnlock()

item, found := c.items[key]
if !found || item.expiration < time.Now().UnixNano() {
return false
}

return true
}

func (c *SubmissionCache) startEviction() {
for {
time.Sleep(c.ttl)

c.mu.Lock()
for key, item := range c.items {
if time.Now().UnixNano() > item.expiration {
delete(c.items, key)
}
}
c.mu.Unlock()
}
}
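
A minimal usage sketch of the cache above, standalone and outside the controller (the key format mirrors the controller's namespace/name sync key):

    cache := NewSubmissionCache(5 * time.Minute)

    key := "default/foo"
    if !cache.Exist(key) {
        // ... perform the one-shot work here ...
        cache.Set(key) // remembered for the TTL, regardless of outcome
    }

Note that Exist already treats expired entries as absent, so the eviction goroutine started by NewSubmissionCache only bounds memory; it runs for the lifetime of the process.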
17 changes: 17 additions & 0 deletions pkg/controller/sparkapplication/submission_cache_test.go
@@ -0,0 +1,17 @@
package sparkapplication

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSubmissionCacheExist(t *testing.T) {
cache := NewSubmissionCache(1 * time.Second)
assert.False(t, cache.Exist("key1"))
cache.Set("key1")
assert.True(t, cache.Exist("key1"))
time.Sleep(2 * time.Second)
assert.False(t, cache.Exist("key1"))
}
