diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index cf07cd22842..fc377134d93 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -35,6 +35,7 @@ import (
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/util"
     "k8s.io/kubernetes/pkg/util/integer"
+    "k8s.io/kubernetes/pkg/util/sets"
 )
 
 const (
@@ -243,11 +244,105 @@ func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
     return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
 }
 
-// NewControllerExpectations returns a store for ControlleeExpectations.
+// NewControllerExpectations returns a store for ControllerExpectations.
 func NewControllerExpectations() *ControllerExpectations {
     return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
 }
 
+// UIDSetKeyFunc parses out the key from a UIDSet.
+var UIDSetKeyFunc = func(obj interface{}) (string, error) {
+    if u, ok := obj.(*UIDSet); ok {
+        return u.key, nil
+    }
+    return "", fmt.Errorf("Could not find key for obj %#v", obj)
+}
+
+// UIDSet holds a key and a set of UIDs. Used by the
+// UIDTrackingControllerExpectations to remember which UIDs it has seen/is
+// still waiting for.
+type UIDSet struct {
+    sets.String
+    key string
+}
+
+// UIDTrackingControllerExpectations tracks the UID of the pods it deletes.
+// This cache is needed over plain old expectations to safely handle graceful
+// deletion. The desired behavior is to treat an update that sets the
+// DeletionTimestamp on an object as a delete. To do so consistently, one needs
+// to remember the expected deletes so they aren't double counted.
+// TODO: Track creates as well (#22599)
+type UIDTrackingControllerExpectations struct {
+    ControllerExpectationsInterface
+    // TODO: There is a much nicer way to do this that involves a single store,
+    // a lock per entry, and a ControlleeExpectationsInterface type.
+    uidStoreLock sync.Mutex
+    // Store used for the UIDs associated with any expectation tracked via the
+    // ControllerExpectationsInterface.
+    uidStore cache.Store
+}
+
+// GetUIDs is a convenience method to avoid exposing the set of expected uids.
+// The returned set is not thread safe; all modifications must be made holding
+// the uidStoreLock.
+func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String {
+    if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists {
+        return uid.(*UIDSet).String
+    }
+    return nil
+}
+
+// ExpectDeletions records expectations for the given deleteKeys, against the given controller.
+func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error {
+    u.uidStoreLock.Lock()
+    defer u.uidStoreLock.Unlock()
+
+    if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
+        glog.Errorf("Clobbering existing delete keys: %+v", existing)
+    }
+    expectedUIDs := sets.NewString()
+    for _, k := range deletedKeys {
+        expectedUIDs.Insert(k)
+    }
+    glog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys)
+    if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
+        return err
+    }
+    return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
+}
+
+// DeletionObserved records the given deleteKey as a deletion, for the given rc.
+func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
+    u.uidStoreLock.Lock()
+    defer u.uidStoreLock.Unlock()
+
+    uids := u.GetUIDs(rcKey)
+    if uids != nil && uids.Has(deleteKey) {
+        glog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey)
+        u.ControllerExpectationsInterface.DeletionObserved(rcKey)
+        uids.Delete(deleteKey)
+    }
+}
+
+// DeleteExpectations deletes the UID set and invokes DeleteExpectations on the
+// underlying ControllerExpectationsInterface.
+func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) {
+    u.uidStoreLock.Lock()
+    defer u.uidStoreLock.Unlock()
+
+    u.ControllerExpectationsInterface.DeleteExpectations(rcKey)
+    if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
+        if err := u.uidStore.Delete(uidExp); err != nil {
+            glog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err)
+        }
+    }
+}
+
+// NewUIDTrackingControllerExpectations returns a wrapper around
+// ControllerExpectations that is aware of deleteKeys.
+func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
+    return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
+}
+
 // PodControlInterface is an interface that knows how to add or delete pods
 // created as an interface to allow testing.
 type PodControlInterface interface {
@@ -517,6 +612,14 @@ func FilterActiveReplicaSets(replicaSets []*extensions.ReplicaSet) []*extensions
     return active
 }
 
+// PodKey returns a key unique to the given pod within a cluster.
+// It's used so we consistently use the same key scheme in this module.
+// It does exactly what cache.MetaNamespaceKeyFunc would have done
+// except there's no possibility for error since we know the exact type.
+func PodKey(pod *api.Pod) string {
+    return fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
+}
+
 // ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker.
 type ControllersByCreationTimestamp []*api.ReplicationController
diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go
index b3afbc623d6..bf5c8782d4f 100644
--- a/pkg/controller/controller_utils_test.go
+++ b/pkg/controller/controller_utils_test.go
@@ -183,6 +183,57 @@ func TestControllerExpectations(t *testing.T) {
     }
 }
 
+func TestUIDExpectations(t *testing.T) {
+    uidExp := NewUIDTrackingControllerExpectations(NewControllerExpectations())
+    rcList := []*api.ReplicationController{
+        newReplicationController(2),
+        newReplicationController(1),
+        newReplicationController(0),
+        newReplicationController(5),
+    }
+    rcToPods := map[string][]string{}
+    rcKeys := []string{}
+    for i := range rcList {
+        rc := rcList[i]
+        rcName := fmt.Sprintf("rc-%v", i)
+        rc.Name = rcName
+        rc.Spec.Selector[rcName] = rcName
+        podList := newPodList(nil, 5, api.PodRunning, rc)
+        rcKey, err := KeyFunc(rc)
+        if err != nil {
+            t.Fatalf("Couldn't get key for object %+v: %v", rc, err)
+        }
+        rcKeys = append(rcKeys, rcKey)
+        rcPodNames := []string{}
+        for i := range podList.Items {
+            p := &podList.Items[i]
+            p.Name = fmt.Sprintf("%v-%v", p.Name, rc.Name)
+            rcPodNames = append(rcPodNames, PodKey(p))
+        }
+        rcToPods[rcKey] = rcPodNames
+        uidExp.ExpectDeletions(rcKey, rcPodNames)
+    }
+    for i := range rcKeys {
+        j := rand.Intn(i + 1)
+        rcKeys[i], rcKeys[j] = rcKeys[j], rcKeys[i]
+    }
+    for _, rcKey := range rcKeys {
+        if uidExp.SatisfiedExpectations(rcKey) {
+            t.Errorf("Controller %v satisfied expectations before deletion", rcKey)
+        }
+        for _, p := range rcToPods[rcKey] {
+            uidExp.DeletionObserved(rcKey, p)
+        }
+        if !uidExp.SatisfiedExpectations(rcKey) {
+            t.Errorf("Controller %v didn't satisfy expectations after deletion", rcKey)
+        }
+        uidExp.DeleteExpectations(rcKey)
+        if uidExp.GetUIDs(rcKey) != nil {
+            t.Errorf("Failed to delete uid expectations for %v", rcKey)
+        }
+    }
+}
+
 func TestCreatePods(t *testing.T) {
     ns := api.NamespaceDefault
     body := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "empty_pod"}})
diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index 07fff80c097..28b5374d3f8 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -49,7 +49,7 @@ const (
     // happens based on contents in local pod storage.
     FullControllerResyncPeriod = 30 * time.Second
 
-    // Realistic value of the burstReplica field for the replication manager based off
+    // Realistic value of the burstReplica field for the replica set manager based off
     // performance requirements for kubernetes 1.0.
     BurstReplicas = 500
 
@@ -73,8 +73,8 @@ type ReplicaSetController struct {
     // To allow injection of syncReplicaSet for testing.
     syncHandler func(rsKey string) error
 
-    // A TTLCache of pod creates/deletes each ReplicaSet expects to see
-    expectations controller.ControllerExpectationsInterface
+    // A TTLCache of pod creates/deletes each rc expects to see.
+    expectations *controller.UIDTrackingControllerExpectations
 
     // A store of ReplicaSets, populated by the rsController
     rsStore cache.StoreToReplicaSetLister
@@ -107,7 +107,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
             Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
         },
         burstReplicas: burstReplicas,
-        expectations:  controller.NewControllerExpectations(),
+        expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
         queue:         workqueue.New(),
     }
 
@@ -297,17 +297,16 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
     }
     rsKey, err := controller.KeyFunc(rs)
     if err != nil {
-        glog.Errorf("Couldn't get key for replication controller %#v: %v", rs, err)
+        glog.Errorf("Couldn't get key for replica set %#v: %v", rs, err)
         return
     }
     if pod.DeletionTimestamp != nil {
         // on a restart of the controller manager, it's possible a new pod shows up in a state that
         // is already pending deletion. Prevent the pod from being a creation observation.
-        glog.V(4).Infof("Add for pod %v with deletion timestamp %+v, counted as new deletion for rs %v", pod.Name, pod.DeletionTimestamp, rsKey)
-        rsc.expectations.DeletionObserved(rsKey)
-    } else {
-        rsc.expectations.CreationObserved(rsKey)
+        rsc.deletePod(pod)
+        return
     }
+    rsc.expectations.CreationObserved(rsKey)
     rsc.enqueueReplicaSet(rs)
 }
 
@@ -326,22 +325,15 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
     if rs == nil {
         return
     }
-    rsKey, err := controller.KeyFunc(rs)
-    if err != nil {
-        glog.Errorf("Couldn't get key for replication controller %#v: %v", rs, err)
-        return
-    }
-    if curPod.DeletionTimestamp != nil && oldPod.DeletionTimestamp == nil {
+    if curPod.DeletionTimestamp != nil {
         // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
         // and after such time has passed, the kubelet actually deletes it from the store. We receive an update
         // for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
         // until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
         // an rs never initiates a phase change, and so is never asleep waiting for the same.
-        glog.V(4).Infof("Update to pod %v with deletion timestamp %+v counted as delete for rs %v", curPod.Name, curPod.DeletionTimestamp, rsKey)
-        rsc.expectations.DeletionObserved(rsKey)
-    } else {
-        glog.V(4).Infof("Update to pod %v with deletion timestamp %+v. Not counting it as a new deletion for rs %v", curPod.Name, curPod.DeletionTimestamp, rsKey)
+        rsc.deletePod(curPod)
+        return
     }
 
     rsc.enqueueReplicaSet(rs)
@@ -375,21 +367,14 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
             return
         }
     }
-    glog.V(4).Infof("Pod %s deleted: %+v.", pod.Name, pod)
+    glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
     if rs := rsc.getPodReplicaSet(pod); rs != nil {
         rsKey, err := controller.KeyFunc(rs)
         if err != nil {
             glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)
             return
         }
-        // This method only manages expectations for the case where a pod is
-        // deleted without a grace period.
-        if pod.DeletionTimestamp == nil {
-            glog.V(4).Infof("Received new delete for rs %v, pod %v", rsKey, pod.Name)
-            rsc.expectations.DeletionObserved(rsKey)
-        } else {
-            glog.V(4).Infof("Received delete for rs %v pod %v with non nil deletion timestamp %+v. Not counting it as a new deletion.", rsKey, pod.Name, pod.DeletionTimestamp)
-        }
+        rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
         rsc.enqueueReplicaSet(rs)
     }
 }
@@ -442,6 +427,11 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
     if diff > rsc.burstReplicas {
         diff = rsc.burstReplicas
     }
+    // TODO: Track UIDs of creates just like deletes. The problem currently
+    // is we'd need to wait on the result of a create to record the pod's
+    // UID, which would require locking *across* the create, which will turn
+    // into a performance bottleneck. We should generate a UID for the pod
+    // beforehand and store it via ExpectCreations.
     rsc.expectations.ExpectCreations(rsKey, diff)
     wait := sync.WaitGroup{}
     wait.Add(diff)
@@ -462,7 +452,6 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
     if diff > rsc.burstReplicas {
         diff = rsc.burstReplicas
     }
-    rsc.expectations.ExpectDeletions(rsKey, diff)
     glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff)
     // No need to sort pods if we are about to delete all of them
     if rs.Spec.Replicas != 0 {
@@ -471,7 +460,17 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
         // in the earlier stages whenever possible.
         sort.Sort(controller.ActivePods(filteredPods))
     }
-
+    // Snapshot the UIDs (ns/name) of the pods we're expecting to see
+    // deleted, so we know to record their expectations exactly once either
+    // when we see it as an update of the deletion timestamp, or as a delete.
+    // Note that if the labels on a pod/rs change in a way that the pod gets
+    // orphaned, the rs will only wake up after the expectations have
+    // expired even if other pods are deleted.
+    deletedPodKeys := []string{}
+    for i := 0; i < diff; i++ {
+        deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
+    }
+    rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys)
     wait := sync.WaitGroup{}
     wait.Add(diff)
     for i := 0; i < diff; i++ {
@@ -479,8 +478,9 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
             defer wait.Done()
             if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil {
                 // Decrement the expected number of deletes because the informer won't observe this deletion
-                glog.V(2).Infof("Failed deletion, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
-                rsc.expectations.DeletionObserved(rsKey)
+                podKey := controller.PodKey(filteredPods[ix])
+                glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name)
+                rsc.expectations.DeletionObserved(rsKey, podKey)
                 utilruntime.HandleError(err)
             }
         }(i)
diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go
index fd371b886fa..657304e28aa 100644
--- a/pkg/controller/replicaset/replica_set_test.go
+++ b/pkg/controller/replicaset/replica_set_test.go
@@ -22,6 +22,7 @@ import (
     "fmt"
     "math/rand"
     "net/http/httptest"
+    "strings"
     "testing"
     "time"
 
@@ -652,6 +653,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
     }
 }
 
+// TODO: This test is too hairy for a unittest. It should be moved to an E2E suite.
 func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
     client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
     fakePodControl := controller.FakePodControl{}
@@ -703,7 +705,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
         podExp, exists, err := manager.expectations.GetExpectations(rsKey)
         if !exists || err != nil {
-            t.Fatalf("Did not find expectations for ReplicaSet.")
+            t.Fatalf("Did not find expectations for rc.")
         }
         if add, _ := podExp.GetExpectations(); add != 1 {
             t.Fatalf("Expectations are wrong %v", podExp)
@@ -714,9 +716,27 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
             expectedPods = burstReplicas
         }
         validateSyncReplicaSet(t, &fakePodControl, 0, expectedPods)
-        for i := 0; i < expectedPods-1; i++ {
-            manager.podStore.Store.Delete(&pods.Items[i])
-            manager.deletePod(&pods.Items[i])
+
+        // To accurately simulate a watch we must delete the exact pods
+        // the rs is waiting for.
+        expectedDels := manager.expectations.GetUIDs(getKey(rsSpec, t))
+        podsToDelete := []*api.Pod{}
+        for _, key := range expectedDels.List() {
+            nsName := strings.Split(key, "/")
+            podsToDelete = append(podsToDelete, &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    Name:      nsName[1],
+                    Namespace: nsName[0],
+                    Labels:    rsSpec.Spec.Selector.MatchLabels,
+                },
+            })
+        }
+        // Don't delete all pods because we confirm that the last pod
+        // has exactly one expectation at the end, to verify that we
+        // don't double delete.
+        for i := range podsToDelete[1:] {
+            manager.podStore.Delete(podsToDelete[i])
+            manager.deletePod(podsToDelete[i])
         }
         podExp, exists, err := manager.expectations.GetExpectations(rsKey)
         if !exists || err != nil {
@@ -739,8 +759,20 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
             manager.podStore.Store.Add(&pods.Items[expectedPods-1])
             manager.addPod(&pods.Items[expectedPods-1])
         } else {
-            manager.podStore.Store.Delete(&pods.Items[expectedPods-1])
-            manager.deletePod(&pods.Items[expectedPods-1])
+            expectedDel := manager.expectations.GetUIDs(getKey(rsSpec, t))
+            if expectedDel.Len() != 1 {
+                t.Fatalf("Waiting on unexpected number of deletes.")
+            }
+            nsName := strings.Split(expectedDel.List()[0], "/")
+            lastPod := &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    Name:      nsName[1],
+                    Namespace: nsName[0],
+                    Labels:    rsSpec.Spec.Selector.MatchLabels,
+                },
+            }
+            manager.podStore.Store.Delete(lastPod)
+            manager.deletePod(lastPod)
         }
         pods.Items = pods.Items[expectedPods:]
     }
@@ -788,14 +820,14 @@ func TestRSSyncExpectations(t *testing.T) {
     manager.podStore.Store.Add(&pods.Items[0])
     postExpectationsPod := pods.Items[1]
 
-    manager.expectations = FakeRSExpectations{
+    manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{
         controller.NewControllerExpectations(), true, func() {
             // If we check active pods before checking expectataions, the
             // ReplicaSet will create a new replica because it doesn't see
            // this pod, but has fulfilled its expectations.
             manager.podStore.Store.Add(&postExpectationsPod)
         },
-    }
+    })
     manager.syncReplicaSet(getKey(rsSpec, t))
     validateSyncReplicaSet(t, &fakePodControl, 0, 0)
 }
@@ -925,7 +957,7 @@ func TestDeletionTimestamp(t *testing.T) {
     }
     pod := newPodList(nil, 1, api.PodPending, labelMap, rs).Items[0]
     pod.DeletionTimestamp = &unversioned.Time{time.Now()}
-    manager.expectations.SetExpectations(rsKey, 0, 1)
+    manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
 
     // A pod added with a deletion timestamp should decrement deletions, not creations.
     manager.addPod(&pod)
@@ -944,7 +976,7 @@ func TestDeletionTimestamp(t *testing.T) {
     // An update from no deletion timestamp to having one should be treated
     // as a deletion.
     oldPod := newPodList(nil, 1, api.PodPending, labelMap, rs).Items[0]
-    manager.expectations.SetExpectations(rsKey, 0, 1)
+    manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
     manager.updatePod(&oldPod, &pod)
 
     queueRC, _ = manager.queue.Get()
@@ -960,7 +992,14 @@ func TestDeletionTimestamp(t *testing.T) {
     // An update to the pod (including an update to the deletion timestamp)
     // should not be counted as a second delete.
-    manager.expectations.SetExpectations(rsKey, 0, 1)
+    secondPod := &api.Pod{
+        ObjectMeta: api.ObjectMeta{
+            Namespace: pod.Namespace,
+            Name:      "secondPod",
+            Labels:    pod.Labels,
+        },
+    }
+    manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)})
     oldPod.DeletionTimestamp = &unversioned.Time{time.Now()}
     manager.updatePod(&oldPod, &pod)
 
@@ -977,9 +1016,8 @@ func TestDeletionTimestamp(t *testing.T) {
         t.Fatalf("Wrong expectations %+v", podExp)
     }
 
-    // A pod with a nil timestamp should be counted as a deletion.
-    pod.DeletionTimestamp = nil
-    manager.deletePod(&pod)
+    // Deleting the second pod should clear expectations.
+    manager.deletePod(secondPod)
 
     queueRC, _ = manager.queue.Get()
     if queueRC != rsKey {
diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index c0d7194afe7..fa2dce53fb3 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -72,8 +72,8 @@ type ReplicationManager struct {
     // To allow injection of syncReplicationController for testing.
     syncHandler func(rcKey string) error
 
-    // A TTLCache of pod creates/deletes each rc expects to see
-    expectations controller.ControllerExpectationsInterface
+    // A TTLCache of pod creates/deletes each rc expects to see.
+    expectations *controller.UIDTrackingControllerExpectations
 
     // A store of replication controllers, populated by the rcController
     rcStore cache.StoreToReplicationControllerLister
@@ -106,7 +106,7 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
             Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
         },
         burstReplicas: burstReplicas,
-        expectations:  controller.NewControllerExpectations(),
+        expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
         queue:         workqueue.New(),
     }
 
@@ -300,11 +300,10 @@ func (rm *ReplicationManager) addPod(obj interface{}) {
     if pod.DeletionTimestamp != nil {
         // on a restart of the controller manager, it's possible a new pod shows up in a state that
         // is already pending deletion. Prevent the pod from being a creation observation.
- glog.V(4).Infof("Add for pod %v with deletion timestamp %+v, counted as new deletion for rc %v", pod.Name, pod.DeletionTimestamp, rcKey) - rm.expectations.DeletionObserved(rcKey) - } else { - rm.expectations.CreationObserved(rcKey) + rm.deletePod(pod) + return } + rm.expectations.CreationObserved(rcKey) rm.enqueueController(rc) } @@ -321,25 +320,17 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) { if rc == nil { return } - rcKey, err := controller.KeyFunc(rc) - if err != nil { - glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err) - return - } oldPod := old.(*api.Pod) - if curPod.DeletionTimestamp != nil && oldPod.DeletionTimestamp == nil { + if curPod.DeletionTimestamp != nil { // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period, // and after such time has passed, the kubelet actually deletes it from the store. We receive an update // for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait // until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because // an rc never initiates a phase change, and so is never asleep waiting for the same. - glog.V(4).Infof("Update to pod %v with deletion timestamp %+v counted as delete for rc %v", curPod.Name, curPod.DeletionTimestamp, rcKey) - rm.expectations.DeletionObserved(rcKey) - } else { - glog.V(4).Infof("Update to pod %v with deletion timestamp %+v. Not counting it as a new deletion for rc %v.", curPod.Name, curPod.DeletionTimestamp, rcKey) + rm.deletePod(curPod) + return } - rm.enqueueController(rc) // Only need to get the old controller if the labels changed. if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { @@ -372,20 +363,14 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { return } } + glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v, labels %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod.Labels) if rc := rm.getPodController(pod); rc != nil { rcKey, err := controller.KeyFunc(rc) if err != nil { glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err) return } - // This method only manages expectations for the case where a pod is - // deleted without a grace period. - if pod.DeletionTimestamp == nil { - glog.V(4).Infof("Received new delete for rc %v, pod %v", rcKey, pod.Name) - rm.expectations.DeletionObserved(rcKey) - } else { - glog.V(4).Infof("Received delete for rc %v pod %v with non nil deletion timestamp %+v. Not counting it as a new deletion.", rcKey, pod.Name, pod.DeletionTimestamp) - } + rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod)) rm.enqueueController(rc) } } @@ -438,6 +423,11 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re if diff > rm.burstReplicas { diff = rm.burstReplicas } + // TODO: Track UIDs of creates just like deletes. The problem currently + // is we'd need to wait on the result of a create to record the pod's + // UID, which would require locking *across* the create, which will turn + // into a performance bottleneck. We should generate a UID for the pod + // beforehand and store it via ExpectCreations. 
     rm.expectations.ExpectCreations(rcKey, diff)
     wait := sync.WaitGroup{}
     wait.Add(diff)
@@ -458,7 +448,6 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
     if diff > rm.burstReplicas {
         diff = rm.burstReplicas
     }
-    rm.expectations.ExpectDeletions(rcKey, diff)
     glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
     // No need to sort pods if we are about to delete all of them
     if rc.Spec.Replicas != 0 {
@@ -467,7 +456,20 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
         // in the earlier stages whenever possible.
         sort.Sort(controller.ActivePods(filteredPods))
     }
-
+    // Snapshot the UIDs (ns/name) of the pods we're expecting to see
+    // deleted, so we know to record their expectations exactly once either
+    // when we see it as an update of the deletion timestamp, or as a delete.
+    // Note that if the labels on a pod/rc change in a way that the pod gets
+    // orphaned, the rs will only wake up after the expectations have
+    // expired even if other pods are deleted.
+    deletedPodKeys := []string{}
+    for i := 0; i < diff; i++ {
+        deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
+    }
+    // We use pod namespace/name as a UID to wait for deletions, so if the
+    // labels on a pod/rc change in a way that the pod gets orphaned, the
+    // rc will only wake up after the expectation has expired.
+    rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
     wait := sync.WaitGroup{}
     wait.Add(diff)
     for i := 0; i < diff; i++ {
@@ -475,8 +477,9 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
             defer wait.Done()
             if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
                 // Decrement the expected number of deletes because the informer won't observe this deletion
-                glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
-                rm.expectations.DeletionObserved(rcKey)
+                podKey := controller.PodKey(filteredPods[ix])
+                glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rc.Namespace, rc.Name)
+                rm.expectations.DeletionObserved(rcKey, podKey)
                 utilruntime.HandleError(err)
             }
         }(i)
diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go
index 91768577467..1cf9f21129b 100644
--- a/pkg/controller/replication/replication_controller_test.go
+++ b/pkg/controller/replication/replication_controller_test.go
@@ -22,6 +22,7 @@ import (
     "fmt"
     "math/rand"
     "net/http/httptest"
+    "strings"
     "testing"
     "time"
 
@@ -638,6 +639,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
     }
 }
 
+// TODO: This test is too hairy for a unittest. It should be moved to an E2E suite.
 func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
     c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
     fakePodControl := controller.FakePodControl{}
@@ -698,9 +700,27 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
         expectedPods = burstReplicas
     }
     validateSyncReplication(t, &fakePodControl, 0, expectedPods)
-    for i := 0; i < expectedPods-1; i++ {
-        manager.podStore.Store.Delete(&pods.Items[i])
-        manager.deletePod(&pods.Items[i])
+
+    // To accurately simulate a watch we must delete the exact pods
+    // the rc is waiting for.
+    expectedDels := manager.expectations.GetUIDs(getKey(controllerSpec, t))
+    podsToDelete := []*api.Pod{}
+    for _, key := range expectedDels.List() {
+        nsName := strings.Split(key, "/")
+        podsToDelete = append(podsToDelete, &api.Pod{
+            ObjectMeta: api.ObjectMeta{
+                Name:      nsName[1],
+                Namespace: nsName[0],
+                Labels:    controllerSpec.Spec.Selector,
+            },
+        })
+    }
+    // Don't delete all pods because we confirm that the last pod
+    // has exactly one expectation at the end, to verify that we
+    // don't double delete.
+    for i := range podsToDelete[1:] {
+        manager.podStore.Delete(podsToDelete[i])
+        manager.deletePod(podsToDelete[i])
     }
     podExp, exists, err := manager.expectations.GetExpectations(rcKey)
     if !exists || err != nil {
@@ -723,8 +743,20 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
         manager.podStore.Store.Add(&pods.Items[expectedPods-1])
         manager.addPod(&pods.Items[expectedPods-1])
     } else {
-        manager.podStore.Store.Delete(&pods.Items[expectedPods-1])
-        manager.deletePod(&pods.Items[expectedPods-1])
+        expectedDel := manager.expectations.GetUIDs(getKey(controllerSpec, t))
+        if expectedDel.Len() != 1 {
+            t.Fatalf("Waiting on unexpected number of deletes.")
+        }
+        nsName := strings.Split(expectedDel.List()[0], "/")
+        lastPod := &api.Pod{
+            ObjectMeta: api.ObjectMeta{
+                Name:      nsName[1],
+                Namespace: nsName[0],
+                Labels:    controllerSpec.Spec.Selector,
+            },
+        }
+        manager.podStore.Store.Delete(lastPod)
+        manager.deletePod(lastPod)
     }
     pods.Items = pods.Items[expectedPods:]
 }
@@ -771,14 +803,14 @@ func TestRCSyncExpectations(t *testing.T) {
     manager.podStore.Store.Add(&pods.Items[0])
     postExpectationsPod := pods.Items[1]
 
-    manager.expectations = FakeRCExpectations{
+    manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRCExpectations{
         controller.NewControllerExpectations(), true, func() {
             // If we check active pods before checking expectataions, the rc
             // will create a new replica because it doesn't see this pod, but
             // has fulfilled its expectations.
             manager.podStore.Store.Add(&postExpectationsPod)
         },
-    }
+    })
     manager.syncReplicationController(getKey(controllerSpec, t))
     validateSyncReplication(t, &fakePodControl, 0, 0)
 }
@@ -906,7 +938,7 @@ func TestDeletionTimestamp(t *testing.T) {
     }
     pod := newPodList(nil, 1, api.PodPending, controllerSpec).Items[0]
    pod.DeletionTimestamp = &unversioned.Time{time.Now()}
-    manager.expectations.SetExpectations(rcKey, 0, 1)
+    manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
 
     // A pod added with a deletion timestamp should decrement deletions, not creations.
     manager.addPod(&pod)
@@ -925,7 +957,7 @@ func TestDeletionTimestamp(t *testing.T) {
     // An update from no deletion timestamp to having one should be treated
     // as a deletion.
     oldPod := newPodList(nil, 1, api.PodPending, controllerSpec).Items[0]
-    manager.expectations.SetExpectations(rcKey, 0, 1)
+    manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
     manager.updatePod(&oldPod, &pod)
 
     queueRC, _ = manager.queue.Get()
@@ -941,7 +973,14 @@ func TestDeletionTimestamp(t *testing.T) {
     // An update to the pod (including an update to the deletion timestamp)
     // should not be counted as a second delete.
-    manager.expectations.SetExpectations(rcKey, 0, 1)
+    secondPod := &api.Pod{
+        ObjectMeta: api.ObjectMeta{
+            Namespace: pod.Namespace,
+            Name:      "secondPod",
+            Labels:    pod.Labels,
+        },
+    }
+    manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(secondPod)})
     oldPod.DeletionTimestamp = &unversioned.Time{time.Now()}
     manager.updatePod(&oldPod, &pod)
 
@@ -958,9 +997,8 @@ func TestDeletionTimestamp(t *testing.T) {
         t.Fatalf("Wrong expectations %+v", podExp)
     }
 
-    // A pod with a nil timestamp should be counted as a deletion.
-    pod.DeletionTimestamp = nil
-    manager.deletePod(&pod)
+    // Deleting the second pod should clear expectations.
+    manager.deletePod(secondPod)
 
     queueRC, _ = manager.queue.Get()
     if queueRC != rcKey {
diff --git a/pkg/util/runtime/runtime.go b/pkg/util/runtime/runtime.go
index 76d7cb46497..32b1c710abf 100644
--- a/pkg/util/runtime/runtime.go
+++ b/pkg/util/runtime/runtime.go
@@ -76,3 +76,14 @@ func HandleError(err error) {
 func logError(err error) {
     glog.ErrorDepth(2, err)
 }
+
+// GetCaller returns the caller of the function that calls it.
+func GetCaller() string {
+    var pc [1]uintptr
+    runtime.Callers(3, pc[:])
+    f := runtime.FuncForPC(pc[0])
+    if f == nil {
+        return fmt.Sprintf("Unable to find caller")
+    }
+    return f.Name()
+}
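
The delete-tracking pattern this patch introduces can be summarized outside the diff with a small standalone sketch. It is only an illustration of the idea (record the ns/name keys you are about to delete, count each key at most once whether the observation arrives as a DeletionTimestamp update or as a final delete); the names below (deleteTracker, newDeleteTracker) are illustrative stand-ins using a plain mutex-guarded map, not the package's cache.Store-backed UIDTrackingControllerExpectations.

package main

import (
    "fmt"
    "sync"
)

// deleteTracker remembers which pod keys (ns/name) a controller still expects
// to see deleted, so a graceful-deletion update and the eventual delete event
// for the same pod are only counted once.
type deleteTracker struct {
    mu       sync.Mutex
    expected map[string]map[string]bool // controller key -> set of pod keys
}

func newDeleteTracker() *deleteTracker {
    return &deleteTracker{expected: map[string]map[string]bool{}}
}

// ExpectDeletions records the pod keys the controller is about to delete.
func (t *deleteTracker) ExpectDeletions(controllerKey string, podKeys []string) {
    t.mu.Lock()
    defer t.mu.Unlock()
    set := map[string]bool{}
    for _, k := range podKeys {
        set[k] = true
    }
    t.expected[controllerKey] = set
}

// DeletionObserved is called both when a pod's DeletionTimestamp is first set
// and when the pod finally disappears from the store; the membership check
// makes the observation idempotent, so a delete is never double counted.
func (t *deleteTracker) DeletionObserved(controllerKey, podKey string) {
    t.mu.Lock()
    defer t.mu.Unlock()
    if set, ok := t.expected[controllerKey]; ok && set[podKey] {
        delete(set, podKey)
    }
}

// SatisfiedExpectations reports whether the controller may sync again.
func (t *deleteTracker) SatisfiedExpectations(controllerKey string) bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    return len(t.expected[controllerKey]) == 0
}

func main() {
    exp := newDeleteTracker()
    exp.ExpectDeletions("default/my-rc", []string{"default/pod-a", "default/pod-b"})

    // Graceful deletion: the update that sets DeletionTimestamp counts as the delete...
    exp.DeletionObserved("default/my-rc", "default/pod-a")
    // ...and the eventual watch delete for the same pod is a no-op.
    exp.DeletionObserved("default/my-rc", "default/pod-a")

    fmt.Println(exp.SatisfiedExpectations("default/my-rc")) // false: pod-b still pending
    exp.DeletionObserved("default/my-rc", "default/pod-b")
    fmt.Println(exp.SatisfiedExpectations("default/my-rc")) // true
}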