Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: gitops-engine fixes, plus enable Numaflow Controller deleted and then recreated #471

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fd88e3b
fix: set the NumaflowControllerRollout ChildResourceHealthy condition…
afugazzotto Dec 21, 2024
7f9e4d2
compare specs instead of image version of Deployment
afugazzotto Dec 21, 2024
d2dda94
print statements
juliev0 Dec 27, 2024
ca6dcb7
remove ownership from manifests, etc
juliev0 Dec 27, 2024
4558c5f
remove server side diff which was causing errors with StateDiff function
juliev0 Dec 27, 2024
262134d
logging
juliev0 Dec 27, 2024
993bcca
logging
juliev0 Dec 27, 2024
a67c20f
Merge remote-tracking branch 'origin/main' into ncdeletion
juliev0 Dec 27, 2024
ec64de9
use map function to reconcile child resources instead of relying on o…
juliev0 Dec 28, 2024
3980b05
fix numaflowcontrollerrollout progressing check
juliev0 Dec 28, 2024
cc5a5af
allow pipelinerollout status processed without depending on makePromo…
juliev0 Dec 28, 2024
0e43d41
fix test
juliev0 Dec 28, 2024
3e1bfa4
fix: empty commit
juliev0 Dec 28, 2024
29f2d84
cleanup
juliev0 Dec 29, 2024
c3930b8
lint fix
juliev0 Dec 29, 2024
0cc261e
comments
juliev0 Dec 29, 2024
9eb7fba
cleanup
juliev0 Dec 29, 2024
15ddcac
cleanup/comments
juliev0 Dec 29, 2024
c7c02f1
gitops-engine settings change
juliev0 Dec 29, 2024
9435ccb
removing explicit deletion of manifests, as gitops-engine directive t…
juliev0 Dec 29, 2024
9039e59
temporary commit to try out 1.3.3 manifest without instanceID
juliev0 Dec 29, 2024
d41cb3d
experimenting with removing deletions
juliev0 Dec 29, 2024
7123439
comment
juliev0 Dec 29, 2024
0ae2ead
comment
juliev0 Dec 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func main() {
numaLogger.Fatal(err, "Unable to create NumaflowController controller")
}

