diff --git a/cmd/kube-controller-manager/app/extensions.go b/cmd/kube-controller-manager/app/extensions.go index f349ecbe212..a3218496fa7 100644 --- a/cmd/kube-controller-manager/app/extensions.go +++ b/cmd/kube-controller-manager/app/extensions.go @@ -36,7 +36,6 @@ func startDaemonSetController(ctx ControllerContext) (bool, error) { ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Nodes(), ctx.ClientBuilder.ClientOrDie("daemon-set-controller"), - int(ctx.Options.LookupCacheSizeForDaemonSet), ).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop) return true, nil } diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index 196ba2742bd..4738d56b3af 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -134,7 +134,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet, allControllers []string, disabled fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. Larger number = more responsive token generation, but more CPU (and network) load") fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "This flag is deprecated and will be removed in future releases. ReplicationController no longer requires a lookup cache.") fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "This flag is deprecated and will be removed in future releases. ReplicaSet no longer requires a lookup cache.") - fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The the size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.") + fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "This flag is deprecated and will be removed in future releases. DaemonSet no longer requires a lookup cache.") fs.DurationVar(&s.ServiceSyncPeriod.Duration, "service-sync-period", s.ServiceSyncPeriod.Duration, "The period for syncing services with their external load balancers") fs.DurationVar(&s.NodeSyncPeriod.Duration, "node-sync-period", 0, ""+ "This flag is deprecated and will be removed in future releases. See node-monitor-period for Node health checking or "+ diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD index 3ca29d93d4d..d3ed2e45f51 100644 --- a/pkg/controller/daemon/BUILD +++ b/pkg/controller/daemon/BUILD @@ -79,6 +79,7 @@ go_test( "//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/record", + "//vendor:k8s.io/client-go/util/workqueue", ], ) diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 32eb8778758..d850834911c 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -108,13 +108,11 @@ type DaemonSetsController struct { // Added as a member to the struct to allow injection for testing. nodeStoreSynced cache.InformerSynced - lookupCache *controller.MatchingCache - // DaemonSet keys that need to be synced. queue workqueue.RateLimitingInterface } -func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface, lookupCacheSize int) *DaemonSetsController { +func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -144,21 +142,6 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo UpdateFunc: func(old, cur interface{}) { oldDS := old.(*extensions.DaemonSet) curDS := cur.(*extensions.DaemonSet) - // We should invalidate the whole lookup cache if a DS's selector has been updated. - // - // Imagine that you have two RSs: - // * old DS1 - // * new DS2 - // You also have a pod that is attached to DS2 (because it doesn't match DS1 selector). - // Now imagine that you are changing DS1 selector so that it is now matching that pod, - // in such case we must invalidate the whole cache so that pod could be adopted by DS1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) { - dsc.lookupCache.InvalidateAll() - } - glog.V(4).Infof("Updating daemon set %s", oldDS.Name) dsc.enqueueDaemonSet(curDS) }, @@ -187,7 +170,6 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo dsc.syncHandler = dsc.syncDaemonSet dsc.enqueueDaemonSet = dsc.enqueue - dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize) return dsc } @@ -276,78 +258,57 @@ func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after ti dsc.queue.AddAfter(key, after) } -func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.DaemonSet { - // look up in the cache, if cached and the cache is valid, just return cached value - if obj, cached := dsc.lookupCache.GetMatchingObject(pod); cached { - ds, ok := obj.(*extensions.DaemonSet) - if !ok { - // This should not happen - utilruntime.HandleError(fmt.Errorf("lookup cache does not return a DaemonSet object")) - return nil - } - if dsc.isCacheValid(pod, ds) { - return ds - } - } +// getPodDaemonSets returns a list of DaemonSets that potentially match the pod. +func (dsc *DaemonSetsController) getPodDaemonSets(pod *v1.Pod) []*extensions.DaemonSet { sets, err := dsc.dsLister.GetPodDaemonSets(pod) if err != nil { glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name) return nil } if len(sets) > 1 { - // More than two items in this list indicates user error. If two daemon - // sets overlap, sort by creation timestamp, subsort by name, then pick - // the first. + // ControllerRef will ensure we don't do anythign crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(byCreationTimestamp(sets)) } - - // update lookup cache - dsc.lookupCache.Update(pod, sets[0]) - - return sets[0] -} - -// isCacheValid check if the cache is valid -func (dsc *DaemonSetsController) isCacheValid(pod *v1.Pod, cachedDS *extensions.DaemonSet) bool { - _, err := dsc.dsLister.DaemonSets(cachedDS.Namespace).Get(cachedDS.Name) - // ds has been deleted or updated, cache is invalid - if err != nil || !isDaemonSetMatch(pod, cachedDS) { - return false - } - return true -} - -// isDaemonSetMatch take a Pod and DaemonSet, return whether the Pod and DaemonSet are matching -// TODO(mqliang): This logic is a copy from GetPodDaemonSets(), remove the duplication -func isDaemonSetMatch(pod *v1.Pod, ds *extensions.DaemonSet) bool { - if ds.Namespace != pod.Namespace { - return false - } - selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) - if err != nil { - err = fmt.Errorf("invalid selector: %v", err) - return false - } - - // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - return false - } - return true + return sets } func (dsc *DaemonSetsController) addPod(obj interface{}) { pod := obj.(*v1.Pod) glog.V(4).Infof("Pod %s added.", pod.Name) - if ds := dsc.getPodDaemonSet(pod); ds != nil { + + if pod.DeletionTimestamp != nil { + // on a restart of the controller manager, it's possible a new pod shows up in a state that + // is already pending deletion. Prevent the pod from being a creation observation. + dsc.deletePod(pod) + return + } + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } dsKey, err := controller.KeyFunc(ds) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err)) return } dsc.expectations.CreationObserved(dsKey) dsc.enqueueDaemonSet(ds) + return + } + + // Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + for _, ds := range dsc.getPodDaemonSets(pod) { + dsc.enqueueDaemonSet(ds) } } @@ -364,22 +325,43 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) { } glog.V(4).Infof("Pod %s updated.", curPod.Name) changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) - if curDS := dsc.getPodDaemonSet(curPod); curDS != nil { - dsc.enqueueDaemonSet(curDS) + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) - // See https://github.com/kubernetes/kubernetes/pull/38076 for more details - if changedToReady && curDS.Spec.MinReadySeconds > 0 { - dsc.enqueueDaemonSetAfter(curDS, time.Duration(curDS.Spec.MinReadySeconds)*time.Second) + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. + ds, err := dsc.dsLister.DaemonSets(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + dsc.enqueueDaemonSet(ds) } } - // If the labels have not changed, then the daemon set responsible for - // the pod is the same as it was before. In that case we have enqueued the daemon - // set above, and do not have to enqueue the set again. - if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - // It's ok if both oldDS and curDS are the same, because curDS will set - // the expectations on its run so oldDS will have no effect. - if oldDS := dsc.getPodDaemonSet(oldPod); oldDS != nil { - dsc.enqueueDaemonSet(oldDS) + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + ds, err := dsc.dsLister.DaemonSets(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + dsc.enqueueDaemonSet(ds) + // See https://github.com/kubernetes/kubernetes/pull/38076 for more details + if changedToReady && ds.Spec.MinReadySeconds > 0 { + dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second) + } + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, ds := range dsc.getPodDaemonSets(curPod) { + dsc.enqueueDaemonSet(ds) } } } @@ -394,25 +376,37 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) return } } glog.V(4).Infof("Pod %s deleted.", pod.Name) - if ds := dsc.getPodDaemonSet(pod); ds != nil { - dsKey, err := controller.KeyFunc(ds) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err)) - return - } - dsc.expectations.DeletionObserved(dsKey) - dsc.enqueueDaemonSet(ds) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + dsKey, err := controller.KeyFunc(ds) + if err != nil { + return + } + dsc.expectations.DeletionObserved(dsKey) + dsc.enqueueDaemonSet(ds) } func (dsc *DaemonSetsController) addNode(obj interface{}) { @@ -481,7 +475,7 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) return nil, err } // Use ControllerRefManager to adopt/orphan as needed. - cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, ControllerKind) + cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind) claimedPods, err := cm.ClaimPods(pods) if err != nil { return nil, err @@ -867,7 +861,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten } // ignore pods that belong to the daemonset when taking into account whether // a daemonset should bind to a node. - if pds := dsc.getPodDaemonSet(pod); pds != nil && ds.Name == pds.Name { + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil && controllerRef.UID == ds.UID { continue } pods = append(pods, pod) @@ -975,8 +969,8 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit func newControllerRef(ds *extensions.DaemonSet) *metav1.OwnerReference { isController := true return &metav1.OwnerReference{ - APIVersion: ControllerKind.GroupVersion().String(), - Kind: ControllerKind.Kind, + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, Name: ds.Name, UID: ds.UID, Controller: &isController, diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index ea62750ae50..b41ba545840 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -18,6 +18,9 @@ package daemon import ( "fmt" + "reflect" + "sort" + "strconv" "sync" "testing" @@ -30,6 +33,7 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/v1" @@ -248,7 +252,6 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), clientset, - 0, ) manager.eventRecorder = record.NewFakeRecorder(100) @@ -542,16 +545,17 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) { manager, podControl, _ := newTestController() node := newNode("port-conflict", nil) manager.nodeStore.Add(node) - manager.podStore.Add(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: simpleDaemonSetLabel, - Namespace: metav1.NamespaceDefault, - }, - Spec: podSpec, - }) ds := newDaemonSet("foo") ds.Spec.Template.Spec = podSpec manager.dsStore.Add(ds) + manager.podStore.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: simpleDaemonSetLabel, + Namespace: metav1.NamespaceDefault, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(ds)}, + }, + Spec: podSpec, + }) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) } @@ -1244,3 +1248,244 @@ func TestGetNodesToDaemonPods(t *testing.T) { t.Errorf("unexpected pod %v was returned", podName) } } + +func TestAddPod(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + manager.addPod(pod1) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(ds1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2) + manager.addPod(pod2) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(ds2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestAddPodOrphan(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + ds3 := newDaemonSet("foo3") + ds3.Spec.Selector.MatchLabels = simpleDaemonSetLabel2 + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + manager.dsStore.Add(ds3) + + // Make pod an orphan. Expect matching sets to be queued. + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + manager.addPod(pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + if got, want := getQueuedKeys(manager.queue), []string{"default/foo1", "default/foo2"}; !reflect.DeepEqual(got, want) { + t.Errorf("getQueuedKeys() = %v, want %v", got, want) + } +} + +func TestUpdatePod(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + prev := *pod1 + bumpResourceVersion(pod1) + manager.updatePod(&prev, pod1) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(ds1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2) + prev = *pod2 + bumpResourceVersion(pod2) + manager.updatePod(&prev, pod2) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(ds2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestUpdatePodOrphanSameLabels(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + prev := *pod + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 0; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodOrphanWithNewLabels(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + prev := *pod + prev.Labels = map[string]string{"foo2": "bar2"} + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + if got, want := getQueuedKeys(manager.queue), []string{"default/foo1", "default/foo2"}; !reflect.DeepEqual(got, want) { + t.Errorf("getQueuedKeys() = %v, want %v", got, want) + } +} + +func TestUpdatePodChangeControllerRef(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + prev := *pod + prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(ds2)} + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodControllerRefRemoved(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + prev := *pod + pod.OwnerReferences = nil + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestDeletePod(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + manager.deletePod(pod1) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(ds1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2) + manager.deletePod(pod2) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(ds2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestDeletePodOrphan(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + ds3 := newDaemonSet("foo3") + ds3.Spec.Selector.MatchLabels = simpleDaemonSetLabel2 + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + manager.dsStore.Add(ds3) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + manager.deletePod(pod) + if got, want := manager.queue.Len(), 0; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func bumpResourceVersion(obj metav1.Object) { + ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) + obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) +} + +// getQueuedKeys returns a sorted list of keys in the queue. +// It can be used to quickly check that multiple keys are in there. +func getQueuedKeys(queue workqueue.RateLimitingInterface) []string { + var keys []string + count := queue.Len() + for i := 0; i < count; i++ { + key, done := queue.Get() + if done { + return keys + } + keys = append(keys, key.(string)) + } + sort.Strings(keys) + return keys +}