diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go
index 3e2c5977e68..cffcc7795cf 100644
--- a/pkg/controller/controller_ref_manager.go
+++ b/pkg/controller/controller_ref_manager.go
@@ -18,7 +18,7 @@ package controller
 
 import (
 	"fmt"
-	"strings"
+	"sync"
 
 	"github.com/golang/glog"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -46,6 +46,19 @@ func GetControllerOf(controllee metav1.Object) *metav1.OwnerReference {
 type baseControllerRefManager struct {
 	controller metav1.Object
 	selector   labels.Selector
+
+	canAdoptErr  error
+	canAdoptOnce sync.Once
+	canAdoptFunc func() error
+}
+
+func (m *baseControllerRefManager) canAdopt() error {
+	m.canAdoptOnce.Do(func() {
+		if m.canAdoptFunc != nil {
+			m.canAdoptErr = m.canAdoptFunc()
+		}
+	})
+	return m.canAdoptErr
 }
 
 // claimObject tries to take ownership of an object for this controller.
@@ -122,16 +135,27 @@ type PodControllerRefManager struct {
 
 // NewPodControllerRefManager returns a PodControllerRefManager that exposes
 // methods to manage the controllerRef of pods.
+//
+// The canAdopt() function can be used to perform a potentially expensive check
+// (such as a live GET from the API server) prior to the first adoption.
+// It will only be called (at most once) if an adoption is actually attempted.
+// If canAdopt() returns a non-nil error, all adoptions will fail.
+//
+// NOTE: Once canAdopt() is called, it will not be called again by the same
+// PodControllerRefManager instance. Create a new instance if it makes
+// sense to check canAdopt() again (e.g. in a different sync pass).
 func NewPodControllerRefManager(
 	podControl PodControlInterface,
 	controller metav1.Object,
 	selector labels.Selector,
 	controllerKind schema.GroupVersionKind,
+	canAdopt func() error,
 ) *PodControllerRefManager {
 	return &PodControllerRefManager{
 		baseControllerRefManager: baseControllerRefManager{
-			controller: controller,
-			selector:   selector,
+			controller:   controller,
+			selector:     selector,
+			canAdoptFunc: canAdopt,
 		},
 		controllerKind: controllerKind,
 		podControl:     podControl,
@@ -193,10 +217,8 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.
 // AdoptPod sends a patch to take control of the pod. It returns the error if
 // the patching fails.
 func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
-	// we should not adopt any pods if the controller is about to be deleted
-	if m.controller.GetDeletionTimestamp() != nil {
-		return fmt.Errorf("cancel the adopt attempt for pod %s because the controller is being deleted",
-			strings.Join([]string{pod.Namespace, pod.Name, string(pod.UID)}, "_"))
+	if err := m.canAdopt(); err != nil {
+		return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err)
 	}
 	// Note that ValidateOwnerReferences() will reject this patch if another
 	// OwnerReference exists with controller=true.
@@ -247,16 +269,27 @@ type ReplicaSetControllerRefManager struct {
 
 // NewReplicaSetControllerRefManager returns a ReplicaSetControllerRefManager that exposes
 // methods to manage the controllerRef of ReplicaSets.
+//
+// The canAdopt() function can be used to perform a potentially expensive check
+// (such as a live GET from the API server) prior to the first adoption.
+// It will only be called (at most once) if an adoption is actually attempted.
+// If canAdopt() returns a non-nil error, all adoptions will fail.
+//
+// NOTE: Once canAdopt() is called, it will not be called again by the same
+// ReplicaSetControllerRefManager instance. Create a new instance if it
+// makes sense to check canAdopt() again (e.g. in a different sync pass).
 func NewReplicaSetControllerRefManager(
 	rsControl RSControlInterface,
 	controller metav1.Object,
 	selector labels.Selector,
 	controllerKind schema.GroupVersionKind,
+	canAdopt func() error,
 ) *ReplicaSetControllerRefManager {
 	return &ReplicaSetControllerRefManager{
 		baseControllerRefManager: baseControllerRefManager{
-			controller: controller,
-			selector:   selector,
+			controller:   controller,
+			selector:     selector,
+			canAdoptFunc: canAdopt,
 		},
 		controllerKind: controllerKind,
 		rsControl:      rsControl,
@@ -305,19 +338,17 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.Rep
 
 // AdoptReplicaSet sends a patch to take control of the ReplicaSet. It returns the error if
 // the patching fails.
-func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(replicaSet *extensions.ReplicaSet) error {
-	// we should not adopt any ReplicaSets if the Deployment is about to be deleted
-	if m.controller.GetDeletionTimestamp() != nil {
-		return fmt.Errorf("cancel the adopt attempt for RS %s because the controller %v is being deleted",
-			strings.Join([]string{replicaSet.Namespace, replicaSet.Name, string(replicaSet.UID)}, "_"), m.controller.GetName())
+func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(rs *extensions.ReplicaSet) error {
+	if err := m.canAdopt(); err != nil {
+		return fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err)
 	}
 	// Note that ValidateOwnerReferences() will reject this patch if another
 	// OwnerReference exists with controller=true.
 	addControllerPatch := fmt.Sprintf(
 		`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
 		m.controllerKind.GroupVersion(), m.controllerKind.Kind,
-		m.controller.GetName(), m.controller.GetUID(), replicaSet.UID)
-	return m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(addControllerPatch))
+		m.controller.GetName(), m.controller.GetUID(), rs.UID)
+	return m.rsControl.PatchReplicaSet(rs.Namespace, rs.Name, []byte(addControllerPatch))
 }
 
 // ReleaseReplicaSet sends a patch to free the ReplicaSet from the control of the Deployment controller.
@@ -342,3 +373,20 @@ func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *extension
 	}
 	return err
 }
+
+// RecheckDeletionTimestamp returns a canAdopt() function to recheck deletion.
+//
+// The canAdopt() function calls getObject() to fetch the latest value,
+// and denies adoption attempts if that object has a non-nil DeletionTimestamp.
+func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error {
+	return func() error {
+		obj, err := getObject()
+		if err != nil {
+			return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
+		}
+		if obj.GetDeletionTimestamp() != nil {
+			return fmt.Errorf("%v/%v has just been deleted at %v", obj.GetNamespace(), obj.GetName(), obj.GetDeletionTimestamp())
+		}
+		return nil
+	}
+}
diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go
index a3a0ae761f9..733e8d31262 100644
--- a/pkg/controller/daemon/daemoncontroller.go
+++ b/pkg/controller/daemon/daemoncontroller.go
@@ -468,8 +468,20 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet)
 	if err != nil {
 		return nil, err
 	}
+	// If any adoptions are attempted, we should first recheck for deletion with
+	// an uncached quorum read sometime after listing Pods (see #42639).
+	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
+		fresh, err := dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
+		if err != nil {
+			return nil, err
+		}
+		if fresh.UID != ds.UID {
+			return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
+		}
+		return fresh, nil
+	})
 	// Use ControllerRefManager to adopt/orphan as needed.
-	cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind)
+	cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, canAdoptFunc)
 	claimedPods, err := cm.ClaimPods(pods)
 	if err != nil {
 		return nil, err
diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go
index 120a6aa0de9..a8e6052fbf7 100644
--- a/pkg/controller/daemon/daemoncontroller_test.go
+++ b/pkg/controller/daemon/daemoncontroller_test.go
@@ -339,15 +339,16 @@ func markPodsReady(store cache.Store) {
 
 // DaemonSets without node selectors should launch pods on every node.
 func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
-	manager, podControl, _ := newTestController()
-	addNodes(manager.nodeStore, 0, 5, nil)
 	ds := newDaemonSet("foo")
+	manager, podControl, _ := newTestController(ds)
+	addNodes(manager.nodeStore, 0, 5, nil)
 	manager.dsStore.Add(ds)
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0)
 }
 
 func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
-	manager, podControl, clientset := newTestController()
+	ds := newDaemonSet("foo")
+	manager, podControl, clientset := newTestController(ds)
 	var updated *extensions.DaemonSet
 	clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
@@ -360,7 +361,6 @@ func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
 		return false, nil, nil
 	})
 
-	ds := newDaemonSet("foo")
 	manager.dsStore.Add(ds)
 	addNodes(manager.nodeStore, 0, 5, nil)
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0)
@@ -383,33 +383,33 @@ func TestNoNodesDoesNothing(t *testing.T) {
 
 // DaemonSets without node selectors should launch on a single node in a
 // single node cluster.
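The controller_ref_manager.go and daemon controller changes above form one pattern: the ref manager lazily runs a single, potentially expensive "may I adopt?" check the first time an adoption is attempted, and RecheckDeletionTimestamp supplies that check as an uncached re-read of the controller. Below is a minimal, self-contained sketch of that once-only gate, not the Kubernetes types themselves; refManager, adopt, and the simulated recheck are invented for illustration.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// refManager mirrors the shape of baseControllerRefManager: the expensive
// canAdoptFunc runs at most once, and its result gates every adoption.
type refManager struct {
	canAdoptErr  error
	canAdoptOnce sync.Once
	canAdoptFunc func() error
}

func (m *refManager) canAdopt() error {
	m.canAdoptOnce.Do(func() {
		if m.canAdoptFunc != nil {
			m.canAdoptErr = m.canAdoptFunc()
		}
	})
	return m.canAdoptErr
}

func (m *refManager) adopt(name string) error {
	if err := m.canAdopt(); err != nil {
		return fmt.Errorf("can't adopt %s: %v", name, err)
	}
	// The real manager would send an ownerReference patch here.
	return nil
}

func main() {
	liveReads := 0
	// Stand-in for RecheckDeletionTimestamp: pretend the uncached read
	// reports that the controller has already been marked for deletion.
	recheck := func() error {
		liveReads++
		return errors.New("controller has a non-nil DeletionTimestamp")
	}

	m := &refManager{canAdoptFunc: recheck}
	for _, name := range []string{"pod-a", "pod-b", "pod-c"} {
		fmt.Println(m.adopt(name)) // every adoption is denied
	}
	fmt.Println("live reads:", liveReads) // 1: the check ran only once
}

Because the result is latched by sync.Once, a fresh ref manager has to be built on every sync pass, which is exactly what the NOTE in the doc comments above calls out.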
func TestOneNodeDaemonLaunchesPod(t *testing.T) { - manager, podControl, _ := newTestController() - manager.nodeStore.Add(newNode("only-node", nil)) ds := newDaemonSet("foo") + manager, podControl, _ := newTestController(ds) + manager.nodeStore.Add(newNode("only-node", nil)) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } // DaemonSets should place onto NotReady nodes func TestNotReadNodeDaemonDoesNotLaunchPod(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + manager, podControl, _ := newTestController(ds) node := newNode("not-ready", nil) node.Status.Conditions = []v1.NodeCondition{ {Type: v1.NodeReady, Status: v1.ConditionFalse}, } manager.nodeStore.Add(node) - ds := newDaemonSet("foo") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } // DaemonSets should not place onto OutOfDisk nodes func TestOutOfDiskNodeDaemonDoesNotLaunchPod(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + manager, podControl, _ := newTestController(ds) node := newNode("not-enough-disk", nil) node.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}} manager.nodeStore.Add(node) - ds := newDaemonSet("foo") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) } @@ -436,15 +436,15 @@ func allocatableResources(memory, cpu string) v1.ResourceList { // DaemonSets should not place onto nodes with insufficient free resource func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) { podSpec := resourcePodSpec("too-much-mem", "75M", "75m") - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + ds.Spec.Template.Spec = podSpec + manager, podControl, _ := newTestController(ds) node := newNode("too-much-mem", nil) node.Status.Allocatable = allocatableResources("100M", "200m") manager.nodeStore.Add(node) manager.podStore.Add(&v1.Pod{ Spec: podSpec, }) - ds := newDaemonSet("foo") - ds.Spec.Template.Spec = podSpec manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) } @@ -453,22 +453,24 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) { func TestInsufficentCapacityNodeDaemonDoesNotUnscheduleRunningPod(t *testing.T) { podSpec := resourcePodSpec("too-much-mem", "75M", "75m") podSpec.NodeName = "too-much-mem" - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + ds.Spec.Template.Spec = podSpec + manager, podControl, _ := newTestController(ds) node := newNode("too-much-mem", nil) node.Status.Allocatable = allocatableResources("100M", "200m") manager.nodeStore.Add(node) manager.podStore.Add(&v1.Pod{ Spec: podSpec, }) - ds := newDaemonSet("foo") - ds.Spec.Template.Spec = podSpec manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) } func TestSufficientCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) { podSpec := resourcePodSpec("too-much-mem", "75M", "75m") - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + ds.Spec.Template.Spec = podSpec + manager, podControl, _ := newTestController(ds) node := newNode("too-much-mem", nil) node.Status.Allocatable = allocatableResources("100M", "200m") manager.nodeStore.Add(node) @@ -476,8 +478,6 @@ func TestSufficientCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) { Spec: podSpec, Status: v1.PodStatus{Phase: v1.PodSucceeded}, }) - ds := newDaemonSet("foo") - ds.Spec.Template.Spec = 
podSpec manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } @@ -485,30 +485,29 @@ func TestSufficientCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) { // DaemonSets should place onto nodes with sufficient free resource func TestSufficientCapacityNodeDaemonLaunchesPod(t *testing.T) { podSpec := resourcePodSpec("not-too-much-mem", "75M", "75m") - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + ds.Spec.Template.Spec = podSpec + manager, podControl, _ := newTestController(ds) node := newNode("not-too-much-mem", nil) node.Status.Allocatable = allocatableResources("200M", "200m") manager.nodeStore.Add(node) manager.podStore.Add(&v1.Pod{ Spec: podSpec, }) - ds := newDaemonSet("foo") - ds.Spec.Template.Spec = podSpec manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } // DaemonSet should launch a pod on a node with taint NetworkUnavailable condition. func TestNetworkUnavailableNodeDaemonLaunchesPod(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("simple") + manager, podControl, _ := newTestController(ds) node := newNode("network-unavailable", nil) node.Status.Conditions = []v1.NodeCondition{ {Type: v1.NodeNetworkUnavailable, Status: v1.ConditionTrue}, } manager.nodeStore.Add(node) - - ds := newDaemonSet("simple") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) @@ -517,18 +516,38 @@ func TestNetworkUnavailableNodeDaemonLaunchesPod(t *testing.T) { // DaemonSets not take any actions when being deleted func TestDontDoAnythingIfBeingDeleted(t *testing.T) { podSpec := resourcePodSpec("not-too-much-mem", "75M", "75m") - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + ds.Spec.Template.Spec = podSpec + now := metav1.Now() + ds.DeletionTimestamp = &now + manager, podControl, _ := newTestController(ds) node := newNode("not-too-much-mem", nil) node.Status.Allocatable = allocatableResources("200M", "200m") manager.nodeStore.Add(node) manager.podStore.Add(&v1.Pod{ Spec: podSpec, }) + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} + +func TestDontDoAnythingIfBeingDeletedRace(t *testing.T) { + // Bare client says it IS deleted. ds := newDaemonSet("foo") - ds.Spec.Template.Spec = podSpec now := metav1.Now() ds.DeletionTimestamp = &now - manager.dsStore.Add(ds) + manager, podControl, _ := newTestController(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // Lister (cache) says it's NOT deleted. + ds2 := *ds + ds2.DeletionTimestamp = nil + manager.dsStore.Add(&ds2) + + // The existence of a matching orphan should block all actions in this state. 
+ pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + manager.podStore.Add(pod) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) } @@ -603,14 +622,14 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) { }}, }}, } - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + ds.Spec.Template.Spec = podSpec2 + manager, podControl, _ := newTestController(ds) node := newNode("no-port-conflict", nil) manager.nodeStore.Add(node) manager.podStore.Add(&v1.Pod{ Spec: podSpec1, }) - ds := newDaemonSet("foo") - ds.Spec.Template.Spec = podSpec2 manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } @@ -619,19 +638,6 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) { // // issue https://github.com/kubernetes/kubernetes/pull/23223 func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) { - manager, podControl, _ := newTestController() - manager.nodeStore.Add(newNode("node1", nil)) - // Create pod not controlled by a daemonset. - manager.podStore.Add(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"bang": "boom"}, - Namespace: metav1.NamespaceDefault, - }, - Spec: v1.PodSpec{ - NodeName: "node1", - }, - }) - // Create a misconfigured DaemonSet. An empty pod selector is invalid but could happen // if we upgrade and make a backwards incompatible change. // @@ -645,6 +651,19 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) { ls := metav1.LabelSelector{} ds.Spec.Selector = &ls ds.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"} + + manager, podControl, _ := newTestController(ds) + manager.nodeStore.Add(newNode("node1", nil)) + // Create pod not controlled by a daemonset. + manager.podStore.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"bang": "boom"}, + Namespace: metav1.NamespaceDefault, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + }, + }) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) @@ -652,8 +671,8 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) { // Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods. func TestDealsWithExistingPods(t *testing.T) { - manager, podControl, _ := newTestController() ds := newDaemonSet("foo") + manager, podControl, _ := newTestController(ds) manager.dsStore.Add(ds) addNodes(manager.nodeStore, 0, 5, nil) addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 1) @@ -665,20 +684,20 @@ func TestDealsWithExistingPods(t *testing.T) { // Daemon with node selector should launch pods on nodes matching selector. func TestSelectorDaemonLaunchesPods(t *testing.T) { - manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 4, nil) - addNodes(manager.nodeStore, 4, 3, simpleNodeLabel) daemon := newDaemonSet("foo") daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager, podControl, _ := newTestController(daemon) + addNodes(manager.nodeStore, 0, 4, nil) + addNodes(manager.nodeStore, 4, 3, simpleNodeLabel) manager.dsStore.Add(daemon) syncAndValidateDaemonSets(t, manager, daemon, podControl, 3, 0) } // Daemon with node selector should delete pods from nodes that do not satisfy selector. 
func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) { - manager, podControl, _ := newTestController() ds := newDaemonSet("foo") ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager, podControl, _ := newTestController(ds) manager.dsStore.Add(ds) addNodes(manager.nodeStore, 0, 5, nil) addNodes(manager.nodeStore, 5, 5, simpleNodeLabel) @@ -691,9 +710,9 @@ func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) { // DaemonSet with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes. func TestSelectorDaemonDealsWithExistingPods(t *testing.T) { - manager, podControl, _ := newTestController() ds := newDaemonSet("foo") ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager, podControl, _ := newTestController(ds) manager.dsStore.Add(ds) addNodes(manager.nodeStore, 0, 5, nil) addNodes(manager.nodeStore, 5, 5, simpleNodeLabel) @@ -721,53 +740,50 @@ func TestBadSelectorDaemonDoesNothing(t *testing.T) { // DaemonSet with node name should launch pod on node with corresponding name. func TestNameDaemonSetLaunchesPods(t *testing.T) { - manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 5, nil) ds := newDaemonSet("foo") ds.Spec.Template.Spec.NodeName = "node-0" + manager, podControl, _ := newTestController(ds) + addNodes(manager.nodeStore, 0, 5, nil) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } // DaemonSet with node name that does not exist should not launch pods. func TestBadNameDaemonSetDoesNothing(t *testing.T) { - manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 5, nil) ds := newDaemonSet("foo") ds.Spec.Template.Spec.NodeName = "node-10" + manager, podControl, _ := newTestController(ds) + addNodes(manager.nodeStore, 0, 5, nil) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) } // DaemonSet with node selector, and node name, matching a node, should launch a pod on the node. func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) { - manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 4, nil) - addNodes(manager.nodeStore, 4, 3, simpleNodeLabel) ds := newDaemonSet("foo") ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel ds.Spec.Template.Spec.NodeName = "node-6" + manager, podControl, _ := newTestController(ds) + addNodes(manager.nodeStore, 0, 4, nil) + addNodes(manager.nodeStore, 4, 3, simpleNodeLabel) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } // DaemonSet with node selector that matches some nodes, and node name that matches a different node, should do nothing. func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) { - manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 4, nil) - addNodes(manager.nodeStore, 4, 3, simpleNodeLabel) ds := newDaemonSet("foo") ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel ds.Spec.Template.Spec.NodeName = "node-0" + manager, podControl, _ := newTestController(ds) + addNodes(manager.nodeStore, 0, 4, nil) + addNodes(manager.nodeStore, 4, 3, simpleNodeLabel) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) } // Daemon with node affinity should launch pods on nodes matching affinity. 
func TestNodeAffinityDaemonLaunchesPods(t *testing.T) { - manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 4, nil) - addNodes(manager.nodeStore, 4, 3, simpleNodeLabel) daemon := newDaemonSet("foo") daemon.Spec.Template.Spec.Affinity = &v1.Affinity{ NodeAffinity: &v1.NodeAffinity{ @@ -786,13 +802,17 @@ func TestNodeAffinityDaemonLaunchesPods(t *testing.T) { }, }, } + + manager, podControl, _ := newTestController(daemon) + addNodes(manager.nodeStore, 0, 4, nil) + addNodes(manager.nodeStore, 4, 3, simpleNodeLabel) manager.dsStore.Add(daemon) syncAndValidateDaemonSets(t, manager, daemon, podControl, 3, 0) } func TestNumberReadyStatus(t *testing.T) { ds := newDaemonSet("foo") - manager, podControl, clientset := newTestController() + manager, podControl, clientset := newTestController(ds) var updated *extensions.DaemonSet clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) { if action.GetSubresource() != "status" { @@ -829,7 +849,7 @@ func TestNumberReadyStatus(t *testing.T) { func TestObservedGeneration(t *testing.T) { ds := newDaemonSet("foo") ds.Generation = 1 - manager, podControl, clientset := newTestController() + manager, podControl, clientset := newTestController(ds) var updated *extensions.DaemonSet clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) { if action.GetSubresource() != "status" { @@ -866,8 +886,8 @@ func TestDaemonKillFailedPods(t *testing.T) { for _, test := range tests { t.Logf("test case: %s\n", test.test) - manager, podControl, _ := newTestController() ds := newDaemonSet("foo") + manager, podControl, _ := newTestController(ds) manager.dsStore.Add(ds) addNodes(manager.nodeStore, 0, 1, nil) addFailedPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numFailedPods) @@ -878,13 +898,12 @@ func TestDaemonKillFailedPods(t *testing.T) { // DaemonSet should not launch a pod on a tainted node when the pod doesn't tolerate that taint. func TestTaintedNodeDaemonDoesNotLaunchUntoleratePod(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("untolerate") + manager, podControl, _ := newTestController(ds) node := newNode("tainted", nil) setNodeTaint(node, noScheduleTaints) manager.nodeStore.Add(node) - - ds := newDaemonSet("untolerate") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) @@ -892,14 +911,13 @@ func TestTaintedNodeDaemonDoesNotLaunchUntoleratePod(t *testing.T) { // DaemonSet should launch a pod on a tainted node when the pod can tolerate that taint. func TestTaintedNodeDaemonLaunchesToleratePod(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("tolerate") + setDaemonSetToleration(ds, noScheduleTolerations) + manager, podControl, _ := newTestController(ds) node := newNode("tainted", nil) setNodeTaint(node, noScheduleTaints) manager.nodeStore.Add(node) - - ds := newDaemonSet("tolerate") - setDaemonSetToleration(ds, noScheduleTolerations) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) @@ -907,7 +925,8 @@ func TestTaintedNodeDaemonLaunchesToleratePod(t *testing.T) { // DaemonSet should launch a pod on a not ready node with taint notReady:NoExecute. 
func TestNotReadyNodeDaemonLaunchesPod(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("simple") + manager, podControl, _ := newTestController(ds) node := newNode("tainted", nil) setNodeTaint(node, nodeNotReady) @@ -915,8 +934,6 @@ func TestNotReadyNodeDaemonLaunchesPod(t *testing.T) { {Type: v1.NodeReady, Status: v1.ConditionFalse}, } manager.nodeStore.Add(node) - - ds := newDaemonSet("simple") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) @@ -924,7 +941,8 @@ func TestNotReadyNodeDaemonLaunchesPod(t *testing.T) { // DaemonSet should launch a pod on an unreachable node with taint unreachable:NoExecute. func TestUnreachableNodeDaemonLaunchesPod(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("simple") + manager, podControl, _ := newTestController(ds) node := newNode("tainted", nil) setNodeTaint(node, nodeUnreachable) @@ -932,8 +950,6 @@ func TestUnreachableNodeDaemonLaunchesPod(t *testing.T) { {Type: v1.NodeReady, Status: v1.ConditionUnknown}, } manager.nodeStore.Add(node) - - ds := newDaemonSet("simple") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) @@ -941,13 +957,12 @@ func TestUnreachableNodeDaemonLaunchesPod(t *testing.T) { // DaemonSet should launch a pod on an untainted node when the pod has tolerations. func TestNodeDaemonLaunchesToleratePod(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("tolerate") + setDaemonSetToleration(ds, noScheduleTolerations) + manager, podControl, _ := newTestController(ds) node := newNode("untainted", nil) manager.nodeStore.Add(node) - - ds := newDaemonSet("tolerate") - setDaemonSetToleration(ds, noScheduleTolerations) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) @@ -963,7 +978,9 @@ func setDaemonSetToleration(ds *extensions.DaemonSet, tolerations []v1.Toleratio // DaemonSet should launch a critical pod even when the node is OutOfDisk. func TestOutOfDiskNodeDaemonLaunchesCriticalPod(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("critical") + setDaemonSetCritical(ds) + manager, podControl, _ := newTestController(ds) node := newNode("not-enough-disk", nil) node.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}} @@ -971,8 +988,6 @@ func TestOutOfDiskNodeDaemonLaunchesCriticalPod(t *testing.T) { // Without enabling critical pod annotation feature gate, we shouldn't create critical pod utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=False") - ds := newDaemonSet("critical") - setDaemonSetCritical(ds) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) @@ -984,7 +999,11 @@ func TestOutOfDiskNodeDaemonLaunchesCriticalPod(t *testing.T) { // DaemonSet should launch a critical pod even when the node has insufficient free resource. 
func TestInsufficientCapacityNodeDaemonLaunchesCriticalPod(t *testing.T) { podSpec := resourcePodSpec("too-much-mem", "75M", "75m") - manager, podControl, _ := newTestController() + ds := newDaemonSet("critical") + ds.Spec.Template.Spec = podSpec + setDaemonSetCritical(ds) + + manager, podControl, _ := newTestController(ds) node := newNode("too-much-mem", nil) node.Status.Allocatable = allocatableResources("100M", "200m") manager.nodeStore.Add(node) @@ -994,9 +1013,6 @@ func TestInsufficientCapacityNodeDaemonLaunchesCriticalPod(t *testing.T) { // Without enabling critical pod annotation feature gate, we shouldn't create critical pod utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=False") - ds := newDaemonSet("critical") - ds.Spec.Template.Spec = podSpec - setDaemonSetCritical(ds) manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) @@ -1220,9 +1236,9 @@ func TestUpdateNode(t *testing.T) { } func TestGetNodesToDaemonPods(t *testing.T) { - manager, _, _ := newTestController() ds := newDaemonSet("foo") ds2 := newDaemonSet("foo2") + manager, _, _ := newTestController(ds, ds2) manager.dsStore.Add(ds) manager.dsStore.Add(ds2) addNodes(manager.nodeStore, 0, 2, nil) diff --git a/pkg/controller/daemon/update_test.go b/pkg/controller/daemon/update_test.go index ea16f6fcbb0..529c2c81c37 100644 --- a/pkg/controller/daemon/update_test.go +++ b/pkg/controller/daemon/update_test.go @@ -24,10 +24,10 @@ import ( ) func TestDaemonSetUpdatesPods(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + manager, podControl, _ := newTestController(ds) maxUnavailable := 2 addNodes(manager.nodeStore, 0, 5, nil) - ds := newDaemonSet("foo") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0) markPodsReady(podControl.podStore) @@ -63,10 +63,10 @@ func TestDaemonSetUpdatesPods(t *testing.T) { } func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + manager, podControl, _ := newTestController(ds) maxUnavailable := 3 addNodes(manager.nodeStore, 0, 5, nil) - ds := newDaemonSet("foo") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0) markPodsReady(podControl.podStore) @@ -90,10 +90,10 @@ func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) { } func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + manager, podControl, _ := newTestController(ds) maxUnavailable := 3 addNodes(manager.nodeStore, 0, 5, nil) - ds := newDaemonSet("foo") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0) @@ -116,10 +116,10 @@ func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) { } func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) { - manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + manager, podControl, _ := newTestController(ds) maxUnavailable := 3 addNodes(manager.nodeStore, 0, 5, nil) - ds := newDaemonSet("foo") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0) diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index ea3d9e12cbd..9523eb2ef06 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -501,7 +501,19 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(d 
*extensions.Deploy if err != nil { return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) } - cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind) + // If any adoptions are attempted, we should first recheck for deletion with + // an uncached quorum read sometime after listing ReplicaSets (see #42639). + canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := dc.client.ExtensionsV1beta1().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if fresh.UID != d.UID { + return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID) + } + return fresh, nil + }) + cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc) return cm.ClaimReplicaSets(rsList) } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index ff9c9561f77..27f1f3a58ba 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -148,6 +148,11 @@ type fixture struct { objects []runtime.Object } +func (f *fixture) expectGetDeploymentAction(d *extensions.Deployment) { + action := core.NewGetAction(schema.GroupVersionResource{Resource: "deployments"}, d.Namespace, d.Name) + f.actions = append(f.actions, action) +} + func (f *fixture) expectUpdateDeploymentStatusAction(d *extensions.Deployment) { action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "deployments"}, d.Namespace, d) action.Subresource = "status" @@ -190,15 +195,25 @@ func (f *fixture) newController() (*DeploymentController, informers.SharedInform return c, informers } +func (f *fixture) runExpectError(deploymentName string) { + f.run_(deploymentName, true) +} + func (f *fixture) run(deploymentName string) { + f.run_(deploymentName, false) +} + +func (f *fixture) run_(deploymentName string, expectError bool) { c, informers := f.newController() stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) err := c.syncDeployment(deploymentName) - if err != nil { + if !expectError && err != nil { f.t.Errorf("error syncing deployment: %v", err) + } else if expectError && err == nil { + f.t.Error("expected error syncing deployment, got nil") } actions := filterInformerActions(f.client.Actions()) @@ -293,6 +308,30 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) { f.run(getKey(d, t)) } +func TestSyncDeploymentDeletionRace(t *testing.T) { + f := newFixture(t) + + d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) + d2 := *d + // Lister (cache) says NOT deleted. + f.dLister = append(f.dLister, d) + // Bare client says it IS deleted. This should be presumed more up-to-date. + now := metav1.Now() + d2.DeletionTimestamp = &now + f.objects = append(f.objects, &d2) + + // The recheck is only triggered if a matching orphan exists. + rs := newReplicaSet(d, "rs1", 1) + rs.OwnerReferences = nil + f.objects = append(f.objects, rs) + f.rsLister = append(f.rsLister, rs) + + // Expect to only recheck DeletionTimestamp. + f.expectGetDeploymentAction(d) + // Sync should fail and requeue to let cache catch up. 
+ f.runExpectError(getKey(d, t)) +} + // issue: https://github.com/kubernetes/kubernetes/issues/23218 func TestDontSyncDeploymentsWithEmptyPodSelector(t *testing.T) { f := newFixture(t) diff --git a/pkg/controller/replicaset/BUILD b/pkg/controller/replicaset/BUILD index 57e78d44f4f..e9f73575995 100644 --- a/pkg/controller/replicaset/BUILD +++ b/pkg/controller/replicaset/BUILD @@ -68,6 +68,7 @@ go_test( "//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/util/testing", + "//vendor:k8s.io/client-go/util/workqueue", ], ) diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index c20906cfaea..98970d7840c 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -564,7 +564,19 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { filteredPods = append(filteredPods, pod) } } - cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind) + // If any adoptions are attempted, we should first recheck for deletion with + // an uncached quorum read sometime after listing Pods (see #42639). + canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := rsc.kubeClient.ExtensionsV1beta1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if fresh.UID != rs.UID { + return nil, fmt.Errorf("original ReplicaSet %v/%v is gone: got uid %v, wanted %v", rs.Namespace, rs.Name, fresh.UID, rs.UID) + } + return fresh, nil + }) + cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind, canAdoptFunc) // NOTE: filteredPods are pointing to objects from cache - if you need to // modify them, you need to copy it first. filteredPods, err = cm.ClaimPods(filteredPods) diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index e34e07ed854..ec074b4e531 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -19,6 +19,7 @@ limitations under the License. 
package replicaset import ( + "errors" "fmt" "math/rand" "net/http/httptest" @@ -39,6 +40,7 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/v1" @@ -87,7 +89,7 @@ func skipListerFunc(verb string, url url.URL) bool { if verb != "GET" { return false } - if strings.HasSuffix(url.Path, "/pods") || strings.HasSuffix(url.Path, "/replicasets") { + if strings.HasSuffix(url.Path, "/pods") || strings.Contains(url.Path, "/replicasets") { return true } return false @@ -247,7 +249,9 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { } func TestSyncReplicaSetDeletes(t *testing.T) { - client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(1, labelMap) + client := fake.NewSimpleClientset(rsSpec) fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) @@ -255,8 +259,6 @@ func TestSyncReplicaSetDeletes(t *testing.T) { manager.podControl = &fakePodControl // 2 running pods and a controller with 1 replica, one pod delete expected - labelMap := map[string]string{"foo": "bar"} - rsSpec := newReplicaSet(1, labelMap) informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 2, v1.PodRunning, labelMap, rsSpec, "pod") @@ -300,15 +302,15 @@ func TestDeleteFinalStateUnknown(t *testing.T) { } func TestSyncReplicaSetCreates(t *testing.T) { - client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + client := fake.NewSimpleClientset(rs) stopCh := make(chan struct{}) defer close(stopCh) manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // A controller with 2 replicas and no active pods in the store. // Inactive pods should be ignored. 2 creates expected. - labelMap := map[string]string{"foo": "bar"} - rs := newReplicaSet(2, labelMap) informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs) failedPod := newPod("failed-pod", rs, v1.PodFailed, nil, true) deletedPod := newPod("deleted-pod", rs, v1.PodRunning, nil, true) @@ -778,21 +780,20 @@ func TestUpdatePods(t *testing.T) { func TestControllerUpdateRequeue(t *testing.T) { // This server should force a requeue of the controller because it fails to update status.Replicas. 
- fakeHandler := utiltesting.FakeHandler{ - StatusCode: 500, - ResponseBody: "{}", - SkipRequestFn: skipListerFunc, - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - - stopCh := make(chan struct{}) - defer close(stopCh) - client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) - labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(1, labelMap) + client := fake.NewSimpleClientset(rs) + client.PrependReactor("update", "replicasets", + func(action core.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "status" { + return false, nil, nil + } + return true, nil, errors.New("failed to update status") + }) + stopCh := make(chan struct{}) + defer close(stopCh) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) + informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: 2} newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap, rs, "pod") @@ -800,13 +801,14 @@ func TestControllerUpdateRequeue(t *testing.T) { fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl - // an error from the sync function will be requeued, check to make sure we returned an error - err := manager.syncReplicaSet(getKey(rs, t)) - if err == nil { - t.Errorf("missing error for requeue") + // Enqueue once. Then process it. Disable rate-limiting for this. + manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter()) + manager.enqueueReplicaSet(rs) + manager.processNextWorkItem() + // It should have been requeued. + if got, want := manager.queue.Len(), 1; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } - // 1 Update and 1 GET, both of which fail - fakeHandler.ValidateRequestCount(t, 2) } func TestControllerUpdateStatusWithFailure(t *testing.T) { @@ -856,15 +858,15 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { // TODO: This test is too hairy for a unittest. It should be moved to an E2E suite. 
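The rewritten TestControllerUpdateRequeue above (and its ReplicationController twin later in this patch) stops counting HTTP requests against a fake handler and instead drives the key through the controller's own workqueue, asserting that a failed status update puts the key back. The following is a rough sketch of that pattern, assuming only the client-go workqueue package; syncFunc and the key are stand-ins invented for the example.

package main

import (
	"errors"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// syncFunc stands in for syncReplicaSet / syncReplicationController. It always
// fails, the way the prepended "update" reactor fails the status update in the
// rewritten tests.
func syncFunc(key string) error {
	return errors.New("failed to update status")
}

func main() {
	// A MaxOfRateLimiter with no child limiters never delays, so a failed item
	// is requeued immediately instead of sitting out a backoff in the test.
	queue := workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter())

	queue.Add("default/foo")

	// One pass of processNextWorkItem: pop, sync, requeue on error.
	key, _ := queue.Get()
	if err := syncFunc(key.(string)); err != nil {
		queue.AddRateLimited(key)
	}
	queue.Done(key)

	fmt.Println("queue length after failed sync:", queue.Len()) // 1
}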
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { - client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(numReplicas, labelMap) + client := fake.NewSimpleClientset(rsSpec) fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas) manager.podControl = &fakePodControl - labelMap := map[string]string{"foo": "bar"} - rsSpec := newReplicaSet(numReplicas, labelMap) informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) expectedPods := int32(0) @@ -1044,12 +1046,12 @@ func TestRSSyncExpectations(t *testing.T) { } func TestDeleteControllerAndExpectations(t *testing.T) { - client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + rs := newReplicaSet(1, map[string]string{"foo": "bar"}) + client := fake.NewSimpleClientset(rs) stopCh := make(chan struct{}) defer close(stopCh) manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10) - rs := newReplicaSet(1, map[string]string{"foo": "bar"}) informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs) fakePodControl := controller.FakePodControl{} @@ -1440,11 +1442,11 @@ func TestUpdateSelectorControllerRef(t *testing.T) { func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) + now := metav1.Now() + rs.DeletionTimestamp = &now stopCh := make(chan struct{}) defer close(stopCh) manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs) - now := metav1.Now() - rs.DeletionTimestamp = &now informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs) pod1 := newPod("pod1", rs, v1.PodRunning, nil, false) informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1) @@ -1457,6 +1459,33 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { validateSyncReplicaSet(t, fakePodControl, 0, 0, 0) } +func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { + labelMap := map[string]string{"foo": "bar"} + // Bare client says it IS deleted. + rs := newReplicaSet(2, labelMap) + now := metav1.Now() + rs.DeletionTimestamp = &now + stopCh := make(chan struct{}) + defer close(stopCh) + manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs) + // Lister (cache) says it's NOT deleted. + rs2 := *rs + rs2.DeletionTimestamp = nil + informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(&rs2) + + // Recheck occurs if a matching orphan is present. + pod1 := newPod("pod1", rs, v1.PodRunning, nil, false) + informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // sync should abort. + err := manager.syncReplicaSet(getKey(rs, t)) + if err == nil { + t.Error("syncReplicaSet() err = nil, expected non-nil") + } + // no patch, no create. 
+ validateSyncReplicaSet(t, fakePodControl, 0, 0, 0) +} + func TestReadyReplicas(t *testing.T) { // This is a happy server just to record the PUT request we expect for status.Replicas fakeHandler := utiltesting.FakeHandler{ diff --git a/pkg/controller/replication/BUILD b/pkg/controller/replication/BUILD index 9a920a39e6c..00d190f19de 100644 --- a/pkg/controller/replication/BUILD +++ b/pkg/controller/replication/BUILD @@ -66,6 +66,7 @@ go_test( "//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/util/testing", + "//vendor:k8s.io/client-go/util/workqueue", ], ) diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 9ef5ff0e0f2..872acb0656c 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -582,7 +582,19 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { filteredPods = append(filteredPods, pod) } } - cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind) + // If any adoptions are attempted, we should first recheck for deletion with + // an uncached quorum read sometime after listing Pods (see #42639). + canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := rm.kubeClient.CoreV1().ReplicationControllers(rc.Namespace).Get(rc.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if fresh.UID != rc.UID { + return nil, fmt.Errorf("original ReplicationController %v/%v is gone: got uid %v, wanted %v", rc.Namespace, rc.Name, fresh.UID, rc.UID) + } + return fresh, nil + }) + cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind, canAdoptFunc) // NOTE: filteredPods are pointing to objects from cache - if you need to // modify them, you need to copy it first. filteredPods, err = cm.ClaimPods(filteredPods) diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 0353aea1693..b1f2699396d 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -19,9 +19,11 @@ limitations under the License. 
package replication import ( + "errors" "fmt" "math/rand" "net/http/httptest" + "net/url" "reflect" "strings" "testing" @@ -38,6 +40,7 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/v1" @@ -211,17 +214,21 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { } func TestSyncReplicationControllerDeletes(t *testing.T) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + controllerSpec := newReplicationController(1) + + c := fake.NewSimpleClientset(controllerSpec) fakePodControl := controller.FakePodControl{} manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = &fakePodControl // 2 running pods and a controller with 1 replica, one pod delete expected - controllerSpec := newReplicationController(1) rcInformer.Informer().GetIndexer().Add(controllerSpec) newPodList(podInformer.Informer().GetIndexer(), 2, v1.PodRunning, controllerSpec, "pod") - manager.syncReplicationController(getKey(controllerSpec, t)) + err := manager.syncReplicationController(getKey(controllerSpec, t)) + if err != nil { + t.Fatalf("syncReplicationController() error: %v", err) + } validateSyncReplication(t, &fakePodControl, 0, 1, 0) } @@ -258,12 +265,12 @@ func TestDeleteFinalStateUnknown(t *testing.T) { } func TestSyncReplicationControllerCreates(t *testing.T) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + rc := newReplicationController(2) + c := fake.NewSimpleClientset(rc) manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // A controller with 2 replicas and no active pods in the store. // Inactive pods should be ignored. 2 creates expected. - rc := newReplicationController(2) rcInformer.Informer().GetIndexer().Add(rc) failedPod := newPod("failed-pod", rc, v1.PodFailed, nil, true) deletedPod := newPod("deleted-pod", rc, v1.PodRunning, nil, true) @@ -282,6 +289,13 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { fakeHandler := utiltesting.FakeHandler{ StatusCode: 200, ResponseBody: "", + SkipRequestFn: func(verb string, url url.URL) bool { + if verb == "GET" { + // Ignore refetch to check DeletionTimestamp. 
+ return true + } + return false + }, } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() @@ -361,20 +375,12 @@ func TestControllerUpdateReplicas(t *testing.T) { } func TestSyncReplicationControllerDormancy(t *testing.T) { - // Setup a test server so we can lie about the current state of pods - fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "{}", - T: t, - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + controllerSpec := newReplicationController(2) + c := fake.NewSimpleClientset(controllerSpec) fakePodControl := controller.FakePodControl{} manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = &fakePodControl - controllerSpec := newReplicationController(2) rcInformer.Informer().GetIndexer().Add(controllerSpec) newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, controllerSpec, "pod") @@ -416,10 +422,6 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { fakePodControl.Err = nil manager.syncReplicationController(getKey(controllerSpec, t)) validateSyncReplication(t, &fakePodControl, 1, 0, 0) - - // 2 PUT for the rc status during dormancy window. - // Note that the pod creates go through pod control so they're not recorded. - fakeHandler.ValidateRequestCount(t, 2) } func TestPodControllerLookup(t *testing.T) { @@ -699,17 +701,17 @@ func TestUpdatePods(t *testing.T) { func TestControllerUpdateRequeue(t *testing.T) { // This server should force a requeue of the controller because it fails to update status.Replicas. - fakeHandler := utiltesting.FakeHandler{ - StatusCode: 500, - ResponseBody: "", - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - - c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + rc := newReplicationController(1) + c := fake.NewSimpleClientset(rc) + c.PrependReactor("update", "replicationcontrollers", + func(action core.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "status" { + return false, nil, nil + } + return true, nil, errors.New("failed to update status") + }) manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - rc := newReplicationController(1) rcInformer.Informer().GetIndexer().Add(rc) rc.Status = v1.ReplicationControllerStatus{Replicas: 2} newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, rc, "pod") @@ -717,13 +719,14 @@ func TestControllerUpdateRequeue(t *testing.T) { fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl - // an error from the sync function will be requeued, check to make sure we returned an error - if err := manager.syncReplicationController(getKey(rc, t)); err == nil { - t.Errorf("missing error for requeue") + // Enqueue once. Then process it. Disable rate-limiting for this. + manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter()) + manager.enqueueController(rc) + manager.processNextWorkItem() + // It should have been requeued. 
+ if got, want := manager.queue.Len(), 1; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } - - // 1 Update and 1 GET, both of which fail - fakeHandler.ValidateRequestCount(t, 2) } func TestControllerUpdateStatusWithFailure(t *testing.T) { @@ -775,12 +778,12 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { // TODO: This test is too hairy for a unittest. It should be moved to an E2E suite. func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + controllerSpec := newReplicationController(numReplicas) + c := fake.NewSimpleClientset(controllerSpec) fakePodControl := controller.FakePodControl{} manager, podInformer, rcInformer := newReplicationManagerFromClient(c, burstReplicas) manager.podControl = &fakePodControl - controllerSpec := newReplicationController(numReplicas) rcInformer.Informer().GetIndexer().Add(controllerSpec) expectedPods := 0 @@ -956,10 +959,10 @@ func TestRCSyncExpectations(t *testing.T) { } func TestDeleteControllerAndExpectations(t *testing.T) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + rc := newReplicationController(1) + c := fake.NewSimpleClientset(rc) manager, podInformer, rcInformer := newReplicationManagerFromClient(c, 10) - rc := newReplicationController(1) rcInformer.Informer().GetIndexer().Add(rc) fakePodControl := controller.FakePodControl{} @@ -1236,8 +1239,8 @@ func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationMana } func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled() rc := newReplicationController(2) + manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) rcInformer.Informer().GetIndexer().Add(rc) var trueVar = true otherControllerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1", Kind: "ReplicationController", Name: "AnotherRC", Controller: &trueVar} @@ -1335,8 +1338,8 @@ func TestPatchExtraPodsThenDelete(t *testing.T) { } func TestUpdateLabelsRemoveControllerRef(t *testing.T) { - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled() rc := newReplicationController(2) + manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) rcInformer.Informer().GetIndexer().Add(rc) // put one pod in the podLister pod := newPod("pod", rc, v1.PodRunning, nil, false) @@ -1373,8 +1376,8 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) { } func TestUpdateSelectorControllerRef(t *testing.T) { - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled() rc := newReplicationController(2) + manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) // put 2 pods in the podLister newPodList(podInformer.Informer().GetIndexer(), 2, v1.PodRunning, rc, "pod") // update the RC so that its selector no longer matches the pods @@ -1406,10 +1409,10 @@ func TestUpdateSelectorControllerRef(t *testing.T) { // RC manager shouldn't adopt or create more pods if the rc is about to be // deleted. 
func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled() rc := newReplicationController(2) now := metav1.Now() rc.DeletionTimestamp = &now + manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) rcInformer.Informer().GetIndexer().Add(rc) pod1 := newPod("pod1", rc, v1.PodRunning, nil, false) podInformer.Informer().GetIndexer().Add(pod1) @@ -1422,6 +1425,30 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { validateSyncReplication(t, fakePodControl, 0, 0, 0) } +func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { + // Bare client says it IS deleted. + rc := newReplicationController(2) + now := metav1.Now() + rc.DeletionTimestamp = &now + manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) + // Lister (cache) says it's NOT deleted. + rc2 := *rc + rc2.DeletionTimestamp = nil + rcInformer.Informer().GetIndexer().Add(&rc2) + + // Recheck occurs if a matching orphan is present. + pod1 := newPod("pod1", rc, v1.PodRunning, nil, false) + podInformer.Informer().GetIndexer().Add(pod1) + + // sync should abort. + err := manager.syncReplicationController(getKey(rc, t)) + if err == nil { + t.Error("syncReplicationController() err = nil, expected non-nil") + } + // no patch, no create. + validateSyncReplication(t, fakePodControl, 0, 0, 0) +} + func TestReadyReplicas(t *testing.T) { // This is a happy server just to record the PUT request we expect for status.Replicas fakeHandler := utiltesting.FakeHandler{ diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 04fce3b72db..3257e738b8f 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -289,7 +289,20 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, s return isMemberOf(set, pod) } - cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind) + // If any adoptions are attempted, we should first recheck for deletion with + // an uncached quorum read sometime after listing Pods (see #42639). 
+ canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := ssc.kubeClient.AppsV1beta1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if fresh.UID != set.UID { + return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID) + } + return fresh, nil + }) + + cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, canAdoptFunc) return cm.ClaimPods(pods, filter) } @@ -386,6 +399,7 @@ func (ssc *StatefulSetController) sync(key string) error { utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err)) return err } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err)) diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 8a8fa67c5ff..2a74eeea89f 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -214,69 +214,6 @@ func TestStatefulSetControlReplacesPods(t *testing.T) { } } -func TestStatefulSetDeletionTimestamp(t *testing.T) { - set := newStatefulSet(5) - client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) - - stop := make(chan struct{}) - defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) - - // Bring up a StatefulSet. - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { - t.Errorf("failed to turn up StatefulSet : %s", err) - } - var err error - set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) - if err != nil { - t.Fatalf("error getting updated StatefulSet: %v", err) - } - if set.Status.Replicas != 5 { - t.Error("failed to scale statefulset to 5 replicas") - } - selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) - if err != nil { - t.Error(err) - } - pods, err := spc.podsLister.Pods(set.Namespace).List(selector) - if err != nil { - t.Error(err) - } - sort.Sort(ascendingOrdinal(pods)) - - // Mark the StatefulSet as being deleted. - set.DeletionTimestamp = new(metav1.Time) - - // Delete the first pod. - spc.podsIndexer.Delete(pods[0]) - pods, err = spc.podsLister.Pods(set.Namespace).List(selector) - if err != nil { - t.Error(err) - } - - // The StatefulSet should update its replica count, - // but not try to fix it. 
- if err := ssc.UpdateStatefulSet(set, pods); err != nil { - t.Errorf("failed to update StatefulSet : %s", err) - } - set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) - if err != nil { - t.Fatalf("error getting updated StatefulSet: %v", err) - } - if e, a := int32(4), set.Status.Replicas; e != a { - t.Errorf("expected to scale to %d, got %d", e, a) - } -} - func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index b996b6ca492..71b24b9171c 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -22,9 +22,8 @@ import ( "testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" @@ -35,8 +34,8 @@ import ( func alwaysReady() bool { return true } func TestStatefulSetControllerCreates(t *testing.T) { - ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) + ssc, spc := newFakeStatefulSetController(set) if err := scaleUpStatefulSetController(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } @@ -51,8 +50,8 @@ func TestStatefulSetControllerCreates(t *testing.T) { } func TestStatefulSetControllerDeletes(t *testing.T) { - ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) + ssc, spc := newFakeStatefulSetController(set) if err := scaleUpStatefulSetController(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } @@ -79,8 +78,8 @@ func TestStatefulSetControllerDeletes(t *testing.T) { } func TestStatefulSetControllerRespectsTermination(t *testing.T) { - ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) + ssc, spc := newFakeStatefulSetController(set) if err := scaleUpStatefulSetController(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } @@ -130,8 +129,8 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) { } func TestStatefulSetControllerBlocksScaling(t *testing.T) { - ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) + ssc, spc := newFakeStatefulSetController(set) if err := scaleUpStatefulSetController(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } @@ -176,6 +175,63 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) { } } +func TestStatefulSetControllerDeletionTimestamp(t *testing.T) { + set := newStatefulSet(3) + set.DeletionTimestamp = new(metav1.Time) + ssc, spc := newFakeStatefulSetController(set) + + spc.setsIndexer.Add(set) + + // Force a sync. It should not try to create any Pods. + ssc.enqueueStatefulSet(set) + fakeWorker(ssc) + + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + if got, want := len(pods), 0; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } +} + +func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) { + set := newStatefulSet(3) + // The bare client says it IS deleted. 
+ set.DeletionTimestamp = new(metav1.Time) + ssc, spc := newFakeStatefulSetController(set) + + // The lister (cache) says it's NOT deleted. + set2 := *set + set2.DeletionTimestamp = nil + spc.setsIndexer.Add(&set2) + + // The recheck occurs in the presence of a matching orphan. + pod := newStatefulSetPod(set, 1) + pod.OwnerReferences = nil + spc.podsIndexer.Add(pod) + + // Force a sync. It should not try to create any Pods. + ssc.enqueueStatefulSet(set) + fakeWorker(ssc) + + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } +} + func TestStatefulSetControllerAddPod(t *testing.T) { ssc, spc := newFakeStatefulSetController() set1 := newStatefulSet(3) @@ -437,8 +493,8 @@ func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) { } func TestGetPodsForStatefulSetAdopt(t *testing.T) { - ssc, spc := newFakeStatefulSetController() set := newStatefulSet(5) + ssc, spc := newFakeStatefulSetController(set) pod1 := newStatefulSetPod(set, 1) // pod2 is an orphan with matching labels and name. pod2 := newStatefulSetPod(set, 2) @@ -479,8 +535,8 @@ func TestGetPodsForStatefulSetAdopt(t *testing.T) { } func TestGetPodsForStatefulSetRelease(t *testing.T) { - ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) + ssc, spc := newFakeStatefulSetController(set) pod1 := newStatefulSetPod(set, 1) // pod2 is owned but has wrong name. pod2 := newStatefulSetPod(set, 2) @@ -518,8 +574,8 @@ func TestGetPodsForStatefulSetRelease(t *testing.T) { } } -func newFakeStatefulSetController() (*StatefulSetController, *fakeStatefulPodControl) { - client := fake.NewSimpleClientset() +func newFakeStatefulSetController(initialObjects ...runtime.Object) (*StatefulSetController, *fakeStatefulPodControl) { + client := fake.NewSimpleClientset(initialObjects...) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) fpc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) ssc := NewStatefulSetController(
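The seeding idea in the final hunk generalizes to any test helper: accept variadic runtime.Object values and forward them to the fake clientset so the bare client and the informers start from the same state. A minimal sketch of such a helper follows; the names Fixture and newFixture are hypothetical and not taken from this tree, and current client-go packages are assumed.

package fixture

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// Fixture bundles a fake clientset with an informer factory built on top of
// it, so tests can exercise both the cached (lister) and uncached (bare
// client) read paths against the same seeded objects.
type Fixture struct {
	Client    kubernetes.Interface
	Informers informers.SharedInformerFactory
}

// newFixture seeds the fake clientset with the given objects. Anything a
// controller later re-reads with a live GET, such as a deletion-timestamp
// recheck before adoption, must be passed here.
func newFixture(initialObjects ...runtime.Object) *Fixture {
	client := fake.NewSimpleClientset(initialObjects...)
	factory := informers.NewSharedInformerFactory(client, 0) // no resync
	return &Fixture{Client: client, Informers: factory}
}

Tests can then add a modified copy of an object directly to an informer indexer when they want the cache to diverge from the bare client, which is how the two race tests above stage the stale-cache scenario.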