diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index b61330afd57..9c848fd35a5 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -139,12 +139,9 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer } rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: rsc.enqueueReplicaSet, + AddFunc: rsc.addRS, UpdateFunc: rsc.updateRS, - // This will enter the sync loop and no-op, because the replica set has been deleted from the store. - // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended - // way of achieving this is by performing a `stop` operation on the replica set. - DeleteFunc: rsc.enqueueReplicaSet, + DeleteFunc: rsc.deleteRS, }) rsc.rsLister = rsInformer.Lister() rsc.rsListerSynced = rsInformer.Informer().HasSynced @@ -228,11 +225,50 @@ func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controll return rs } +func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) { + key, err := controller.KeyFunc(rs) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err)) + return + } + + rsc.queue.Add(key) +} + +func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration time.Duration) { + key, err := controller.KeyFunc(rs) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err)) + return + } + + rsc.queue.AddAfter(key, duration) +} + +func (rsc *ReplicaSetController) addRS(obj interface{}) { + rs := obj.(*apps.ReplicaSet) + klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name) + rsc.enqueueRS(rs) +} + // callback when RS is updated func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { oldRS := old.(*apps.ReplicaSet) curRS := cur.(*apps.ReplicaSet) + // TODO: make a KEP and fix informers to always call the delete event handler on re-create + if curRS.UID != oldRS.UID { + key, err := controller.KeyFunc(oldRS) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err)) + return + } + rsc.deleteRS(cache.DeletedFinalStateUnknown{ + Key: key, + Obj: oldRS, + }) + } + // You might imagine that we only really need to enqueue the // replica set when Spec changes, but it is safer to sync any // time this function is triggered. That way a full informer @@ -248,7 +284,36 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) { klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas)) } - rsc.enqueueReplicaSet(cur) + rsc.enqueueRS(curRS) +} + +func (rsc *ReplicaSetController) deleteRS(obj interface{}) { + rs, ok := obj.(*apps.ReplicaSet) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + rs, ok = tombstone.Obj.(*apps.ReplicaSet) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj)) + return + } + } + + key, err := controller.KeyFunc(rs) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err)) + return + } + + klog.V(4).Infof("Deleting %s %q", rsc.Kind, key) + + // Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean + rsc.expectations.DeleteExpectations(key) + + rsc.queue.Add(key) } // When a pod is created, enqueue the replica set that manages it and update its expectations. @@ -274,7 +339,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { } klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) rsc.expectations.CreationObserved(rsKey) - rsc.enqueueReplicaSet(rs) + rsc.queue.Add(rsKey) return } @@ -288,7 +353,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { } klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod) for _, rs := range rss { - rsc.enqueueReplicaSet(rs) + rsc.enqueueRS(rs) } } @@ -325,7 +390,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { if controllerRefChanged && oldControllerRef != nil { // The ControllerRef was changed. Sync the old controller, if any. if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil { - rsc.enqueueReplicaSet(rs) + rsc.enqueueRS(rs) } } @@ -336,7 +401,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { return } klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) - rsc.enqueueReplicaSet(rs) + rsc.enqueueRS(rs) // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // the Pod status which in turn will trigger a requeue of the owning replica set thus // having its status updated with the newly available replica. For now, we can fake the @@ -348,7 +413,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds) // Add a second to avoid milliseconds skew in AddAfter. // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. - rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second) + rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second) } return } @@ -362,7 +427,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { } klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) for _, rs := range rss { - rsc.enqueueReplicaSet(rs) + rsc.enqueueRS(rs) } } } @@ -400,31 +465,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { } rsKey, err := controller.KeyFunc(rs) if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err)) return } klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) - rsc.enqueueReplicaSet(rs) -} - -// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item. -func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) { - key, err := controller.KeyFunc(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) - return - } - rsc.queue.Add(key) -} - -// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item. -func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) { - key, err := controller.KeyFunc(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) - return - } - rsc.queue.AddAfter(key, after) + rsc.queue.Add(rsKey) } // worker runs a worker thread that just dequeues items, processes them, and marks them done. @@ -447,7 +493,7 @@ func (rsc *ReplicaSetController) processNextWorkItem() bool { return true } - utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err)) + utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err)) rsc.queue.AddRateLimited(key) return true @@ -460,7 +506,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) return nil } if diff < 0 { @@ -560,7 +606,6 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // invoked concurrently with the same key. func (rsc *ReplicaSetController) syncReplicaSet(key string) error { - startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) @@ -583,7 +628,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err)) + utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err)) return nil } @@ -622,7 +667,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 && updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) && updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) { - rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second) + rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second) } return manageReplicasErr }