chore: minor pipeline controller refactor (#2039)

whynowy authored Sep 7, 2024
1 parent 3287887 commit 2189ea9
Showing 16 changed files with 117 additions and 134 deletions.
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
@@ -19691,7 +19691,7 @@
"x-kubernetes-patch-strategy": "merge"
},
"drainedOnPause": {
"description": "Field to indicate if a pipeline drain successfully occurred, or it timed out. Set to true when the Pipeline is in Paused state, and after it has successfully been drained. defaults to false",
"description": "Field to indicate if a pipeline drain successfully occurred, only meaningful when the pipeline is paused. True means it has been successfully drained.",
"type": "boolean"
},
"lastUpdated": {
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
@@ -19678,7 +19678,7 @@
"x-kubernetes-patch-strategy": "merge"
},
"drainedOnPause": {
"description": "Field to indicate if a pipeline drain successfully occurred, or it timed out. Set to true when the Pipeline is in Paused state, and after it has successfully been drained. defaults to false",
"description": "Field to indicate if a pipeline drain successfully occurred, only meaningful when the pipeline is paused. True means it has been successfully drained.",
"type": "boolean"
},
"lastUpdated": {
1 change: 0 additions & 1 deletion config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
@@ -9822,7 +9822,6 @@ spec:
type: object
type: array
drainedOnPause:
default: false
type: boolean
lastUpdated:
format: date-time
1 change: 0 additions & 1 deletion config/install.yaml
@@ -18095,7 +18095,6 @@ spec:
type: object
type: array
drainedOnPause:
default: false
type: boolean
lastUpdated:
format: date-time
1 change: 0 additions & 1 deletion config/namespace-install.yaml
@@ -18095,7 +18095,6 @@ spec:
type: object
type: array
drainedOnPause:
default: false
type: boolean
lastUpdated:
format: date-time
7 changes: 4 additions & 3 deletions docs/APIs.md
@@ -8011,11 +8011,12 @@ The generation observed by the Pipeline controller.

<td>

<em>(Optional)</em>
<p>

Field to indicate if a pipeline drain successfully occurred, or it timed
out. Set to true when the Pipeline is in Paused state, and after it has
successfully been drained. defaults to false
Field to indicate if a pipeline drain successfully occurred, only
meaningful when the pipeline is paused. True means it has been
successfully drained.
</p>

</td>
2 changes: 1 addition & 1 deletion examples/1-simple-pipeline.yaml
@@ -27,4 +27,4 @@ spec:
- from: in
to: cat
- from: cat
to: out
to: out
7 changes: 3 additions & 4 deletions pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default.

7 changes: 3 additions & 4 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
@@ -633,10 +633,9 @@ type PipelineStatus struct {
// The generation observed by the Pipeline controller.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty" protobuf:"varint,11,opt,name=observedGeneration"`
// Field to indicate if a pipeline drain successfully occurred, or it timed out.
// Set to true when the Pipeline is in Paused state, and after it has successfully been drained.
// defaults to false
// +kubebuilder:default=false
// Field to indicate if a pipeline drain successfully occurred, only meaningful when the pipeline is paused.
// True means it has been successfully drained.
// +optional
DrainedOnPause bool `json:"drainedOnPause,omitempty" protobuf:"bytes,12,opt,name=drainedOnPause"`
}

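A minimal, self-contained sketch of how the relaxed `DrainedOnPause` semantics are meant to be read after this change: the field matters only while the pipeline is Paused, and with the kubebuilder default removed its zero value simply means "not drained yet". The types below are local stand-ins for the real `dfv1` ones, and the helper name is illustrative, not part of the API.

```go
package main

import "fmt"

// Local stand-ins for the relevant dfv1 types; the real ones live in
// pkg/apis/numaflow/v1alpha1.
type PipelinePhase string

const PipelinePhasePaused PipelinePhase = "Paused"

type PipelineStatus struct {
	Phase          PipelinePhase
	DrainedOnPause bool // no CRD default any more; the zero value just means "not drained"
}

// pausedAndDrained is a hypothetical helper showing the intended reading of
// the field: check the phase first, then the drain flag.
func pausedAndDrained(s PipelineStatus) bool {
	return s.Phase == PipelinePhasePaused && s.DrainedOnPause
}

func main() {
	s := PipelineStatus{Phase: PipelinePhasePaused, DrainedOnPause: true}
	fmt.Println(pausedAndDrained(s)) // true
}
```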
5 changes: 3 additions & 2 deletions pkg/reconciler/monovertex/controller.go
@@ -110,6 +110,7 @@ func (mr *monoVertexReconciler) reconcile(ctx context.Context, monoVtx *dfv1.Mon
}
}()

monoVtx.Status.InitializeConditions()
monoVtx.Status.SetObservedGeneration(monoVtx.Generation)
if monoVtx.Scalable() {
mr.scaler.StartWatching(mVtxKey)
@@ -251,7 +252,7 @@
if updatedReplicas+toBeUpdated > desiredReplicas {
toBeUpdated = desiredReplicas - updatedReplicas
}
log.Infof("Rolling update %d replicas, [%d, %d)\n", toBeUpdated, updatedReplicas, updatedReplicas+toBeUpdated)
log.Infof("Rolling update %d replicas, [%d, %d)", toBeUpdated, updatedReplicas, updatedReplicas+toBeUpdated)

// Create pods [updatedReplicas, updatedReplicas+toBeUpdated), and clean up any pods in that range that has a different hash
if err := mr.orchestratePodsFromTo(ctx, monoVtx, *podSpec, updatedReplicas, updatedReplicas+toBeUpdated, monoVtx.Status.UpdateHash); err != nil {
@@ -310,7 +311,7 @@ func (mr *monoVertexReconciler) cleanUpPodsFromTo(ctx context.Context, monoVtx *
if err := mr.client.Delete(ctx, &pod); err != nil {
return fmt.Errorf("failed to delete pod %s: %w", pod.Name, err)
}
log.Infof("Deleted MonoVertx pod %s\n", pod.Name)
log.Infof("Deleted MonoVertx pod %q", pod.Name)
mr.recorder.Eventf(monoVtx, corev1.EventTypeNormal, "DeletePodSuccess", "Succeeded to delete a mono vertex pod %s", pod.Name)
}
return nil
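For reference, a standalone sketch of the rolling-update window arithmetic behind the `Rolling update %d replicas, [%d, %d)` log line in `orchestratePods`: the batch size is clamped so the half-open range `[updatedReplicas, updatedReplicas+toBeUpdated)` never exceeds the desired replica count. This is an illustration, not the controller code itself.

```go
package main

import "fmt"

// rollingUpdateWindow mirrors the clamping above: the batch is shrunk so the
// half-open range [updated, updated+toBeUpdated) stays within desiredReplicas.
func rollingUpdateWindow(updatedReplicas, toBeUpdated, desiredReplicas int32) (int32, int32) {
	if updatedReplicas+toBeUpdated > desiredReplicas {
		toBeUpdated = desiredReplicas - updatedReplicas
	}
	return updatedReplicas, updatedReplicas + toBeUpdated
}

func main() {
	// With 3 of 5 replicas already updated and a batch size of 4, only the
	// window [3, 5) is recreated this round, i.e. 2 pods.
	from, to := rollingUpdateWindow(3, 4, 5)
	fmt.Printf("rolling update replicas in [%d, %d)\n", from, to)
}
```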
4 changes: 2 additions & 2 deletions pkg/reconciler/monovertex/scaling/scaling.go
@@ -233,11 +233,11 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
min := monoVtx.Spec.Scale.GetMinReplicas()
if desired > max {
desired = max
log.Infof("Calculated desired replica number %d of MonoVertex %q is greater than max, using max %d.", monoVtxName, desired, max)
log.Infof("Calculated desired replica number %d of MonoVertex %q is greater than max, using max %d.", desired, monoVtxName, max)
}
if desired < min {
desired = min
log.Infof("Calculated desired replica number %d of MonoVertex %q is smaller than min, using min %d.", monoVtxName, desired, min)
log.Infof("Calculated desired replica number %d of MonoVertex %q is smaller than min, using min %d.", desired, monoVtxName, min)
}
current := int32(monoVtx.Status.Replicas)
if current > max || current < min { // Someone might have manually scaled up/down the MonoVertex
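The two `Infof` fixes above put `desired` and the MonoVertex name into the order the format string expects. A standalone sketch of the clamp-and-log step with the corrected argument order, using the standard library logger and a hypothetical helper name:

```go
package main

import "log"

// clampDesired reproduces the min/max clamping from scaleOneMonoVertex with
// the corrected argument order: desired first, then the MonoVertex name,
// then the bound. Illustration only, not the scaler code itself.
func clampDesired(monoVtxName string, desired, min, max int32) int32 {
	if desired > max {
		desired = max
		log.Printf("Calculated desired replica number %d of MonoVertex %q is greater than max, using max %d.", desired, monoVtxName, max)
	}
	if desired < min {
		desired = min
		log.Printf("Calculated desired replica number %d of MonoVertex %q is smaller than min, using min %d.", desired, monoVtxName, min)
	}
	return desired
}

func main() {
	_ = clampDesired("my-mono-vertex", 12, 1, 10) // logs the "greater than max" message with max 10
}
```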
66 changes: 33 additions & 33 deletions pkg/reconciler/pipeline/controller.go
@@ -143,15 +143,25 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
}
}()

pl.Status.InitConditions()
pl.Status.SetObservedGeneration(pl.Generation)
// Regular pipeline change
// This should be happening in all cases to ensure a clean initialization regardless of the lifecycle phase
// Eg: even for a pipeline started with desiredPhase = Pause, we should still create the resources for the pipeline
result, err := r.reconcileNonLifecycleChanges(ctx, pl)
if err != nil {
// Orchestrate pipeline sub resources.
// This should be happening in all cases to ensure a clean initialization regardless of the lifecycle phase.
// Eg: even for a pipeline started with desiredPhase = Pause, we should still create the resources for the pipeline.
if err := r.reconcileFixedResources(ctx, pl); err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", "Failed to reconcile pipeline: %v", err.Error())
return result, err
return ctrl.Result{}, err
}
// If the pipeline has a lifecycle change, then do not update the phase as
// this should happen only after the required configs for the lifecycle changes
// have been applied.
if !isLifecycleChange(pl) {
pl.Status.SetPhase(pl.Spec.Lifecycle.GetDesiredPhase(), "")
}
if err := r.checkChildrenResourceStatus(ctx, pl); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to check pipeline children resource status, %w", err)
}

// check if any changes related to pause/resume lifecycle for the pipeline
if isLifecycleChange(pl) {
oldPhase := pl.Status.Phase
@@ -171,7 +181,7 @@
}
return ctrl.Result{}, nil
}
return result, nil
return ctrl.Result{}, nil
}

// isLifecycleChange determines whether there has been a change requested in the lifecycle
@@ -190,17 +200,16 @@ func isLifecycleChange(pl *dfv1.Pipeline) bool {
return false
}
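A condensed, self-contained sketch of the decision this refactor moves into `reconcile()`: the phase follows `spec.lifecycle.desiredPhase` only when no pause/resume transition is pending. The `isLifecycleChange` body below is an assumed simplification (the real function takes a `*dfv1.Pipeline`), and the phase constants are local stand-ins.

```go
package main

import "fmt"

// Local stand-ins for the dfv1 pipeline phases, for illustration only.
type PipelinePhase string

const (
	PipelinePhaseRunning PipelinePhase = "Running"
	PipelinePhasePaused  PipelinePhase = "Paused"
	PipelinePhasePausing PipelinePhase = "Pausing"
)

// isLifecycleChange is an assumed, simplified rendering of the controller's
// check: a pause has been requested, or the pipeline is still in (or leaving)
// a paused state.
func isLifecycleChange(desired, current PipelinePhase) bool {
	return desired == PipelinePhasePaused ||
		current == PipelinePhasePaused || current == PipelinePhasePausing
}

// nextPhase captures why reconcile() now sets the phase only outside a
// lifecycle change: while a pause/resume is in flight, the phase is driven by
// that handling rather than copied from spec.lifecycle.desiredPhase.
func nextPhase(desired, current PipelinePhase) PipelinePhase {
	if isLifecycleChange(desired, current) {
		return current
	}
	return desired
}

func main() {
	fmt.Println(nextPhase(PipelinePhaseRunning, PipelinePhasePausing)) // Pausing
	fmt.Println(nextPhase(PipelinePhaseRunning, PipelinePhaseRunning)) // Running
}
```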

// reconcileNonLifecycleChanges do the jobs not related to pipeline lifecycle changes.
func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, pl *dfv1.Pipeline) (ctrl.Result, error) {
// reconcileFixedResources do the jobs of creating fixed resources such as daemon service, vertex objects, and ISB management jobs, etc
func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *dfv1.Pipeline) error {
log := logging.FromContext(ctx)
if !controllerutil.ContainsFinalizer(pl, finalizerName) {
controllerutil.AddFinalizer(pl, finalizerName)
}
pl.Status.InitConditions()
if err := ValidatePipeline(pl); err != nil {
log.Errorw("Validation failed", zap.Error(err))
pl.Status.MarkNotConfigured("InvalidSpec", err.Error())
return ctrl.Result{}, err
return err
}
pl.Status.SetVertexCounts(pl.Spec.Vertices)
pl.Status.MarkConfigured()
@@ -215,31 +224,31 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
if apierrors.IsNotFound(err) {
pl.Status.MarkDeployFailed("ISBSvcNotFound", "ISB Service not found.")
log.Errorw("ISB Service not found", zap.String("isbsvc", isbSvcName), zap.Error(err))
return ctrl.Result{}, fmt.Errorf("isbsvc %s not found", isbSvcName)
return fmt.Errorf("isbsvc %s not found", isbSvcName)
}
pl.Status.MarkDeployFailed("GetISBSvcFailed", err.Error())
log.Errorw("Failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err))
return ctrl.Result{}, err
return err
}
if !isbSvc.Status.IsHealthy() {
pl.Status.MarkDeployFailed("ISBSvcNotHealthy", "ISB Service not healthy.")
log.Errorw("ISB Service is not in healthy status", zap.String("isbsvc", isbSvcName), zap.Error(err))
return ctrl.Result{}, fmt.Errorf("isbsvc not healthy")
return fmt.Errorf("isbsvc not healthy")
}

// Create or update the Side Inputs Manager deployments
if err := r.createOrUpdateSIMDeployments(ctx, pl, isbSvc.Status.Config); err != nil {
log.Errorw("Failed to create or update Side Inputs Manager deployments", zap.Error(err))
pl.Status.MarkDeployFailed("CreateOrUpdateSIMDeploymentsFailed", err.Error())
r.recorder.Eventf(pl, corev1.EventTypeWarning, "CreateOrUpdateSIMDeploymentsFailed", "Failed to create or update Side Inputs Manager deployments: %w", err.Error())
return ctrl.Result{}, err
return err
}

existingObjs, err := r.findExistingVertices(ctx, pl)
if err != nil {
log.Errorw("Failed to find existing vertices", zap.Error(err))
pl.Status.MarkDeployFailed("ListVerticesFailed", err.Error())
return ctrl.Result{}, err
return err
}
oldBuffers := make(map[string]string)
newBuffers := make(map[string]string)
@@ -279,7 +288,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
} else {
pl.Status.MarkDeployFailed("CreateVertexFailed", err.Error())
r.recorder.Eventf(pl, corev1.EventTypeWarning, "CreateVertexFailed", "Failed to create vertex: %w", err.Error())
return ctrl.Result{}, fmt.Errorf("failed to create vertex, err: %w", err)
return fmt.Errorf("failed to create vertex, err: %w", err)
}
}
log.Infow("Created vertex successfully", zap.String("vertex", vertexName))
@@ -291,7 +300,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
if err := r.client.Update(ctx, &oldObj); err != nil {
pl.Status.MarkDeployFailed("UpdateVertexFailed", err.Error())
r.recorder.Eventf(pl, corev1.EventTypeWarning, "UpdateVertexFailed", "Failed to update vertex: %w", err.Error())
return ctrl.Result{}, fmt.Errorf("failed to update vertex, err: %w", err)
return fmt.Errorf("failed to update vertex, err: %w", err)
}
log.Infow("Updated vertex successfully", zap.String("vertex", vertexName))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "UpdateVertexSuccess", "Updated vertex %s successfully", vertexName)
@@ -303,7 +312,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
if err := r.client.Delete(ctx, &v); err != nil {
pl.Status.MarkDeployFailed("DeleteStaleVertexFailed", err.Error())
r.recorder.Eventf(pl, corev1.EventTypeWarning, "DeleteStaleVertexFailed", "Failed to delete vertex: %w", err.Error())
return ctrl.Result{}, fmt.Errorf("failed to delete vertex, err: %w", err)
return fmt.Errorf("failed to delete vertex, err: %w", err)
}
log.Infow("Deleted stale vertex successfully", zap.String("vertex", v.Name))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "DeleteStaleVertexSuccess", "Deleted stale vertex %s successfully", v.Name)
@@ -328,7 +337,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-create", args, "cre")
if err := r.client.Create(ctx, batchJob); err != nil && !apierrors.IsAlreadyExists(err) {
pl.Status.MarkDeployFailed("CreateISBSvcCreatingJobFailed", err.Error())
return ctrl.Result{}, fmt.Errorf("failed to create ISB Svc creating job, err: %w", err)
return fmt.Errorf("failed to create ISB Svc creating job, err: %w", err)
}
log.Infow("Created a job successfully for ISB Svc creating", zap.Any("buffers", bfs), zap.Any("buckets", bks), zap.Any("servingStreams", pl.GetServingSourceStreamNames()))
}
@@ -346,31 +355,22 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-delete", args, "del")
if err := r.client.Create(ctx, batchJob); err != nil && !apierrors.IsAlreadyExists(err) {
pl.Status.MarkDeployFailed("CreateISBSvcDeletingJobFailed", err.Error())
return ctrl.Result{}, fmt.Errorf("failed to create ISB Svc deleting job, err: %w", err)
return fmt.Errorf("failed to create ISB Svc deleting job, err: %w", err)
}
log.Infow("Created ISB Svc deleting job successfully", zap.Any("buffers", bfs), zap.Any("buckets", bks))
}

// Daemon service
if err := r.createOrUpdateDaemonService(ctx, pl); err != nil {
return ctrl.Result{}, err
return err
}
// Daemon deployment
if err := r.createOrUpdateDaemonDeployment(ctx, pl, isbSvc.Status.Config); err != nil {
return ctrl.Result{}, err
return err
}

pl.Status.MarkDeployed()
// If the pipeline has a lifecycle change, then do not update the phase as
// this should happen only after the required configs for the lifecycle changes
// have been applied.
if !isLifecycleChange(pl) {
pl.Status.SetPhase(pl.Spec.Lifecycle.GetDesiredPhase(), "")
}
if err := r.checkChildrenResourceStatus(ctx, pl); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to check pipeline children resource status, %w", err)
}
return ctrl.Result{}, nil
return nil
}

func (r *pipelineReconciler) createOrUpdateDaemonService(ctx context.Context, pl *dfv1.Pipeline) error {
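The bulk of `reconcileFixedResources` above applies a create/update/delete-stale pattern to vertex objects: anything desired but missing is created, anything whose spec changed is updated, and anything no longer desired is deleted as stale. A standalone sketch of that diffing step, with hypothetical names and hash strings standing in for vertex specs:

```go
package main

import "fmt"

// diffVertices is a sketch of the orchestration pattern, not the controller
// code: compare existing objects against the desired set and bucket them
// into create, update, and delete-stale lists. Map values stand in for a
// hash of the vertex spec.
func diffVertices(existing, desired map[string]string) (toCreate, toUpdate, toDelete []string) {
	for name, wantHash := range desired {
		gotHash, ok := existing[name]
		switch {
		case !ok:
			toCreate = append(toCreate, name) // desired but missing
		case gotHash != wantHash:
			toUpdate = append(toUpdate, name) // present but spec changed
		}
	}
	for name := range existing {
		if _, ok := desired[name]; !ok {
			toDelete = append(toDelete, name) // present but no longer desired
		}
	}
	return toCreate, toUpdate, toDelete
}

func main() {
	existing := map[string]string{"in": "h1", "cat": "h2", "old": "h3"}
	desired := map[string]string{"in": "h1", "cat": "h9", "out": "h4"}
	c, u, d := diffVertices(existing, desired)
	fmt.Println(c, u, d) // [out] [cat] [old]
}
```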