Track deletes in rc manager with a UID expectations cache.

Prashanth Balasubramanian
2016-03-04 16:51:01 -08:00
parent 2e03822386
commit 7f5c9bd676
7 changed files with 334 additions and 90 deletions
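The diff below replaces the plain delete counter with an expectations cache keyed by pod namespace/name, so a deletion that is observed twice (once as an update that sets the deletion timestamp, once as the actual delete event) is only counted once. The following is a minimal, self-contained sketch of that idea; the names (uidTrackingExpectations, ExpectDeletions, DeletionObserved, Fulfilled) mirror the diff, but this is an illustration, not the actual pkg/controller implementation.

// Minimal sketch (not the real pkg/controller code): an expectations
// cache that tracks the set of pod keys ("namespace/name") whose
// deletions an rc still expects to see, so each delete is counted once.
package main

import (
	"fmt"
	"sync"
)

// uidTrackingExpectations maps an rc key to the pod keys it still
// expects to see deleted.
type uidTrackingExpectations struct {
	mu       sync.Mutex
	expected map[string]map[string]struct{} // rcKey -> set of pod keys
}

func newUIDTrackingExpectations() *uidTrackingExpectations {
	return &uidTrackingExpectations{expected: map[string]map[string]struct{}{}}
}

// ExpectDeletions records the pod keys the rc expects to see deleted.
func (u *uidTrackingExpectations) ExpectDeletions(rcKey string, podKeys []string) {
	u.mu.Lock()
	defer u.mu.Unlock()
	set := map[string]struct{}{}
	for _, k := range podKeys {
		set[k] = struct{}{}
	}
	u.expected[rcKey] = set
}

// DeletionObserved lowers the expectation for rcKey only if podKey was
// actually expected, so an update of the deletion timestamp followed by
// the real delete is not double counted.
func (u *uidTrackingExpectations) DeletionObserved(rcKey, podKey string) {
	u.mu.Lock()
	defer u.mu.Unlock()
	if set, ok := u.expected[rcKey]; ok {
		delete(set, podKey)
	}
}

// Fulfilled reports whether all expected deletions for rcKey have been seen.
func (u *uidTrackingExpectations) Fulfilled(rcKey string) bool {
	u.mu.Lock()
	defer u.mu.Unlock()
	return len(u.expected[rcKey]) == 0
}

func main() {
	exp := newUIDTrackingExpectations()
	exp.ExpectDeletions("default/frontend", []string{"default/frontend-abc", "default/frontend-def"})
	exp.DeletionObserved("default/frontend", "default/frontend-abc")
	exp.DeletionObserved("default/frontend", "default/frontend-abc") // duplicate event, ignored
	fmt.Println(exp.Fulfilled("default/frontend"))                   // false: one pod still pending
	exp.DeletionObserved("default/frontend", "default/frontend-def")
	fmt.Println(exp.Fulfilled("default/frontend")) // true
}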


@@ -72,8 +72,8 @@ type ReplicationManager struct {
// To allow injection of syncReplicationController for testing.
syncHandler func(rcKey string) error
// A TTLCache of pod creates/deletes each rc expects to see
expectations controller.ControllerExpectationsInterface
// A TTLCache of pod creates/deletes each rc expects to see.
expectations *controller.UIDTrackingControllerExpectations
// A store of replication controllers, populated by the rcController
rcStore cache.StoreToReplicationControllerLister
@@ -106,7 +106,7 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
},
burstReplicas: burstReplicas,
expectations: controller.NewControllerExpectations(),
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.New(),
}
@@ -300,11 +300,10 @@ func (rm *ReplicationManager) addPod(obj interface{}) {
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
glog.V(4).Infof("Add for pod %v with deletion timestamp %+v, counted as new deletion for rc %v", pod.Name, pod.DeletionTimestamp, rcKey)
rm.expectations.DeletionObserved(rcKey)
} else {
rm.expectations.CreationObserved(rcKey)
rm.deletePod(pod)
return
}
rm.expectations.CreationObserved(rcKey)
rm.enqueueController(rc)
}
@@ -321,25 +320,17 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
if rc == nil {
return
}
rcKey, err := controller.KeyFunc(rc)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
return
}
oldPod := old.(*api.Pod)
if curPod.DeletionTimestamp != nil && oldPod.DeletionTimestamp == nil {
if curPod.DeletionTimestamp != nil {
// when a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait
// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
// an rc never initiates a phase change, and so is never asleep waiting for the same.
glog.V(4).Infof("Update to pod %v with deletion timestamp %+v counted as delete for rc %v", curPod.Name, curPod.DeletionTimestamp, rcKey)
rm.expectations.DeletionObserved(rcKey)
} else {
glog.V(4).Infof("Update to pod %v with deletion timestamp %+v. Not counting it as a new deletion for rc %v.", curPod.Name, curPod.DeletionTimestamp, rcKey)
rm.deletePod(curPod)
return
}
rm.enqueueController(rc)
// Only need to get the old controller if the labels changed.
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
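The rewritten updatePod above no longer checks for the nil-to-non-nil transition of the deletion timestamp itself; any update that carries a deletion timestamp is routed to deletePod, and the UID-tracked expectations keep the eventual real delete event from being counted a second time. A minimal sketch of that routing, with assumed stand-in types (pod, onUpdate, onDelete) rather than the real handlers:

// Sketch of routing a timestamped update into the delete path.
package main

import "fmt"

// pod is a stand-in for api.Pod with just the fields this sketch needs.
type pod struct {
	key               string // "namespace/name"
	deletionTimestamp *string
}

// onDelete is where the expectation for this pod key would be lowered
// (exactly once, thanks to the UID-tracking cache).
func onDelete(p pod) { fmt.Printf("counting deletion of %s\n", p.key) }

// onUpdate treats any update that carries a deletion timestamp as the
// deletion itself and hands it to the delete path.
func onUpdate(oldPod, curPod pod) {
	if curPod.deletionTimestamp != nil {
		onDelete(curPod)
		return
	}
	fmt.Printf("plain update of %s, enqueue controller\n", curPod.key)
}

func main() {
	ts := "2016-03-04T16:51:01Z"
	onUpdate(pod{key: "default/frontend-abc"},
		pod{key: "default/frontend-abc", deletionTimestamp: &ts})
}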
@@ -372,20 +363,14 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
return
}
}
glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v, labels %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod.Labels)
if rc := rm.getPodController(pod); rc != nil {
rcKey, err := controller.KeyFunc(rc)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
return
}
// This method only manages expectations for the case where a pod is
// deleted without a grace period.
if pod.DeletionTimestamp == nil {
glog.V(4).Infof("Received new delete for rc %v, pod %v", rcKey, pod.Name)
rm.expectations.DeletionObserved(rcKey)
} else {
glog.V(4).Infof("Received delete for rc %v pod %v with non nil deletion timestamp %+v. Not counting it as a new deletion.", rcKey, pod.Name, pod.DeletionTimestamp)
}
rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod))
rm.enqueueController(rc)
}
}
@@ -438,6 +423,11 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
if diff > rm.burstReplicas {
diff = rm.burstReplicas
}
// TODO: Track UIDs of creates just like deletes. The problem currently
// is we'd need to wait on the result of a create to record the pod's
// UID, which would require locking *across* the create, which will turn
// into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations.
rm.expectations.ExpectCreations(rcKey, diff)
wait := sync.WaitGroup{}
wait.Add(diff)
@@ -458,7 +448,6 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
if diff > rm.burstReplicas {
diff = rm.burstReplicas
}
rm.expectations.ExpectDeletions(rcKey, diff)
glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
// No need to sort pods if we are about to delete all of them
if rc.Spec.Replicas != 0 {
@@ -467,7 +456,20 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(filteredPods))
}
// Snapshot the UIDs (ns/name) of the pods we're expecting to see
// deleted, so we know to record their expectations exactly once either
// when we see it as an update of the deletion timestamp, or as a delete.
// Note that if the labels on a pod/rc change in a way that the pod gets
// orphaned, the rc will only wake up after the expectations have
// expired even if other pods are deleted.
deletedPodKeys := []string{}
for i := 0; i < diff; i++ {
deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
}
// We use pod namespace/name as a UID to wait for deletions, so if the
// labels on a pod/rc change in a way that the pod gets orphaned, the
// rc will only wake up after the expectation has expired.
rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
wait := sync.WaitGroup{}
wait.Add(diff)
for i := 0; i < diff; i++ {
@@ -475,8 +477,9 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
defer wait.Done()
if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
rm.expectations.DeletionObserved(rcKey)
podKey := controller.PodKey(filteredPods[ix])
glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rc.Namespace, rc.Name)
rm.expectations.DeletionObserved(rcKey, podKey)
utilruntime.HandleError(err)
}
}(i)
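For context, the sync path consumes these expectations by waiting until every snapshotted pod key has been observed before recomputing the replica diff. A rough sketch of that gating, with assumed helper names (expectations interface, syncRC) rather than the real syncReplicationController:

// Sketch of how a sync loop might gate on the expectations above.
package main

import "fmt"

// expectations is a stand-in for the UID-tracking cache sketched earlier.
type expectations interface {
	Fulfilled(rcKey string) bool
}

type alwaysFulfilled struct{}

func (alwaysFulfilled) Fulfilled(string) bool { return true }

// syncRC decides whether it is safe to recompute the replica diff.
func syncRC(exp expectations, rcKey string) {
	if !exp.Fulfilled(rcKey) {
		fmt.Printf("rc %s still waiting on expected deletions; skip manageReplicas\n", rcKey)
		return
	}
	fmt.Printf("rc %s expectations satisfied; run manageReplicas\n", rcKey)
}

func main() {
	syncRC(alwaysFulfilled{}, "default/frontend")
}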