if err = numaflowControllerReconciler.SetupWithManager(mgr); err != nil {
if err = numaflowControllerReconciler.SetupWithManager(ctx, mgr); err != nil {
numaLogger.Fatal(err, "Unable to set up NumaflowController controller")
}

Expand Down
423 changes: 185 additions & 238 deletions internal/controller/numaflowcontroller/numaflowcontroller_controller.go

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (r *NumaflowControllerRolloutReconciler) processExistingNumaflowController(
}

numaLogger.
WithValues("numaflowControllerNeedsToUpdate", numaflowControllerNeedsToUpdate, "upgradeStrategyType", upgradeStrategyType).
WithValues("numaflowControllerNeedsToUpdate", numaflowControllerNeedsToUpdate, "upgradeStrategyType", upgradeStrategyType, "numaflowControllerIsUpdating", numaflowControllerIsUpdating).
Debug("Upgrade decision result")

// set the Status appropriately to "Pending" or "Deployed"
Expand Down Expand Up @@ -503,16 +503,18 @@ func (r *NumaflowControllerRolloutReconciler) processNumaflowControllerStatus(

healthyChildCond := existingNumaflowControllerStatus.GetCondition(apiv1.ConditionChildResourceHealthy)

if existingNumaflowControllerStatus.IsHealthy() &&
healthyChildCond != nil && existingNumaflowControllerDef.GetGeneration() <= healthyChildCond.ObservedGeneration &&
healthyChildCond.Status == metav1.ConditionTrue {

if existingNumaflowControllerDef.GetGeneration() > existingNumaflowControllerStatus.ObservedGeneration {
nfcRollout.Status.MarkChildResourcesUnhealthy("Progressing",
fmt.Sprintf("observedGeneration %d < generation %d", existingNumaflowControllerStatus.ObservedGeneration, existingNumaflowControllerDef.GetGeneration()),
nfcRollout.Generation)
} else if existingNumaflowControllerStatus.IsHealthy() &&
healthyChildCond != nil && healthyChildCond.Status == metav1.ConditionTrue {
nfcRollout.Status.MarkChildResourcesHealthy(nfcRollout.Generation)
} else {
if healthyChildCond != nil {
nfcRollout.Status.MarkChildResourcesUnhealthy(healthyChildCond.Reason, healthyChildCond.Message, nfcRollout.Generation)
} else {
nfcRollout.Status.MarkChildResourcesUnhealthy(apiv1.ProgressingReasonString, "Progressing", nfcRollout.Generation)
nfcRollout.Status.MarkChildResourcesUnhealthy("Unhealthy", "Unhealthy", nfcRollout.Generation)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,16 @@ func (r *PipelineRolloutReconciler) processPipelineStatus(ctx context.Context, p

// Only fetch the latest pipeline object while deleting the pipeline object, i.e. when pipelineRollout.DeletionTimestamp.IsZero() is false
if existingPipelineDef == nil {
pipelineDef, err := r.makePromotedPipelineDefinition(ctx, pipelineRollout)
// determine name of the promoted Pipeline
pipelineName, err := progressive.GetChildName(ctx, pipelineRollout, r, common.LabelValueUpgradePromoted, r.client, true)
if err != nil {
return err
return fmt.Errorf("Unable to process pipeline status: err=%s", err)
}
pipelineDef := &unstructured.Unstructured{}
pipelineDef.SetGroupVersionKind(numaflowv1.PipelineGroupVersionKind)
pipelineDef.SetNamespace(pipelineRollout.Namespace)
pipelineDef.SetName(pipelineName)

livePipelineDef, err := kubernetes.GetLiveResource(ctx, pipelineDef, "pipelines")
if err != nil {
if apierrors.IsNotFound(err) {
Expand Down
44 changes: 34 additions & 10 deletions internal/sync/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var (
)

type ResourceInfo struct {
Name string
OwnerName string

Health *health.HealthStatus
manifestHash string
Expand All @@ -77,9 +77,11 @@ type ResourceInfo struct {
type LiveStateCache interface {
// GetClusterCache returns synced cluster cache
GetClusterCache() (clustercache.ClusterCache, error)
// GetManagedLiveObjs returns state of live objects which correspond to target
// objects with the specified ResourceInfo name and matching namespace.
GetManagedLiveObjs(name, namespace string, targetObjs []*unstructured.Unstructured) (map[kube.ResourceKey]*unstructured.Unstructured, error)
// GetManagedLiveObjects returns the live objects associated with the owner name
GetManagedLiveObjects(ownerName, namespace string) (map[kube.ResourceKey]*clustercache.Resource, error)
// GetManagedLiveObjsFromResourceList returns state of live objects which correspond to target
// objects with the specified ResourceInfo name and matching namespace, associated with the owner name
GetManagedLiveObjsFromResourceList(ownerName, namespace string, targetObjs []*unstructured.Unstructured) (map[kube.ResourceKey]*unstructured.Unstructured, error)
// Init must be executed before cache can be used
Init(numaLogger *logger.NumaLogger) error
// PopulateResourceInfo is called by the cache to update ResourceInfo struct for a managed resource
Expand Down Expand Up @@ -219,8 +221,8 @@ func (c *liveStateCache) PopulateResourceInfo(un *unstructured.Unstructured, isR
res.Health, _ = health.GetResourceHealth(un, settings.clusterSettings.ResourceHealthOverride)

numaplaneInstanceName := getNumaplaneInstanceName(un)
if isRoot && numaplaneInstanceName != "" {
res.Name = numaplaneInstanceName
if /*isRoot &&*/ numaplaneInstanceName != "" { // TODO: can probably put this back now that we don't have OwnerRef - need to test
res.OwnerName = numaplaneInstanceName
}

gvk := un.GroupVersionKind()
Expand All @@ -236,7 +238,7 @@ func (c *liveStateCache) PopulateResourceInfo(un *unstructured.Unstructured, isR

// edge case. we do not label CRDs, so they miss the tracking label we inject. But we still
// want the full resource to be available in our cache (to diff), so we store all CRDs
return res, res.Name != "" || gvk.Kind == kube.CustomResourceDefinitionKind
return res, res.OwnerName != "" || gvk.Kind == kube.CustomResourceDefinitionKind
}

// getNumaplaneInstanceName gets the Numaplane Object that owns the resource from a label in the resource
Expand Down Expand Up @@ -432,17 +434,39 @@ func (c *liveStateCache) GetClusterCache() (clustercache.ClusterCache, error) {
return clusterCache, nil
}

func (c *liveStateCache) GetManagedLiveObjs(
name, namespace string,
func (c *liveStateCache) GetManagedLiveObjects(
ownerName, namespace string,
) (map[kube.ResourceKey]*clustercache.Resource, error) {
clusterInfo, err := c.getSyncedCluster()
if err != nil {
return nil, fmt.Errorf("failed to get cluster info: %w", err)
}

liveObjs := clusterInfo.FindResources(namespace, func(r *clustercache.Resource) bool {
matches := resInfo(r).OwnerName == ownerName && r.Ref.Namespace == namespace
fmt.Printf("deletethis: viewing live objects, I see: r.Ref.Kind=%q, r.Ref.Name=%q, r.Ref.Namespace=%q, resInfo(r)=%+v, checking against namespace=%q, ownerName=%q, matches=%t\n",
r.Ref.Kind, r.Ref.Name, r.Ref.Namespace, resInfo(r), namespace, ownerName, matches)
// distinguish it by numaplane resource's name and namespace
return matches
})
return liveObjs, err
}

func (c *liveStateCache) GetManagedLiveObjsFromResourceList(
ownerName, namespace string,
targetObjs []*unstructured.Unstructured,
) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
clusterInfo, err := c.getSyncedCluster()
if err != nil {
return nil, fmt.Errorf("failed to get cluster info: %w", err)
}

liveObjs, err := clusterInfo.GetManagedLiveObjs(targetObjs, func(r *clustercache.Resource) bool {
matches := resInfo(r).OwnerName == ownerName && r.Ref.Namespace == namespace
fmt.Printf("deletethis: viewing live objects, I see: r.Ref.Kind=%q, r.Ref.Name=%q, r.Ref.Namespace=%q, resInfo(r)=%+v, checking against namespace=%q, ownerName=%q, matches=%t\n",
r.Ref.Kind, r.Ref.Name, r.Ref.Namespace, resInfo(r), namespace, ownerName, matches)
// distinguish it by numaplane resource's name and namespace
return resInfo(r).Name == name && r.Ref.Namespace == namespace
return matches
})
c.customMetrics.KubeResourceMonitored.WithLabelValues().Set(float64(len(liveObjs)))
return liveObjs, err
Expand Down
2 changes: 1 addition & 1 deletion internal/sync/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ metadata:
labels:
numaplane.numaproj.io/tracking-id: my-example`)

managedObjs, err := clusterCache.GetManagedLiveObjs(testName, testNamespace, []*unstructured.Unstructured{targetDeploy})
managedObjs, err := clusterCache.GetManagedLiveObjsFromResourceList(testName, testNamespace, []*unstructured.Unstructured{targetDeploy})
require.NoError(t, err)
assert.Equal(t, map[kube.ResourceKey]*unstructured.Unstructured{
kube.NewResourceKey("apps", "Deployment", "default", "my-app"): mustToUnstructured(testDeploy()),
Expand Down
Loading
Loading