Skip to content

Commit

Permalink
Merge pull request #130 from hbelmiro/revert-swf-fixes
Browse files Browse the repository at this point in the history
Revert swf fixes
  • Loading branch information
HumairAK authored Jan 23, 2025
2 parents d9ec34e + fb7c5f0 commit 4453f41
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 172 deletions.
6 changes: 3 additions & 3 deletions backend/src/apiserver/client/scheduled_workflow_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func (c *FakeScheduledWorkflowClient) Get(ctx context.Context, name string, opti
return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("scheduledworkflows.kubeflow.org"), name)
}

func (c *FakeScheduledWorkflowClient) Update(_ context.Context, scheduledWorkflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
c.scheduledWorkflows[scheduledWorkflow.Name] = scheduledWorkflow
return scheduledWorkflow, nil
func (c *FakeScheduledWorkflowClient) Update(context.Context, *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
}

func (c *FakeScheduledWorkflowClient) DeleteCollection(ctx context.Context, options *v1.DeleteOptions, listOptions v1.ListOptions) error {
Expand Down
19 changes: 3 additions & 16 deletions backend/src/apiserver/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"math"
"reflect"
"strings"

Expand Down Expand Up @@ -98,13 +97,6 @@ type Options struct {
*token
}

func EmptyOptions() *Options {
return &Options{
math.MaxInt32,
&token{},
}
}

// Matches returns trues if the sorting and filtering criteria in o matches that
// of the one supplied in opts.
func (o *Options) Matches(opts *Options) bool {
Expand Down Expand Up @@ -221,14 +213,9 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild
if o.IsDesc {
order = "DESC"
}

if o.SortByFieldName != "" {
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order))
}

if o.KeyFieldName != "" {
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))
}
sqlBuilder = sqlBuilder.
OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)).
OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))

return sqlBuilder
}
Expand Down
7 changes: 0 additions & 7 deletions backend/src/apiserver/list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package list

import (
"fmt"
"math"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -647,11 +645,6 @@ func TestAddPaginationAndFilterToSelect(t *testing.T) {
wantSQL: "SELECT * FROM MyTable ORDER BY SortField DESC, KeyField DESC LIMIT 124",
wantArgs: nil,
},
{
in: EmptyOptions(),
wantSQL: fmt.Sprintf("SELECT * FROM MyTable LIMIT %d", math.MaxInt32+1),
wantArgs: nil,
},
{
in: &Options{
PageSize: 123,
Expand Down
18 changes: 1 addition & 17 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/kubeflow/pipelines/backend/src/apiserver/client"
Expand Down Expand Up @@ -138,25 +137,10 @@ func main() {
}
log.SetLevel(level)

backgroundCtx, backgroundCancel := context.WithCancel(context.Background())
defer backgroundCancel()
wg := sync.WaitGroup{}
wg.Add(1)
go reconcileSwfCrs(resourceManager, backgroundCtx, &wg)
go startRpcServer(resourceManager, tlsConfig)
// This is blocking
startHttpProxy(resourceManager, tlsConfig)
backgroundCancel()
clientManager.Close()
wg.Wait()
}

func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
err := resourceManager.ReconcileSwfCrs(ctx)
if err != nil {
log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
}
clientManager.Close()
}

// A custom http request header matcher to pass on the user identity
Expand Down
108 changes: 16 additions & 92 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import (
"context"
"encoding/json"
"fmt"
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"io"
"net"
"reflect"
"strconv"
"time"

Expand Down Expand Up @@ -584,77 +582,6 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
return newRun, nil
}

// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs.
func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
filterContext := &model.FilterContext{
ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()},
}

opts := list.EmptyOptions()

jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts)

if err != nil {
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
}

for i := range jobs {
select {
case <-ctx.Done():
return nil
default:
}

tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
if err != nil {
return failedToReconcileSwfCrsError(err)
}

newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i], r.getOwnerReferences())
if err != nil {
return failedToReconcileSwfCrsError(err)
}

for {
currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{})
if err != nil {
if util.IsNotFound(err) {
break
}
return failedToReconcileSwfCrsError(err)
}

if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) {
currentScheduledWorkflow.Spec = newScheduledWorkflow.Spec
err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, currentScheduledWorkflow)
if err != nil {
if apierrors.IsConflict(errors.Unwrap(err)) {
continue
} else if util.IsNotFound(errors.Cause(err)) {
break
}
return failedToReconcileSwfCrsError(err)
}
}
break
}
}

return nil
}

func failedToReconcileSwfCrsError(err error) error {
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
}

func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error {
_, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow)
if err != nil {
return util.Wrap(err, "Failed to update ScheduledWorkflow")
}
return nil
}

