Skip to content

Commit

Permalink
fix: pause lifecyle changes and add drained status (#2028)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Sep 6, 2024
1 parent 9f13068 commit cf90e25
Show file tree
Hide file tree
Showing 14 changed files with 701 additions and 497 deletions.
4 changes: 4 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -19681,6 +19681,10 @@
"x-kubernetes-patch-merge-key": "type",
"x-kubernetes-patch-strategy": "merge"
},
"drainedOnPause": {
"description": "Field to indicate if a pipeline drain successfully occurred, or it timed out. Set to true when the Pipeline is in Paused state, and after it has successfully been drained. defaults to false",
"type": "boolean"
},
"lastUpdated": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time"
},
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -19668,6 +19668,10 @@
"x-kubernetes-patch-merge-key": "type",
"x-kubernetes-patch-strategy": "merge"
},
"drainedOnPause": {
"description": "Field to indicate if a pipeline drain successfully occurred, or it timed out. Set to true when the Pipeline is in Paused state, and after it has successfully been drained. defaults to false",
"type": "boolean"
},
"lastUpdated": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time"
},
Expand Down
3 changes: 3 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9821,6 +9821,9 @@ spec:
- type
type: object
type: array
drainedOnPause:
default: false
type: boolean
lastUpdated:
format: date-time
type: string
Expand Down
3 changes: 3 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18074,6 +18074,9 @@ spec:
- type
type: object
type: array
drainedOnPause:
default: false
type: boolean
lastUpdated:
format: date-time
type: string
Expand Down
3 changes: 3 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18074,6 +18074,9 @@ spec:
- type
type: object
type: array
drainedOnPause:
default: false
type: boolean
lastUpdated:
format: date-time
type: string
Expand Down
20 changes: 20 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -7941,6 +7941,26 @@ The generation observed by the Pipeline controller.

</tr>

<tr>

<td>

<code>drainedOnPause</code></br> <em> bool </em>
</td>

<td>

<p>

Field to indicate if a pipeline drain successfully occurred, or it timed
out. Set to true when the Pipeline is in Paused state, and after it has
successfully been drained. defaults to false
</p>

</td>

</tr>

</tbody>

</table>
Expand Down
994 changes: 513 additions & 481 deletions pkg/apis/numaflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,11 @@ type PipelineStatus struct {
// The generation observed by the Pipeline controller.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty" protobuf:"varint,11,opt,name=observedGeneration"`
// Field to indicate if a pipeline drain successfully occurred, or it timed out.
// Set to true when the Pipeline is in Paused state, and after it has successfully been drained.
// defaults to false
// +kubebuilder:default=false
DrainedOnPause bool `json:"drainedOnPause,omitempty" protobuf:"bytes,12,opt,name=drainedOnPause"`
}

// SetVertexCounts sets the counts of vertices.
Expand Down Expand Up @@ -764,6 +769,16 @@ func (pls *PipelineStatus) SetObservedGeneration(value int64) {
pls.ObservedGeneration = value
}

// MarkDrainedOnPauseTrue sets the DrainedOnPause field to true.
// The pipeline reconciler calls this while pausing, and only when the drain
// actually completed (not when the pause deadline was merely exceeded).
func (pls *PipelineStatus) MarkDrainedOnPauseTrue() {
pls.DrainedOnPause = true
}

// MarkDrainedOnPauseFalse sets the DrainedOnPause field to false.
// The pipeline reconciler calls this on resume so that the drained status is
// refreshed for the next pause cycle.
func (pls *PipelineStatus) MarkDrainedOnPauseFalse() {
pls.DrainedOnPause = false
}

