
Commit

Add new Cache for Pod Events, allow forced upgrade in failure even if ForceRackUpgrade is not set and fix some missing () for ifs
burmanm committed Dec 4, 2024
1 parent 0477c0e commit 139fbf8
Showing 2 changed files with 30 additions and 105 deletions.
14 changes: 13 additions & 1 deletion cmd/main.go
@@ -26,11 +26,13 @@ import (
"go.uber.org/zap/zapcore"
_ "k8s.io/client-go/plugin/pkg/client/auth"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

@@ -119,6 +121,8 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()

if err = (&controllers.CassandraDatacenterReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("CassandraDatacenter"),
@@ -152,8 +156,16 @@ func main() {
os.Exit(1)
}

mgr.GetCache().IndexField(ctx, &corev1.Event{}, "involvedObject.name", func(obj client.Object) []string {
event := obj.(*corev1.Event)
if event.InvolvedObject.Kind == "Pod" {
return []string{event.InvolvedObject.Name}
}
return []string{}
})

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
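
For orientation (not part of the commit): the "involvedObject.name" index registered above is what lets the cached controller-runtime client list Events for a single Pod. A minimal sketch of that lookup, mirroring the List call this commit adds to reconcile_racks.go below; the podEvents helper name is illustrative only:

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// podEvents lists the Events whose involvedObject is the named Pod, served from
// the cache through the "involvedObject.name" index registered in main.go.
// Illustrative helper only; the commit performs the same List call inline.
func podEvents(ctx context.Context, c client.Client, namespace, podName string) ([]corev1.Event, error) {
	events := &corev1.EventList{}
	opts := &client.ListOptions{
		Namespace:     namespace,
		FieldSelector: fields.SelectorFromSet(fields.Set{"involvedObject.name": podName}),
	}
	if err := c.List(ctx, events, opts); err != nil {
		return nil, err
	}
	return events.Items, nil
}
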
121 changes: 17 additions & 104 deletions pkg/reconciliation/reconcile_racks.go
@@ -168,16 +168,14 @@ func (rc *ReconciliationContext) CheckRackCreation() result.ReconcileResult {
}

func (rc *ReconciliationContext) failureModeDetection() bool {
// TODO Even if these are true, we shouldn't allow update if we have a pod starting (that hasn't crashed yet)

// First check - do we even need a force?
// We can check if StatefulSet was updated, but that wouldn't tell us if there's crashing pods
for _, pod := range rc.dcPods {
if pod == nil {
continue
}
if pod.Status.Phase == corev1.PodPending {
if hasBeenXMinutes(5, pod.Status.StartTime.Time) {
// Pod has been over 5 minutes in Pending state. This can be normal, but lets see
// if we have some detected failures events like FailedScheduling

events := &corev1.EventList{}
if err := rc.Client.List(rc.Ctx, events, &client.ListOptions{Namespace: pod.Namespace, FieldSelector: fields.SelectorFromSet(fields.Set{"involvedObject.name": pod.Name})}); err != nil {
rc.ReqLogger.Error(err, "error getting events for pod", "pod", pod.Name)
@@ -186,6 +184,7 @@ func (rc *ReconciliationContext) failureModeDetection() bool {

for _, event := range events.Items {
if event.Reason == "FailedScheduling" {
rc.ReqLogger.Info("Found FailedScheduling event for pod", "pod", pod.Name)
// We have a failed scheduling event
return true
}
@@ -201,13 +200,15 @@
if waitingReason == "CrashLoopBackOff" ||
waitingReason == "ImagePullBackOff" ||
waitingReason == "ErrImagePull" {
rc.ReqLogger.Info("Failing container state for pod", "pod", pod.Name, "reason", waitingReason)
// We have a container in a failing state
return true
}
}
if containerStatus.RestartCount > 2 {
if containerStatus.State.Terminated != nil {
if containerStatus.State.Terminated.ExitCode != 0 {
rc.ReqLogger.Info("Failing container state for pod", "pod", pod.Name, "exitCode", containerStatus.State.Terminated.ExitCode)
return true
}
}
@@ -221,12 +222,14 @@
waitingReason == "ImagePullBackOff" ||
waitingReason == "ErrImagePull" {
// We have a container in a failing state
rc.ReqLogger.Info("Failing initcontainer state for pod", "pod", pod.Name, "reason", waitingReason)
return true
}
}
if containerStatus.RestartCount > 2 {
if containerStatus.State.Terminated != nil {
if containerStatus.State.Terminated.ExitCode != 0 {
rc.ReqLogger.Info("Failing initcontainer state for pod", "pod", pod.Name, "exitCode", containerStatus.State.Terminated.ExitCode)
return true
}
}
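
The Pending-pod check in failureModeDetection above calls a hasBeenXMinutes helper that is not part of this diff. A plausible sketch of it, assuming it only compares the given timestamp against the wall clock (hypothetical; the real helper lives elsewhere in pkg/reconciliation and may differ):

import "time"

// Hypothetical sketch of the helper referenced above; the actual definition is
// elsewhere in pkg/reconciliation and is not shown in this diff.
func hasBeenXMinutes(x int, since time.Time) bool {
	return since.Add(time.Duration(x) * time.Minute).Before(time.Now())
}
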
@@ -377,12 +380,13 @@ func (rc *ReconciliationContext) CheckRackPodTemplate(force bool) result.Reconci
logger.Info("reconcile_racks::CheckRackPodTemplate")

for idx := range rc.desiredRackInformation {

rackName := rc.desiredRackInformation[idx].RackName
if force {
forceRacks := dc.Spec.ForceUpgradeRacks
if utils.IndexOfString(forceRacks, rackName) <= 0 {
continue
if len(forceRacks) > 0 {
if utils.IndexOfString(forceRacks, rackName) <= 0 {
continue
}
}
}

Expand All @@ -402,9 +406,9 @@ func (rc *ReconciliationContext) CheckRackPodTemplate(force bool) result.Reconci
updatedReplicas = status.CurrentReplicas + status.UpdatedReplicas
}

if !force && statefulSet.Generation != status.ObservedGeneration ||
if !force && (statefulSet.Generation != status.ObservedGeneration ||
status.Replicas != status.ReadyReplicas ||
status.Replicas != updatedReplicas {
status.Replicas != updatedReplicas) {

logger.Info(
"waiting for upgrade to finish on statefulset",
@@ -523,115 +527,24 @@ func (rc *ReconciliationContext) CheckRackPodTemplate(force bool) result.Reconci
return result.Continue()
}
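
On the "missing () for ifs" part of the commit message: in Go, && binds tighter than ||, so before the parentheses were added the force flag only guarded the first comparison of the condition shown in the hunk above. A standalone illustration (not from the repository):

package main

import "fmt"

func main() {
	force := true
	generationMismatch := false
	notAllReady := true

	// Without parentheses this parses as (!force && generationMismatch) || notAllReady,
	// so the waiting branch is still taken even when force is set.
	unparenthesized := !force && generationMismatch || notAllReady

	// With parentheses the force flag guards the whole condition.
	parenthesized := !force && (generationMismatch || notAllReady)

	fmt.Println(unparenthesized, parenthesized) // prints: true false
}
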

/*
TODO An idea.. if the startNode phase is failing due to a Pod being unable to start (or get ready?), we could
make that as a state for CheckRackForceUpgrade to be allowed.
TODO Also, verify this code is close to the CheckRackPodTemplate() code or even merge those two if at all possible at this stage,
given that so much time has passed since the original comment.
*/

func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
dc := rc.Datacenter
logger := rc.ReqLogger
logger.Info("starting CheckRackForceUpgrade()")

forceRacks := dc.Spec.ForceUpgradeRacks
if len(forceRacks) == 0 {
return result.Continue()
}
logger.Info("reconcile_racks::CheckRackForceUpgrade")

// Datacenter configuration isn't healthy, we allow upgrades here before pods start
if rc.failureModeDetection() {
logger.Info("Failure detected, forcing CheckRackPodTemplate()")
return rc.CheckRackPodTemplate(true)
}

return rc.CheckRackPodTemplate(true)
}
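
Because the rendered diff above carries no add/remove markers, the new control flow is easier to read spelled out. One plausible reconstruction matching the commit message (a sketch, not necessarily the commit's verbatim line order): a detected failure forces CheckRackPodTemplate(true) even when spec.forceUpgradeRacks is empty, while otherwise an empty list still short-circuits to Continue.

// Sketch of the new flow as described by the commit message; reconstructed,
// not copied verbatim from the commit.
func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
	dc := rc.Datacenter
	logger := rc.ReqLogger
	logger.Info("starting CheckRackForceUpgrade()")

	// Datacenter configuration isn't healthy; allow the upgrade before pods start,
	// even when no racks are explicitly forced.
	if rc.failureModeDetection() {
		logger.Info("Failure detected, forcing CheckRackPodTemplate()")
		return rc.CheckRackPodTemplate(true)
	}

	forceRacks := dc.Spec.ForceUpgradeRacks
	if len(forceRacks) == 0 {
		return result.Continue()
	}

	return rc.CheckRackPodTemplate(true)
}
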

/*
func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
// This code is *very* similar to CheckRackPodTemplate(), but it's not an exact
// copy. Some 3 to 5 line parts could maybe be extracted into functions.
logger := rc.ReqLogger
dc := rc.Datacenter
logger.Info("starting CheckRackForceUpgrade()")
forceRacks := dc.Spec.ForceUpgradeRacks
if len(forceRacks) == 0 {
return result.Continue()
}

for idx, nextRack := range rc.desiredRackInformation {
rackName := rc.desiredRackInformation[idx].RackName
if utils.IndexOfString(forceRacks, rackName) >= 0 {
statefulSet := rc.statefulSets[idx]
// have to use zero here, because each statefulset is created with no replicas
// in GetStatefulSetForRack()
desiredSts, err := newStatefulSetForCassandraDatacenter(statefulSet, rackName, dc, nextRack.NodeCount)
if err != nil {
logger.Error(err, "error calling newStatefulSetForCassandraDatacenter")
return result.Error(err)
}
// Set the CassandraDatacenter as the owner and controller
err = setControllerReference(
rc.Datacenter,
desiredSts,
rc.Scheme)
if err != nil {
logger.Error(err, "error calling setControllerReference for statefulset", "desiredSts.Namespace",
desiredSts.Namespace, "desireSts.Name", desiredSts.Name)
return result.Error(err)
}
// "fix" the replica count, and maintain labels and annotations the k8s admin may have set
desiredSts.Spec.Replicas = statefulSet.Spec.Replicas
desiredSts.Labels = utils.MergeMap(map[string]string{}, statefulSet.Labels, desiredSts.Labels)
desiredSts.Annotations = utils.MergeMap(map[string]string{}, statefulSet.Annotations, desiredSts.Annotations)
desiredSts.DeepCopyInto(statefulSet)
rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack,
"Force updating rack %s", rackName)
if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil {
return result.Error(err)
}
if err := setOperatorProgressStatus(rc, api.ProgressUpdating); err != nil {
return result.Error(err)
}
logger.Info("Force updating statefulset pod specs",
"statefulSet", statefulSet,
)
if err := rc.Client.Update(rc.Ctx, statefulSet); err != nil {
if errors.IsInvalid(err) {
if err = rc.deleteStatefulSet(statefulSet); err != nil {
return result.Error(err)
}
} else {
return result.Error(err)
}
}
}
}
dcPatch := client.MergeFrom(dc.DeepCopy())
dc.Spec.ForceUpgradeRacks = nil
if err := rc.Client.Patch(rc.Ctx, dc, dcPatch); err != nil {
logger.Error(err, "error patching datacenter to clear force upgrade")
return result.Error(err)
}
logger.Info("done CheckRackForceUpgrade()")
return result.Done()
return rc.CheckRackPodTemplate(true)
}
*/

func (rc *ReconciliationContext) deleteStatefulSet(statefulSet *appsv1.StatefulSet) error {
policy := metav1.DeletePropagationOrphan