// Fetches a run with a given id.
func (r *ResourceManager) GetRun(runId string) (*model.Run, error) {
run, err := r.runStore.GetRun(runId)
Expand Down Expand Up @@ -1063,6 +990,12 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
}

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job, r.getOwnerReferences())
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}
// Create a new ScheduledWorkflow at the ScheduledWorkflow client.
k8sNamespace := job.Namespace
if k8sNamespace == "" {
Expand All @@ -1071,15 +1004,6 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
if k8sNamespace == "" {
return nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo scheduled workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a recurring run due to empty namespace")
}

job.Namespace = k8sNamespace

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job, r.getOwnerReferences())
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}
newScheduledWorkflow, err := r.getScheduledWorkflowClient(k8sNamespace).Create(ctx, scheduledWorkflow)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
Expand All @@ -1091,23 +1015,23 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
swf := util.NewScheduledWorkflow(newScheduledWorkflow)
job.UUID = string(swf.UID)
job.K8SName = swf.Name
job.Namespace = swf.Namespace
job.Conditions = model.StatusState(swf.ConditionSummary()).ToString()
for _, modelRef := range job.ResourceReferences {
modelRef.ResourceUUID = string(swf.UID)
}
if tmpl.GetTemplateType() == template.V1 {
// Get the service account
serviceAccount := ""
if swf.Spec.Workflow != nil {
execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow)
if err == nil {
serviceAccount = execSpec.ServiceAccount()
}
// Get the service account
serviceAccount := ""
if swf.Spec.Workflow != nil {
execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow)
if err == nil {
serviceAccount = execSpec.ServiceAccount()
}
job.ServiceAccount = serviceAccount
}
job.ServiceAccount = serviceAccount
if tmpl.GetTemplateType() == template.V1 {
job.PipelineSpec.WorkflowSpecManifest = manifest
} else {
job.ServiceAccount = newScheduledWorkflow.Spec.ServiceAccount
job.PipelineSpec.PipelineSpecManifest = manifest
}
return r.jobStore.CreateJob(job)
Expand Down
29 changes: 0 additions & 29 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3200,35 +3200,6 @@ func TestReportScheduledWorkflowResource_Success_withRuntimeParamsV2(t *testing.
assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1())
}

func TestReconcileSwfCrs(t *testing.T) {
store, manager, job := initWithJobV2(t)
defer store.Close()

fetchedJob, err := manager.GetJob(job.UUID)
require.Nil(t, err)
require.NotNil(t, fetchedJob)

swfClient := store.SwfClient().ScheduledWorkflow("ns1")

options := v1.GetOptions{}
ctx := context.Background()

swf, err := swfClient.Get(ctx, "job-", options)
require.Nil(t, err)

// emulates an invalid/outdated spec
swf.Spec.Workflow.Spec = nil
swf, err = swfClient.Update(ctx, swf)
require.Nil(t, swf.Spec.Workflow.Spec)

err = manager.ReconcileSwfCrs(ctx)
require.Nil(t, err)

swf, err = swfClient.Get(ctx, "job-", options)
require.Nil(t, err)
require.NotNil(t, swf.Spec.Workflow.Spec)
}

func TestReportScheduledWorkflowResource_Error(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
Expand Down
7 changes: 3 additions & 4 deletions backend/src/apiserver/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,9 @@ func TestScheduledWorkflow(t *testing.T) {
Parameters: []scheduledworkflow.Parameter{{Name: "y", Value: "\"world\""}},
Spec: "",
},
PipelineId: "1",
PipelineName: "pipeline name",
NoCatchup: util.BoolPointer(true),
ServiceAccount: "pipeline-runner",
PipelineId: "1",
PipelineName: "pipeline name",
NoCatchup: util.BoolPointer(true),
},
}

Expand Down
6 changes: 2 additions & 4 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1
if modelJob.Namespace != "" {
executionSpec.SetExecutionNamespace(modelJob.Namespace)
}
if executionSpec.ServiceAccount() == "" {
setDefaultServiceAccount(executionSpec, modelJob.ServiceAccount)
}
setDefaultServiceAccount(executionSpec, modelJob.ServiceAccount)
// Disable istio sidecar injection if not specified
executionSpec.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled)
swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName)
Expand Down Expand Up @@ -137,7 +135,7 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1
PipelineId: modelJob.PipelineId,
PipelineName: modelJob.PipelineName,
PipelineVersionId: modelJob.PipelineVersionId,
ServiceAccount: executionSpec.ServiceAccount(),
ServiceAccount: modelJob.ServiceAccount,
},
}
return scheduledWorkflow, nil
Expand Down

0 comments on commit 4453f41

Please sign in to comment.