// IsHealthy indicates whether the pipeline is in healthy status
func (pls *PipelineStatus) IsHealthy() bool {
switch pls.Phase {
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/numaflow/v1alpha1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,15 @@ func Test_PipelineMarkPhases(t *testing.T) {
assert.Equal(t, PipelinePhaseRunning, s.Phase)
}

// Test_PipelineMarkDrained checks that DrainedOnPause is false on a zero-value
// PipelineStatus and that the two Mark helpers flip it to true and back.
func Test_PipelineMarkDrained(t *testing.T) {
	status := PipelineStatus{}
	assert.False(t, status.DrainedOnPause)

	status.MarkDrainedOnPauseTrue()
	assert.True(t, status.DrainedOnPause)

	status.MarkDrainedOnPauseFalse()
	assert.False(t, status.DrainedOnPause)
}

func Test_GetDownstreamEdges(t *testing.T) {
pl := Pipeline{
ObjectMeta: metav1.ObjectMeta{
Expand Down
56 changes: 41 additions & 15 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,17 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
}()

pl.Status.SetObservedGeneration(pl.Generation)

if oldPhase := pl.Status.Phase; pl.Spec.Lifecycle.GetDesiredPhase() == dfv1.PipelinePhasePaused ||
oldPhase == dfv1.PipelinePhasePaused || oldPhase == dfv1.PipelinePhasePausing {
// Regular pipeline change
// This should happen in all cases to ensure a clean initialization regardless of the lifecycle phase
// Eg: even for a pipeline started with desiredPhase = Pause, we should still create the resources for the pipeline
result, err := r.reconcileNonLifecycleChanges(ctx, pl)
if err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", "Failed to reconcile pipeline: %v", err.Error())
return result, err
}
// check if any changes related to pause/resume lifecycle for the pipeline
if isLifecycleChange(pl) {
oldPhase := pl.Status.Phase
requeue, err := r.updateDesiredState(ctx, pl)
if err != nil {
logMsg := fmt.Sprintf("Updated desired pipeline phase failed: %v", zap.Error(err))
Expand All @@ -162,16 +170,24 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
return ctrl.Result{RequeueAfter: dfv1.DefaultRequeueAfter}, nil
}
return ctrl.Result{}, nil

}
return result, nil
}

// Regular pipeline change
result, err := r.reconcileNonLifecycleChanges(ctx, pl)
if err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", "Failed to reconcile pipeline: %v", err.Error())
// isLifecycleChange reports whether the pipeline is involved in a pause/resume
// lifecycle transition. It returns true when either:
//   - the desired phase in the spec is 'Paused', or
//   - the current phase in the status is 'Paused' or 'Pausing'.
//
// Any other combination of desired and current phase yields false.
func isLifecycleChange(pl *dfv1.Pipeline) bool {
	// A request to pause is a lifecycle change no matter what the current phase is.
	if pl.Spec.Lifecycle.GetDesiredPhase() == dfv1.PipelinePhasePaused {
		return true
	}

	// Otherwise it is a lifecycle change only while the pipeline is still
	// paused or in the middle of pausing (i.e. it is moving out of that state).
	switch pl.Status.Phase {
	case dfv1.PipelinePhasePaused, dfv1.PipelinePhasePausing:
		return true
	default:
		return false
	}
}

// reconcileNonLifecycleChanges do the jobs not related to pipeline lifecycle changes.
Expand Down Expand Up @@ -345,7 +361,12 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
}

pl.Status.MarkDeployed()
pl.Status.SetPhase(pl.Spec.Lifecycle.GetDesiredPhase(), "")
// If the pipeline has a lifecycle change, then do not update the phase as
// this should happen only after the required configs for the lifecycle changes
// have been applied.
if !isLifecycleChange(pl) {
pl.Status.SetPhase(pl.Spec.Lifecycle.GetDesiredPhase(), "")
}
if err := r.checkChildrenResourceStatus(ctx, pl); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to check pipeline children resource status, %w", err)
}
Expand Down Expand Up @@ -599,7 +620,8 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
copyVertexTemplate(pl, vCopy)
copyVertexLimits(pl, vCopy)
replicas := int32(1)
if pl.Status.Phase == dfv1.PipelinePhasePaused {
// If the desired phase is Paused, or the pipeline is currently Pausing or Paused, we should not start any vertex replicas
if isLifecycleChange(pl) {
replicas = int32(0)
} else if v.IsReduceUDF() {
partitions := pl.NumOfPartitions(v.Name)
Expand Down Expand Up @@ -794,7 +816,6 @@ func (r *pipelineReconciler) updateDesiredState(ctx context.Context, pl *dfv1.Pi
}

func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {

// reset pause timestamp
if pl.GetAnnotations()[dfv1.KeyPauseTimestamp] != "" {
err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(dfv1.RemovePauseTimestampPatch)))
Expand All @@ -806,17 +827,18 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli
}
}
}

_, err := r.scaleUpAllVertices(ctx, pl)
if err != nil {
return false, err
}
// mark the drained field as false to refresh the drained status as this will
// be a new lifecycle from running
pl.Status.MarkDrainedOnPauseFalse()
pl.Status.MarkPhaseRunning()
return false, nil
}

func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {

// check that annotations / pause timestamp annotation exist
if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" {
pl.SetAnnotations(map[string]string{dfv1.KeyPauseTimestamp: time.Now().Format(time.RFC3339)})
Expand Down Expand Up @@ -855,12 +877,16 @@ func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipelin
return false, err
}

