diff --git a/go.mod b/go.mod
index 1a6a3583d..2393621f6 100644
--- a/go.mod
+++ b/go.mod
@@ -81,6 +81,7 @@ require (
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/imdario/mergo v0.3.12 // indirect
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
+	github.com/jellydator/ttlcache/v3 v3.2.0 // indirect
 	github.com/jmespath/go-jmespath v0.4.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
diff --git a/go.sum b/go.sum
index b317f96cc..1d504e19e 100644
--- a/go.sum
+++ b/go.sum
@@ -1515,6 +1515,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
 github.com/ishidawataru/sctp v0.0.0-20190723014705-7c296d48a2b5/go.mod h1:DM4VvS+hD/kDi1U1QsX2fnZowwBhqD0Dk3bRPKF/Oc8=
 github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
+github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
+github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
 github.com/jimstudt/http-authentication v0.0.0-20140401203705-3eca13d6893a/go.mod h1:wK6yTYYcgjHE1Z1QtXACPDjcFJyBskHEdagmnq3vsP8=
 github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
diff --git a/pkg/controller/sparkapplication/controller.go b/pkg/controller/sparkapplication/controller.go
index 5fedd2ea1..51e65fa4d 100644
--- a/pkg/controller/sparkapplication/controller.go
+++ b/pkg/controller/sparkapplication/controller.go
@@ -81,6 +81,7 @@ type Controller struct {
 	enableUIService          bool
 	disableExecutorReporting bool
 	executorsProcessingLimit int
+	submissionCache          *SubmissionCache
 }
 
 // NewController creates a new Controller.
@@ -139,6 +140,7 @@ func newSparkApplicationController(
 		enableUIService:          enableUIService,
 		disableExecutorReporting: disableExecutorReporting,
 		executorsProcessingLimit: executorsProcessingLimit,
+		submissionCache:          NewSubmissionCache(5 * time.Minute),
 	}
 
 	if metricsConfig != nil {
@@ -566,7 +568,17 @@ func (c *Controller) syncSparkApplication(key string) error {
 			appCopy.Status.AppState.State = v1beta2.FailedState
 			appCopy.Status.AppState.ErrorMessage = err.Error()
 		} else {
-			appCopy = c.submitSparkApplication(appCopy)
+			// Related to OFAS: the CRD event with the NewState state is sometimes received twice,
+			// so check whether a submission attempt has already been made for this application
+			// and skip the submission if it has.
+			if !c.submissionCache.Exist(key) {
+				appCopy = c.submitSparkApplication(appCopy)
+				// regardless of the submission result, we do not want to retry the submission
+				c.submissionCache.Set(key)
+			} else {
+				glog.V(2).Infof("submission already attempted for: %q, skipping it", key)
+				return nil
+			}
 		}
 	case v1beta2.SucceedingState:
 		if !shouldRetry(appCopy) {
diff --git a/pkg/controller/sparkapplication/controller_another_test.go b/pkg/controller/sparkapplication/controller_another_test.go
new file mode 100644
index 000000000..62b50e57f
--- /dev/null
+++ b/pkg/controller/sparkapplication/controller_another_test.go
@@ -0,0 +1,224 @@
+package sparkapplication
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	kubeclientfake "k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/tools/record"
+
+	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
+	crdclientfake "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/fake"
+	crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
+	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/util"
+)
+
+// These tests are kept separate from the original `controller_test.go` to simplify future rebases.
+
+// newAnotherFakeController is a copy of the corresponding function in the original controller_test.go,
+// except that the UIService is not enabled (behavior of the OFAS spark-operator).
+func newAnotherFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Controller, *record.FakeRecorder) {
+	crdclientfake.AddToScheme(scheme.Scheme)
+	crdClient := crdclientfake.NewSimpleClientset()
+	kubeClient := kubeclientfake.NewSimpleClientset()
+	util.IngressCapabilities = map[string]bool{"networking.k8s.io/v1": true}
+	informerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0*time.Second)
+	recorder := record.NewFakeRecorder(3)
+
+	kubeClient.CoreV1().Nodes().Create(context.TODO(), &apiv1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "node1",
+		},
+		Status: apiv1.NodeStatus{
+			Addresses: []apiv1.NodeAddress{
+				{
+					Type:    apiv1.NodeExternalIP,
+					Address: "12.34.56.78",
+				},
+			},
+		},
+	}, metav1.CreateOptions{})
+
+	podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)
+	controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
+		&util.MetricConfig{}, "", "", nil, false, false, util.RatelimitConfig{}, 5)
+
+	informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
+	if app != nil {
+		informer.GetIndexer().Add(app)
+	}
+
+	podInformer := podInformerFactory.Core().V1().Pods().Informer()
+	for _, pod := range pods {
+		if pod != nil {
+			podInformer.GetIndexer().Add(pod)
+		}
+	}
+	return controller, recorder
+}
+
+func TestSyncSparkApplication_When_Submission_Succeeds(t *testing.T) {
+
+	/*
+		Test the normal case in which the submission succeeds.
+		We receive the NewState state twice for each CRD:
+		- the first time, the controller submits the application
+		- the second time, the controller should skip the submission
+		Check that the submission is done only once and that the resulting state is SubmittedState.
+	*/
+
+	originalSparkHome := os.Getenv(sparkHomeEnvVar)
+	originalKubernetesServiceHost := os.Getenv(kubernetesServiceHostEnvVar)
+	originalKubernetesServicePort := os.Getenv(kubernetesServicePortEnvVar)
+	os.Setenv(sparkHomeEnvVar, "/spark")
+	os.Setenv(kubernetesServiceHostEnvVar, "localhost")
+	os.Setenv(kubernetesServicePortEnvVar, "443")
+	defer func() {
+		os.Setenv(sparkHomeEnvVar, originalSparkHome)
+		os.Setenv(kubernetesServiceHostEnvVar, originalKubernetesServiceHost)
+		os.Setenv(kubernetesServicePortEnvVar, originalKubernetesServicePort)
+	}()
+
+	restartPolicyNever := v1beta2.RestartPolicy{
+		Type: v1beta2.Never,
+	}
+
+	// Create a new SparkApplication with NewState
+	app := &v1beta2.SparkApplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "foo",
+			Namespace: "default",
+		},
+		Spec: v1beta2.SparkApplicationSpec{
+			RestartPolicy: restartPolicyNever,
+		},
+		Status: v1beta2.SparkApplicationStatus{
+			AppState: v1beta2.ApplicationState{
+				State: v1beta2.NewState,
+			},
+		},
+	}
+
+	ctrl, _ := newAnotherFakeController(app)
+	_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Mock execCommand to return success
+	execCommand = func(command string, args ...string) *exec.Cmd {
+		cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
+		cs = append(cs, args...)
+		cmd := exec.Command(os.Args[0], cs...)
+		cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
+		return cmd
+	}
+
+	// simulate the first NewState event
+	err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
+	assert.Nil(t, err)
+	updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
+	assert.Nil(t, err)
+	assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
+	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
+
+	// simulate the second NewState event (the submission should be skipped)
+	err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
+	assert.Nil(t, err)
+	updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
+	assert.Nil(t, err)
+	// check that the state is still SubmittedState
+	assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
+	// check that the submit count does not change
+	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
+}
+
+func TestSyncSparkApplication_When_Submission_Fails(t *testing.T) {
+	/*
+		Test the case in which the submission fails the first time.
+		We receive the NewState state twice for each CRD:
+		- the first time, the controller submits the application and the submission fails
+		- the second time, the controller should skip the submission
+		Check that the submission is attempted only once and that the resulting state is FailedSubmissionState.
+	*/
+	originalSparkHome := os.Getenv(sparkHomeEnvVar)
+	originalKubernetesServiceHost := os.Getenv(kubernetesServiceHostEnvVar)
+	originalKubernetesServicePort := os.Getenv(kubernetesServicePortEnvVar)
+	os.Setenv(sparkHomeEnvVar, "/spark")
+	os.Setenv(kubernetesServiceHostEnvVar, "localhost")
+	os.Setenv(kubernetesServicePortEnvVar, "443")
+	defer func() {
+		os.Setenv(sparkHomeEnvVar, originalSparkHome)
+		os.Setenv(kubernetesServiceHostEnvVar, originalKubernetesServiceHost)
+		os.Setenv(kubernetesServicePortEnvVar, originalKubernetesServicePort)
+	}()
+
+	restartPolicyNever := v1beta2.RestartPolicy{
+		Type: v1beta2.Never,
+	}
+
+	// Create a new SparkApplication with NewState
+	app := &v1beta2.SparkApplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "foo",
+			Namespace: "default",
+		},
+		Spec: v1beta2.SparkApplicationSpec{
+			RestartPolicy: restartPolicyNever,
+		},
+		Status: v1beta2.SparkApplicationStatus{
+			AppState: v1beta2.ApplicationState{
+				State: v1beta2.NewState,
+			},
+		},
+	}
+
+	ctrl, _ := newAnotherFakeController(app)
+	_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Mock execCommand to return a failure
+	execCommand = func(command string, args ...string) *exec.Cmd {
+		cmd := exec.Command("/bin/should-fail")
+		return cmd
+	}
+
+	err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
+	assert.Nil(t, err)
+	updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
+	assert.Nil(t, err)
+	assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
+	assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
+	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))
+
+	// simulate the second NewState event (the submission should be skipped)
+
+	// This time, mock the command to succeed; the command is expected not to be executed.
+	execCommand = func(command string, args ...string) *exec.Cmd {
+		cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
+		cs = append(cs, args...)
+		cmd := exec.Command(os.Args[0], cs...)
+		cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
+		return cmd
+	}
+	err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
+	assert.Nil(t, err)
+	updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
+	assert.Nil(t, err)
+	// check that the CR state is still FailedSubmissionState
+	assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
+	// check that the submit count does not change
+	assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
+	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))
+}
diff --git a/pkg/controller/sparkapplication/submission_cache.go b/pkg/controller/sparkapplication/submission_cache.go
new file mode 100644
index 000000000..0dd8d8353
--- /dev/null
+++ b/pkg/controller/sparkapplication/submission_cache.go
@@ -0,0 +1,34 @@
+package sparkapplication
+
+/*
+A simple cache implementation that stores keys with a time-to-live (TTL) value.
+*/
+
+import (
+	"github.com/jellydator/ttlcache/v3"
+	"time"
+)
+
+type SubmissionCache struct {
+	cache *ttlcache.Cache[string, any] // the value is not used
+}
+
+func NewSubmissionCache(ttl time.Duration) *SubmissionCache {
+	cache := ttlcache.New[string, any](
+		ttlcache.WithTTL[string, any](ttl),
+	)
+
+	c := &SubmissionCache{
+		cache: cache,
+	}
+	go cache.Start() // start the goroutine that evicts expired entries
+	return c
+}
+
+func (c *SubmissionCache) Set(key string) {
+	c.cache.Set(key, nil, ttlcache.DefaultTTL)
+}
+
+func (c *SubmissionCache) Exist(key string) bool {
+	return c.cache.Has(key)
+}
diff --git a/pkg/controller/sparkapplication/submission_cache_test.go b/pkg/controller/sparkapplication/submission_cache_test.go
new file mode 100644
index 000000000..54c32166b
--- /dev/null
+++ b/pkg/controller/sparkapplication/submission_cache_test.go
@@ -0,0 +1,17 @@
+package sparkapplication
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSubmissionCacheExist(t *testing.T) {
+	cache := NewSubmissionCache(1 * time.Second)
+	assert.False(t, cache.Exist("key1"))
+	cache.Set("key1")
+	assert.True(t, cache.Exist("key1"))
+	time.Sleep(2 * time.Second)
+	assert.False(t, cache.Exist("key1"))
+}