// if drain is completed or we have exceed pause deadline, mark pl as paused and scale down
// if drain is completed, or we have exceeded the pause deadline, mark pl as paused and scale down
if time.Now().After(pauseTimestamp.Add(time.Duration(pl.Spec.Lifecycle.GetPauseGracePeriodSeconds())*time.Second)) || drainCompleted {
_, err := r.scaleDownAllVertices(ctx, pl)
if err != nil {
return true, err
}
// if the drain completed successfully, then set the DrainedOnPause field to true
if drainCompleted {
pl.Status.MarkDrainedOnPauseTrue()
}
pl.Status.MarkPhasePaused()
return false, nil
}
Expand Down
70 changes: 69 additions & 1 deletion pkg/reconciler/pipeline/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ func Test_reconcileEvents(t *testing.T) {
_, err = r.reconcile(ctx, testObj)
assert.Error(t, err)
events := getEvents(t, r)
assert.Contains(t, events, "Normal UpdatePipelinePhase Updated pipeline phase from Paused to Running")
assert.Contains(t, events, "Warning ReconcilePipelineFailed Failed to reconcile pipeline: the length of the pipeline name plus the vertex name is over the max limit. (very-very-very-loooooooooooooooooooooooooooooooooooong-input), [must be no more than 63 characters]")
})

Expand Down Expand Up @@ -945,3 +944,72 @@ func Test_checkChildrenResourceStatus(t *testing.T) {
}
})
}

// TestIsLifecycleChange is a table-driven test for isLifecycleChange. Each case
// pairs a current status phase with a desired spec phase and states whether the
// combination should be treated as a pause/resume lifecycle change.
func TestIsLifecycleChange(t *testing.T) {
	type testCase struct {
		name    string
		current dfv1.PipelinePhase
		desired dfv1.PipelinePhase
		want    bool
	}

	cases := []testCase{
		// Desired phase is Paused: always a lifecycle change.
		{name: "Change to paused from another state", current: dfv1.PipelinePhaseRunning, desired: dfv1.PipelinePhasePaused, want: true},
		{name: "when already in paused", current: dfv1.PipelinePhasePaused, desired: dfv1.PipelinePhasePaused, want: true},
		// Current phase is Paused: leaving it is a lifecycle change.
		{name: "Change out of paused", current: dfv1.PipelinePhasePaused, desired: dfv1.PipelinePhaseRunning, want: true},
		// Pausing as a *desired* phase is not considered a lifecycle change.
		{name: "Change from another state to pausing", current: dfv1.PipelinePhaseRunning, desired: dfv1.PipelinePhasePausing, want: false},
		// Pausing as the *current* phase is.
		{name: "Change from pausing to running", current: dfv1.PipelinePhasePausing, desired: dfv1.PipelinePhaseRunning, want: true},
		// Neither side involves a paused/pausing state.
		{name: "No lifecycle change", current: dfv1.PipelinePhaseRunning, desired: dfv1.PipelinePhaseRunning, want: false},
		{name: "No lifecycle change - updated phase", current: dfv1.PipelinePhaseRunning, desired: dfv1.PipelinePhaseDeleting, want: false},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var pl dfv1.Pipeline
			pl.Spec.Lifecycle.DesiredPhase = tc.desired
			pl.Status.Phase = tc.current

			assert.Equal(t, tc.want, isLifecycleChange(&pl))
		})
	}
}
4 changes: 4 additions & 0 deletions rust/numaflow-models/src/models/pipeline_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub struct PipelineStatus {
/// Conditions are the latest available observations of a resource's current state.
#[serde(rename = "conditions", skip_serializing_if = "Option::is_none")]
pub conditions: Option<Vec<k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition>>,
/// Field to indicate if a pipeline drain successfully occurred, or it timed out. Set to true when the Pipeline is in Paused state, and after it has successfully been drained. defaults to false
#[serde(rename = "drainedOnPause", skip_serializing_if = "Option::is_none")]
pub drained_on_pause: Option<bool>,
#[serde(rename = "lastUpdated", skip_serializing_if = "Option::is_none")]
pub last_updated: Option<k8s_openapi::apimachinery::pkg::apis::meta::v1::Time>,
#[serde(rename = "mapUDFCount", skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -48,6 +51,7 @@ impl PipelineStatus {
pub fn new() -> PipelineStatus {
PipelineStatus {
conditions: None,
drained_on_pause: None,
last_updated: None,
map_udf_count: None,
message: None,
Expand Down

0 comments on commit cf90e25

Please sign in to comment.